Event-Sourced Aggregate
With DCB there are more flexible ways to enforce consistency (see article about Aggregates).
Sometimes, however, an Event-Sourced Aggregate can still be useful. For example to slowly migrate an existing Event-Sourced application or if the flexibility of DCB is not required (see conclusion below)
Challenge
The goal is to implement an Event-Sourced Aggregate that works with a DCB compliant Event Store
Traditional approaches
The Implementation of an Event-Sourced Aggregate depends on the Programming Language and Framework, but the common functionality is:
- Relevant Events are loaded, remembering the position of the last consumed Event
- A decision is made based on the projected state of those Events
- If successful, a new Event is appended specifying the remembered position
- The Event Store appends the new Event only if no other Event was stored in the meantime and fails otherwise
- Upon failure the process can be repeated until the Event was successfully persisted
The following is a potential JavaScript version of an Aggregate representing a course students can be subscribed to:
/**
* @typedef {{type: string}} Event
* @typedef {{position: number, type: string}} SequencedEvent
*/
class CourseAggregate {
/**
* @type {Event[]} Every state change is stored as a corresponding Event
*/
#recordedEvents = []
/**
* @type {number} The version of this Aggregate at the time of reconstitution
*/
#version = 0
/** @type {string} Identifier of the course */
#id
#capacity = 0
#numberOfSubscriptions = 0
/**
* Static constructor that returns a new CourseAggregate instance
*
* @param {string} id
* @param {string} title
* @param {number} capacity
* @returns {CourseAggregate}
*/
static create(id, title, capacity) {
const instance = new CourseAggregate()
instance.#recordEvent({
type: "CourseDefined",
data: { courseId: id, title, capacity },
})
return instance
}
/**
* Static constructor that returns an existing CourseAggregate instance
* from previously persisted events
*
* @param {SequencedEvent[]} sequencedEvents
* @returns {CourseAggregate}
*/
static reconstitute(sequencedEvents) {
const instance = new CourseAggregate()
for (const event of sequencedEvents) {
instance.#apply(event)
instance.#version = event.position
}
return instance
}
/**
* @returns {string} The identifier of this Aggregate
*/
get id() {
return this.#id
}
/**
* @returns {number} The version of this Aggregate (for optimistic locking)
*/
get version() {
return this.#version
}
/**
* API to change the course's capacity
* @param {number} newCapacity
* @returns {void}
*/
changeCapacity(newCapacity) {
if (newCapacity === this.#capacity) {
throw new Error(
`Course "${this.#id}" already has a capacity of "${newCapacity}`
)
}
if (newCapacity < this.#numberOfSubscriptions) {
throw new Error(
`Course "${this.#id}" already has ${
this.#numberOfSubscriptions
} active subscriptions, can\'t set the capacity below that`
)
}
this.#recordEvent({
type: "CourseCapacityChanged",
data: { courseId: this.#id, newCapacity },
})
}
/**
* API to subscribe a student to this course
* @param {string} studentId Identifier of the student to subscribe
* @returns {void}
*/
subscribeStudent(studentId) {
if (this.#numberOfSubscriptions === this.#capacity) {
throw new Error(`Course "${this.#id}" is already fully booked`)
}
this.#recordEvent({
type: "StudentSubscribedToCourse",
data: { courseId: this.#id, studentId },
})
}
/**
* Internal method to store an event and apply it to the in-memory state
* @param {Event} event
*/
#recordEvent(event) {
this.#recordedEvents.push(event)
this.#apply(event)
}
/**
* Internal method to apply an event to the in-memory state of this Aggregate
* @param {Event} event
*/
#apply(event) {
switch (event.type) {
case "CourseDefined":
this.#id = event.data.courseId
this.#capacity = event.data.capacity
break
case "CourseCapacityChanged":
this.#capacity = event.data.newCapacity
break
case "StudentSubscribedToCourse":
this.#numberOfSubscriptions++
break
}
}
/**
* Public method to retrieve and flush all recorded events
* @returns {Event[]}
*/
pullRecordedEvents() {
const recordedEvents = this.#recordedEvents
this.#recordedEvents = []
return recordedEvents
}
}
With that, a CourseAggregate
can be created:
const course = CourseAggregate.create('c1', 'Course 01', 10)
course.changeCapacity(15)
const events = course.pullRecordedEvents()
console.log(events.map(e => e.type)) // ["CourseDefined","CourseCapacityChanged"]
...or reconstituted from previously recorded events:
const events = [
{
position: 1,
type: "CourseDefined",
data: { courseId: "c1", title: "Course 01", capacity: 10 },
tags: ["course:c1"],
},
{
position: 2,
type: "CourseCapacityChanged",
data: { courseId: "c1", newCapacity: 15 },
tags: ["course:c1"],
},
]
const course = CourseAggregate.reconstitute(events)
course.changeCapacity(15) // Error: Course "c1" already has a capacity of "15
Repository
Aggregates are often paired with a corresponding Repository that handles saving and retrieving instances.
The following shows a simple implementation that works with a traditional Event Store:
class CourseRepository {
#eventStore
constructor(eventStore) {
this.#eventStore = eventStore
}
load(courseId) {
const streamName = `course-${courseId}`
const eventsForThisCourse = this.#eventStore.readStream(streamName)
return CourseAggregate.reconstitute(eventsForThisCourse)
}
save(course) {
const streamName = `course-${course.id}`
// fails if there are new events in the stream
// with a position > course.version
this.#eventStore.appendToStream(
streamName,
course.pullRecordedEvents(),
{
streamState: course.version
}
)
}
}
It can be used with an InMemoryEventStore.js
// create and save a new instance:
const repository = new CourseRepository(eventStore)
const course = CourseAggregate.create('c1', 'Course 01', 10)
repository.save(course)
// update an existing instance:
const course2 = repository.load('c1')
course2.changeCapacity(15)
repository.save(course2)
console.log(
eventStore.readStream('course-c1')
.map(e => e.type)
) // ["CourseDefined","CourseCapacityChanged"]
DCB approach
With DCB we can reuse the CourseAggregate
from above and only need to adjust the repository implementation:
class DcbCourseRepository {
#eventStore
constructor(eventStore) {
this.#eventStore = eventStore
}
load(courseId) {
const tags = [`course:${courseId}`]
const query = createQuery([{ tags }])
const eventsForThisCourse = this.#eventStore.read(query)
return CourseAggregate.reconstitute(eventsForThisCourse)
}
save(course) {
const tags = [`course:${course.id}`]
const query = createQuery([{ tags }])
const eventsWithTags = course
.pullRecordedEvents()
.map((event) => ({ ...event, tags }))
this.#eventStore.append(eventsWithTags, {
failIfEventsMatch: query,
after: course.version,
})
}
}
It can be used with an InMemoryDcbEventStore.js
// create and save a new instance:
const dcbEventStore = new InMemoryDcbEventStore()
const repository = new DcbCourseRepository(dcbEventStore)
const course = CourseAggregate.create("c1", "Course 01", 10)
repository.save(course)
// update an existing instance:
const course2 = repository.load("c1")
course2.changeCapacity(15)
repository.save(course2)
console.log(
dcbEventStore.read(createQuery([{ tags: ["course:c1"] }])).first()
) // {type: 'CourseDefined', data: { courseId: 'c1', title: 'Course 01', capacity: 10 }, tags: [ 'course:c1' ], position: 1}
Conclusion
This example demonstrates how the Aggregate pattern can be used with DCB