Event Sourcing Patterns
Understanding the key patterns that make event sourcing work in practice
Event sourcing isn’t just about storing events. It’s about adopting patterns that make your applications more reliable, auditable, and flexible. Let’s explore the patterns that actually matter.
The Aggregate Pattern
Section titled “The Aggregate Pattern”In traditional systems, you have entities. In event sourcing, you have aggregates - consistency boundaries that enforce business rules.
What makes a good aggregate?
Section titled “What makes a good aggregate?”Single responsibility: Each aggregate manages one piece of business logic.
// Good: User aggregate manages user-related eventsclass User { constructor( public id: string, public email: string, public status: 'active' | 'suspended' ) {}
suspend(reason: string): UserSuspended { if (this.status === 'suspended') { throw new Error('User already suspended'); }
return { type: 'user.suspended', data: { id: this.id, reason } }; }}
// Bad: User aggregate trying to manage orders tooclass UserWithOrders { suspendUserAndCancelOrders() { // This crosses aggregate boundaries - don't do this }}
Small and focused: Aggregates should be small enough to fit in memory.
// Good: Order aggregate contains just order dataclass Order { constructor( public id: string, public customerId: string, public items: OrderItem[], public status: OrderStatus ) {}}
// Bad: Customer aggregate with all their ordersclass CustomerWithAllOrders { constructor( public id: string, public orders: Order[] // Could be thousands of orders ) {}}
Building aggregates from events
Section titled “Building aggregates from events”function buildUserFromEvents(events: UserEvent[]): User { let user: User | null = null;
for (const event of events) { switch (event.type) { case 'user.registered': user = new User( event.data.id, event.data.email, 'active' ); break;
case 'user.suspended': if (user) user.status = 'suspended'; break;
case 'user.reactivated': if (user) user.status = 'active'; break; } }
return user;}
Command Query Responsibility Segregation (CQRS)
Section titled “Command Query Responsibility Segregation (CQRS)”CQRS separates writes (commands) from reads (queries). This isn’t optional in event sourcing - it’s how the pattern works.
Command side: Event streams
Section titled “Command side: Event streams”Commands produce events that get stored in streams:
// Command: Register a new userasync function registerUser(command: RegisterUserCommand): Promise<void> { // Validate command if (!isValidEmail(command.email)) { throw new Error('Invalid email'); }
// Create event const event: UserRegistered = { type: 'user.registered', data: { id: command.userId, email: command.email, registeredAt: new Date().toISOString() } };
// Store event await eventStore.appendToStream(command.userId, [event]);}
Query side: Read models
Section titled “Query side: Read models”Queries read from optimized read models built from events:
// Query: Get user profileasync function getUserProfile(userId: string): Promise<UserProfile> { // Read from optimized view, not from events return await userProfileView.get(userId);}
// The view is built by processing eventsasync function updateUserProfileView(event: UserEvent): Promise<void> { switch (event.type) { case 'user.registered': await userProfileView.create({ id: event.data.id, email: event.data.email, status: 'active', createdAt: event.data.registeredAt }); break;
case 'user.suspended': await userProfileView.update(event.data.id, { status: 'suspended' }); break; }}
Saga Pattern
Section titled “Saga Pattern”When you need to coordinate multiple aggregates, use sagas. They’re like workflows that react to events.
Process managers vs. Sagas
Section titled “Process managers vs. Sagas”Process Manager: Manages a specific business process with a defined lifecycle.
class OrderFulfillmentSaga { constructor(public orderId: string, public state: 'pending' | 'shipped' | 'delivered') {}
handle(event: DomainEvent): DomainEvent[] { switch (event.type) { case 'order.placed': return [ { type: 'inventory.reserve', data: { orderId: event.data.id, items: event.data.items } }, { type: 'payment.authorize', data: { orderId: event.data.id, amount: event.data.total } } ];
case 'inventory.reserved': if (this.state === 'pending') { return [{ type: 'warehouse.ship', data: { orderId: this.orderId } }]; } break;
case 'payment.failed': return [ { type: 'inventory.release', data: { orderId: this.orderId } }, { type: 'order.cancel', data: { orderId: this.orderId, reason: 'Payment failed' } } ]; }
return []; }}
Saga: Reacts to events and issues commands to maintain invariants across aggregates.
// Saga: Ensure user accounts stay consistentclass UserAccountSaga { async handle(event: DomainEvent): Promise<void> { switch (event.type) { case 'user.suspended': // Automatically disable all user sessions await this.commandBus.send({ type: 'session.disable-all', userId: event.data.id });
// Cancel any pending orders await this.commandBus.send({ type: 'order.cancel-pending', userId: event.data.id, reason: 'User account suspended' }); break; } }}
Projection Patterns
Section titled “Projection Patterns”Transform events into queryable views.
Simple projections
Section titled “Simple projections”Build a single view from events:
class UserProfileProjection { async project(event: UserEvent): Promise<void> { switch (event.type) { case 'user.registered': await this.db.users.create({ id: event.data.id, email: event.data.email, displayName: event.data.email.split('@')[0], status: 'active', createdAt: event.createdAt }); break;
case 'user.profile-updated': await this.db.users.update(event.data.id, { displayName: event.data.displayName, avatar: event.data.avatar }); break; } }}
Multi-stream projections
Section titled “Multi-stream projections”Combine events from multiple streams:
class OrderSummaryProjection { async project(event: DomainEvent): Promise<void> { switch (event.type) { case 'order.placed': await this.db.orderSummaries.create({ orderId: event.data.id, customerId: event.data.customerId, total: event.data.total, status: 'pending' }); break;
case 'payment.completed': await this.db.orderSummaries.update(event.data.orderId, { status: 'paid', paidAt: event.createdAt }); break;
case 'shipment.delivered': await this.db.orderSummaries.update(event.data.orderId, { status: 'delivered', deliveredAt: event.createdAt }); break; } }}
Snapshot Pattern
Section titled “Snapshot Pattern”For aggregates with many events, snapshots improve performance.
When to use snapshots
Section titled “When to use snapshots”- Aggregates with > 100 events
- Frequently accessed aggregates
- Complex state building logic
Implementation
Section titled “Implementation”interface UserSnapshot { id: string; email: string; status: string; version: number; // Event stream version when snapshot was taken createdAt: string;}
async function loadUserWithSnapshot(userId: string): Promise<User> { // Try to load snapshot first const snapshot = await snapshotStore.get(`user-${userId}`);
let user: User; let fromVersion = 0;
if (snapshot) { user = new User(snapshot.id, snapshot.email, snapshot.status); fromVersion = snapshot.version + 1; } else { user = new User('', '', 'inactive'); }
// Load events since snapshot const events = await eventStore.readStream(userId, { fromStreamPosition: fromVersion });
// Apply events to rebuild current state for (const event of events) { user = user.apply(event); }
// Save new snapshot every 50 events if (events.length > 50) { await snapshotStore.save(`user-${userId}`, { id: user.id, email: user.email, status: user.status, version: fromVersion + events.length - 1, createdAt: new Date().toISOString() }); }
return user;}
Temporal Patterns
Section titled “Temporal Patterns”Event sourcing gives you time travel capabilities. Use them.
Point-in-time queries
Section titled “Point-in-time queries”// What did this user look like 6 months ago?async function getUserAtTime(userId: string, timestamp: string): Promise<User> { const events = await eventStore.readStream(userId, { before: timestamp });
return buildUserFromEvents(events);}
// What was the order total before the discount was applied?async function getOrderBeforeEvent(orderId: string, beforeEventType: string): Promise<Order> { const events = await eventStore.readStream(orderId); const cutoffIndex = events.findIndex(e => e.type === beforeEventType); const eventsBeforeCutoff = events.slice(0, cutoffIndex);
return buildOrderFromEvents(eventsBeforeCutoff);}
Debugging and analysis
Section titled “Debugging and analysis”// Show all changes to an orderasync function getOrderAuditTrail(orderId: string): Promise<AuditEntry[]> { const events = await eventStore.readStream(orderId);
return events.map(event => ({ timestamp: event.createdAt, action: event.type, details: event.data, user: event.metadata?.userId }));}
// Find patterns in user behaviorasync function analyzeUserJourney(userId: string): Promise<JourneyAnalysis> { const events = await eventStore.queryEvents({ streams: [userId], types: ['user.*', 'session.*', 'order.*'] });
// Analyze patterns, conversion funnels, etc. return analyzeEvents(events);}
The beauty of event sourcing
Section titled “The beauty of event sourcing”These patterns work together to give you:
- Auditability: Every change is recorded
- Debuggability: Replay any scenario
- Flexibility: Add new views without changing core logic
- Scalability: CQRS allows independent scaling of reads and writes
- Time travel: Query any point in time
- Integration: Events are perfect for microservices communication
Event sourcing isn’t more complex than traditional CRUD. It’s just different. And once you get used to thinking in events, you’ll wonder how you ever built systems without them.