
Event Sourcing Patterns

Understanding the key patterns that make event sourcing work in practice

Event sourcing isn’t just about storing events. It’s about adopting patterns that make your applications more reliable, auditable, and flexible. Let’s explore the patterns that actually matter.

In traditional systems, you have entities. In event sourcing, you have aggregates - consistency boundaries that enforce business rules.

Single responsibility: Each aggregate manages one piece of business logic.

// Good: User aggregate manages user-related events
class User {
  constructor(
    public id: string,
    public email: string,
    public status: 'active' | 'suspended'
  ) {}

  suspend(reason: string): UserSuspended {
    if (this.status === 'suspended') {
      throw new Error('User already suspended');
    }
    return {
      type: 'user.suspended',
      data: { id: this.id, reason }
    };
  }
}

// Bad: User aggregate trying to manage orders too
class UserWithOrders {
  suspendUserAndCancelOrders() {
    // This crosses aggregate boundaries - don't do this
  }
}

Small and focused: Aggregates should be small enough to fit in memory.

// Good: Order aggregate contains just order data
class Order {
  constructor(
    public id: string,
    public customerId: string,
    public items: OrderItem[],
    public status: OrderStatus
  ) {}
}

// Bad: Customer aggregate with all their orders
class CustomerWithAllOrders {
  constructor(
    public id: string,
    public orders: Order[] // Could be thousands of orders
  ) {}
}
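
Current state is rebuilt by replaying the events of an aggregate in order:

function buildUserFromEvents(events: UserEvent[]): User {
  let user: User | null = null;
  for (const event of events) {
    switch (event.type) {
      case 'user.registered':
        user = new User(
          event.data.id,
          event.data.email,
          'active'
        );
        break;
      case 'user.suspended':
        if (user) user.status = 'suspended';
        break;
      case 'user.reactivated':
        if (user) user.status = 'active';
        break;
    }
  }
  // Without a user.registered event there is nothing to return
  if (!user) {
    throw new Error('Stream contains no user.registered event');
  }
  return user;
}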
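
The replay loop above is a left fold over the stream. Here is a minimal sketch of the same idea with immutable state and a pure transition function, assuming only the three event types shown above. The names `UserState`, `applyUserEvent`, and `replay` are illustrative, not part of any library.

```typescript
type UserStatus = 'active' | 'suspended';

interface UserState {
  id: string;
  email: string;
  status: UserStatus;
}

// Assumed event shapes, reduced to the fields the fold needs
type UserEvent =
  | { type: 'user.registered'; data: { id: string; email: string } }
  | { type: 'user.suspended'; data: { id: string; reason: string } }
  | { type: 'user.reactivated'; data: { id: string } };

// Pure transition: previous state + one event -> next state
function applyUserEvent(state: UserState | null, event: UserEvent): UserState | null {
  switch (event.type) {
    case 'user.registered':
      return { id: event.data.id, email: event.data.email, status: 'active' };
    case 'user.suspended':
      return state ? { ...state, status: 'suspended' } : state;
    case 'user.reactivated':
      return state ? { ...state, status: 'active' } : state;
  }
}

// Replaying a stream is a reduce over its events, starting from "no user yet"
function replay(events: UserEvent[]): UserState | null {
  return events.reduce<UserState | null>(applyUserEvent, null);
}
```

Because `applyUserEvent` never mutates its input, the same function can be reused wherever state has to be advanced one event at a time, for example after loading a snapshot.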

Command Query Responsibility Segregation (CQRS)

CQRS separates writes (commands) from reads (queries). Event sourcing almost always relies on it: replaying a stream for every query is too slow, so reads are served from separate models built from the events.

Commands produce events that get stored in streams:

// Command: Register a new user
async function registerUser(command: RegisterUserCommand): Promise<void> {
  // Validate command
  if (!isValidEmail(command.email)) {
    throw new Error('Invalid email');
  }

  // Create event
  const event: UserRegistered = {
    type: 'user.registered',
    data: {
      id: command.userId,
      email: command.email,
      registeredAt: new Date().toISOString()
    }
  };

  // Store event
  await eventStore.appendToStream(command.userId, [event]);
}
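
The command above does not say what happens when two writers append to the same stream at once. A common answer is optimistic concurrency: the writer states which version of the stream it expects, and the append is rejected if the stream has moved on. The sketch below uses a hypothetical `InMemoryEventStore`; the `expectedVersion` parameter is an assumption for illustration and not the API of the event store used in this article.

```typescript
interface StoredEvent {
  type: string;
  data: Record<string, unknown>;
}

class ConcurrencyError extends Error {}

// Hypothetical in-memory store, for illustration only
class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  readStream(streamId: string): StoredEvent[] {
    return [...(this.streams.get(streamId) ?? [])];
  }

  // expectedVersion is the number of events the caller believes the stream holds.
  // Returns the new stream length after a successful append.
  appendToStream(streamId: string, events: StoredEvent[], expectedVersion: number): number {
    const current = this.streams.get(streamId) ?? [];
    if (current.length !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but stream is at ${current.length}`
      );
    }
    this.streams.set(streamId, [...current, ...events]);
    return current.length + events.length;
  }
}
```

A command handler would read the stream, decide on new events, and append with the length it read. If the append is rejected, it can reload and retry, or report the conflict to the caller.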

Queries read from optimized read models built from events:

// Query: Get user profile
async function getUserProfile(userId: string): Promise<UserProfile> {
  // Read from optimized view, not from events
  return await userProfileView.get(userId);
}

// The view is built by processing events
async function updateUserProfileView(event: UserEvent): Promise<void> {
  switch (event.type) {
    case 'user.registered':
      await userProfileView.create({
        id: event.data.id,
        email: event.data.email,
        status: 'active',
        createdAt: event.data.registeredAt
      });
      break;
    case 'user.suspended':
      await userProfileView.update(event.data.id, {
        status: 'suspended'
      });
      break;
  }
}

When you need to coordinate multiple aggregates, use sagas. They’re like workflows that react to events.

Process Manager: Manages a specific business process with a defined lifecycle.

class OrderFulfillmentSaga {
  constructor(
    public orderId: string,
    public state: 'pending' | 'shipped' | 'delivered'
  ) {}

  handle(event: DomainEvent): DomainEvent[] {
    switch (event.type) {
      case 'order.placed':
        return [
          { type: 'inventory.reserve', data: { orderId: event.data.id, items: event.data.items } },
          { type: 'payment.authorize', data: { orderId: event.data.id, amount: event.data.total } }
        ];
      case 'inventory.reserved':
        if (this.state === 'pending') {
          return [{ type: 'warehouse.ship', data: { orderId: this.orderId } }];
        }
        break;
      case 'payment.failed':
        return [
          { type: 'inventory.release', data: { orderId: this.orderId } },
          { type: 'order.cancel', data: { orderId: this.orderId, reason: 'Payment failed' } }
        ];
    }
    return [];
  }
}
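
The process manager above ships as soon as inventory is reserved. If shipping should wait for both the reservation and the payment, one option is to track each confirmation in the manager's own state. The sketch below assumes a `payment.authorized` event exists alongside `payment.failed`; that event name, the `Message` type, and `FulfillmentProcess` are illustrative assumptions.

```typescript
// Assumed message shape shared by incoming events and outgoing commands
type Message = { type: string; data: { orderId: string; reason?: string } };

class FulfillmentProcess {
  private inventoryReserved = false;
  private paymentAuthorized = false;
  private done = false;

  constructor(public readonly orderId: string) {}

  // Returns the commands to dispatch in response to one event
  handle(event: Message): Message[] {
    // Ignore events for other orders and anything after the process has finished
    if (this.done || event.data.orderId !== this.orderId) return [];

    switch (event.type) {
      case 'inventory.reserved':
        this.inventoryReserved = true;
        break;
      case 'payment.authorized': // assumed event name
        this.paymentAuthorized = true;
        break;
      case 'payment.failed':
        this.done = true;
        return [
          { type: 'inventory.release', data: { orderId: this.orderId } },
          { type: 'order.cancel', data: { orderId: this.orderId, reason: 'Payment failed' } },
        ];
      default:
        return [];
    }

    // Ship only once both confirmations have arrived, in either order
    if (this.inventoryReserved && this.paymentAuthorized) {
      this.done = true;
      return [{ type: 'warehouse.ship', data: { orderId: this.orderId } }];
    }
    return [];
  }
}
```

In a real system the three flags would themselves be persisted, typically as events in the process manager's own stream, so the process survives a restart.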

Saga: Reacts to events and issues commands to maintain invariants across aggregates.

// Saga: Ensure user accounts stay consistent
class UserAccountSaga {
  // The command bus is injected; the CommandBus type is assumed here
  constructor(private readonly commandBus: CommandBus) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.type) {
      case 'user.suspended':
        // Automatically disable all user sessions
        await this.commandBus.send({
          type: 'session.disable-all',
          userId: event.data.id
        });
        // Cancel any pending orders
        await this.commandBus.send({
          type: 'order.cancel-pending',
          userId: event.data.id,
          reason: 'User account suspended'
        });
        break;
    }
  }
}

Transform events into queryable views.

Build a single view from events:

class UserProfileProjection {
  // The read-model database is injected; the ReadModelDb type is assumed here
  constructor(private readonly db: ReadModelDb) {}

  async project(event: UserEvent): Promise<void> {
    switch (event.type) {
      case 'user.registered':
        await this.db.users.create({
          id: event.data.id,
          email: event.data.email,
          displayName: event.data.email.split('@')[0],
          status: 'active',
          createdAt: event.createdAt
        });
        break;
      case 'user.profile-updated':
        await this.db.users.update(event.data.id, {
          displayName: event.data.displayName,
          avatar: event.data.avatar
        });
        break;
    }
  }
}

Combine events from multiple streams:

class OrderSummaryProjection {
  // The read-model database is injected; the ReadModelDb type is assumed here
  constructor(private readonly db: ReadModelDb) {}

  async project(event: DomainEvent): Promise<void> {
    switch (event.type) {
      case 'order.placed':
        await this.db.orderSummaries.create({
          orderId: event.data.id,
          customerId: event.data.customerId,
          total: event.data.total,
          status: 'pending'
        });
        break;
      case 'payment.completed':
        await this.db.orderSummaries.update(event.data.orderId, {
          status: 'paid',
          paidAt: event.createdAt
        });
        break;
      case 'shipment.delivered':
        await this.db.orderSummaries.update(event.data.orderId, {
          status: 'delivered',
          deliveredAt: event.createdAt
        });
        break;
    }
  }
}
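
Projections often receive the same event more than once (after a restart or a retry, for example), so it helps if applying an event twice is harmless. One way to get there is to remember the position of the last event applied and skip anything at or below it. The sketch below is an in-memory illustration: the `position` field, the simplified event shape (every event carries `orderId`), and the class name are assumptions, not the API used above.

```typescript
// Assumed event shape: `position` is the event's place in a global, ordered log
interface PositionedEvent {
  position: number;
  type: 'order.placed' | 'payment.completed' | 'shipment.delivered';
  data: { orderId: string; total?: number };
}

interface OrderSummary {
  orderId: string;
  total: number;
  status: 'pending' | 'paid' | 'delivered';
}

class CheckpointedOrderSummaryView {
  private rows = new Map<string, OrderSummary>();
  private checkpoint = -1; // position of the last event applied

  project(event: PositionedEvent): void {
    // Redelivered or replayed events at or below the checkpoint are skipped
    if (event.position <= this.checkpoint) return;

    const { orderId } = event.data;
    const row = this.rows.get(orderId);
    switch (event.type) {
      case 'order.placed':
        this.rows.set(orderId, { orderId, total: event.data.total ?? 0, status: 'pending' });
        break;
      case 'payment.completed':
        if (row) row.status = 'paid';
        break;
      case 'shipment.delivered':
        if (row) row.status = 'delivered';
        break;
    }
    this.checkpoint = event.position;
  }

  get(orderId: string): OrderSummary | undefined {
    return this.rows.get(orderId);
  }
}

// A new or corrupted view can be rebuilt by replaying the whole log from the start
function rebuildView(log: PositionedEvent[]): CheckpointedOrderSummaryView {
  const view = new CheckpointedOrderSummaryView();
  for (const event of log) view.project(event);
  return view;
}
```

With a real database, the checkpoint would be stored in the same transaction as the view update, so the two cannot drift apart.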

For aggregates with many events, snapshots improve performance.

Snapshots are worth considering for:

  • Aggregates with > 100 events
  • Frequently accessed aggregates
  • Complex state building logic

interface UserSnapshot {
  id: string;
  email: string;
  status: 'active' | 'suspended';
  version: number; // Event stream version when snapshot was taken
  createdAt: string;
}

async function loadUserWithSnapshot(userId: string): Promise<User> {
  // Try to load snapshot first
  const snapshot = await snapshotStore.get(`user-${userId}`);
  let user: User | null = null;
  let fromVersion = 0;
  if (snapshot) {
    user = new User(snapshot.id, snapshot.email, snapshot.status);
    fromVersion = snapshot.version + 1;
  }

  // Load events since snapshot
  const events = await eventStore.readStream(userId, {
    fromStreamPosition: fromVersion
  });

  // Apply events to rebuild current state.
  // applyUserEvent is an assumed helper holding the same switch as buildUserFromEvents.
  for (const event of events) {
    user = applyUserEvent(user, event);
  }
  if (!user) {
    throw new Error(`User ${userId} not found`);
  }

  // Save a new snapshot once at least 50 events have accumulated since the last one
  if (events.length >= 50) {
    await snapshotStore.save(`user-${userId}`, {
      id: user.id,
      email: user.email,
      status: user.status,
      version: fromVersion + events.length - 1,
      createdAt: new Date().toISOString()
    });
  }
  return user;
}
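
The version arithmetic in a snapshot loader is easy to get off by one, so here is the same logic as a small, store-free function you can test in isolation. It is a sketch under stated assumptions: the stream is passed in as an array whose index is the stream position, and `Snapshot`, `loadWithSnapshot`, and `interval` are illustrative names.

```typescript
// version = stream position of the last event folded into `state`
interface Snapshot<S> {
  state: S;
  version: number;
}

function loadWithSnapshot<S, E>(
  events: E[],                      // full stream; array index = stream position
  snapshot: Snapshot<S> | null,
  initial: S,
  apply: (state: S, event: E) => S,
  interval: number                  // take a new snapshot after this many new events
): { state: S; version: number; newSnapshot: Snapshot<S> | null } {
  // Resume right after the snapshot, or from the start if there is none
  const from = snapshot ? snapshot.version + 1 : 0;
  let state = snapshot ? snapshot.state : initial;
  for (const event of events.slice(from)) {
    state = apply(state, event);
  }

  const version = events.length - 1;
  const lastSnapshotVersion = snapshot ? snapshot.version : -1;
  const newSnapshot =
    version - lastSnapshotVersion >= interval ? { state, version } : null;

  return { state, version, newSnapshot };
}
```

Returning the new snapshot instead of saving it keeps the function pure; the caller decides where and when to persist it.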

Event sourcing gives you time travel capabilities. Use them.

// What did this user look like 6 months ago?
async function getUserAtTime(userId: string, timestamp: string): Promise<User> {
  const events = await eventStore.readStream(userId, {
    before: timestamp
  });
  return buildUserFromEvents(events);
}

// What was the order total before the discount was applied?
async function getOrderBeforeEvent(orderId: string, beforeEventType: string): Promise<Order> {
  const events = await eventStore.readStream(orderId);
  const cutoffIndex = events.findIndex(e => e.type === beforeEventType);
  // findIndex returns -1 when nothing matches; slice(0, -1) would silently drop the last event
  const eventsBeforeCutoff = cutoffIndex === -1 ? events : events.slice(0, cutoffIndex);
  return buildOrderFromEvents(eventsBeforeCutoff);
}

// Show all changes to an order
async function getOrderAuditTrail(orderId: string): Promise<AuditEntry[]> {
  const events = await eventStore.readStream(orderId);
  return events.map(event => ({
    timestamp: event.createdAt,
    action: event.type,
    details: event.data,
    user: event.metadata?.userId
  }));
}

// Find patterns in user behavior
async function analyzeUserJourney(userId: string): Promise<JourneyAnalysis> {
  const events = await eventStore.queryEvents({
    streams: [userId],
    types: ['user.*', 'session.*', 'order.*']
  });
  // Analyze patterns, conversion funnels, etc.
  return analyzeEvents(events);
}
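
Whether a store supports a `before` option or wildcard types like `'user.*'` depends on the store. If yours does not, both filters are small enough to apply in application code after reading the stream. The helpers below are a sketch; `TimedEvent`, `eventsBefore`, and `matchesType` are illustrative names, and the wildcard rule (a trailing `.*` matches any type with that prefix) is an assumption about what the patterns above mean.

```typescript
interface TimedEvent {
  type: string;
  createdAt: string; // ISO 8601 timestamp
  data: Record<string, unknown>;
}

// Keep only events recorded strictly before the given timestamp
function eventsBefore(events: TimedEvent[], timestamp: string): TimedEvent[] {
  const cutoff = Date.parse(timestamp);
  if (Number.isNaN(cutoff)) {
    throw new Error(`Invalid timestamp: ${timestamp}`);
  }
  return events.filter(e => Date.parse(e.createdAt) < cutoff);
}

// 'user.*' matches 'user.registered' but not 'username.changed';
// a pattern without a trailing '.*' must match exactly
function matchesType(eventType: string, pattern: string): boolean {
  return pattern.endsWith('.*')
    ? eventType.startsWith(pattern.slice(0, -1))
    : eventType === pattern;
}
```

Parsing the timestamps instead of comparing strings avoids surprises when events were written with different time zone offsets.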

These patterns work together to give you:

  1. Auditability: Every change is recorded
  2. Debuggability: Replay any scenario
  3. Flexibility: Add new views without changing core logic
  4. Scalability: CQRS allows independent scaling of reads and writes
  5. Time travel: Query any point in time
  6. Integration: Events are a natural fit for communication between microservices

Event sourcing isn’t more complex than traditional CRUD. It’s just different. And once you get used to thinking in events, you’ll wonder how you ever built systems without them.