SDK Reference

Complete reference for @delta-base/server, the DeltaBase TypeScript SDK for Node.js applications.

Install the package with npm:

npm install @delta-base/server

Main entry point for the SDK.

import { DeltaBase } from '@delta-base/server';
const deltabase = new DeltaBase({
  apiKey: 'your-api-key',
  baseUrl: 'https://api.delta-base.com' // Optional
});

Methods:

Get an EventStore client for stream operations.

const eventStore = deltabase.getEventStore('my-event-store');

Get an EventBus client for subscription management.

const eventBus = deltabase.getEventBus('my-event-store');

Get a ManagementClient for administrative operations.

const management = deltabase.getManagement();

Core client for event stream operations.

appendToStream<T>(streamId, events, options?)

Append events to a stream with optional optimistic concurrency control.

interface AppendOptions {
  expectedStreamVersion?: number;
  idempotencyKey?: string;
}
const result = await eventStore.appendToStream('user-123', [
  {
    type: 'user.created',
    data: { id: 'user-123', email: 'john@example.com' },
    metadata: { userId: 'admin' } // Optional
  }
], {
  expectedStreamVersion: 5 // Optional: fail if stream has moved past version 5
});
// Result type
interface AppendResult {
  streamId: string;
  eventsAppended: number;
  currentStreamVersion: number;
  globalPosition: number;
}
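
To illustrate the idea behind `expectedStreamVersion`, here is a small in-memory model of optimistic concurrency. It is only a sketch: `ToyStream` is a hypothetical class, not part of the SDK, and it assumes that a stream's version is simply the number of events stored so far, which may differ from how DeltaBase numbers versions.

```typescript
// Hypothetical in-memory model of one stream; not the SDK's implementation.
class ToyStream {
  private entries: string[] = [];

  // Assumption for this sketch: the version is the number of events stored so far.
  currentVersion(): number {
    return this.entries.length;
  }

  // Appends only if the caller's expected version matches the current one.
  append(newEntries: string[], expectedVersion?: number): number {
    if (expectedVersion !== undefined && expectedVersion !== this.currentVersion()) {
      throw new Error(
        'version conflict: expected ' + expectedVersion + ', actual ' + this.currentVersion()
      );
    }
    this.entries.push(...newEntries);
    return this.currentVersion();
  }
}

const toy = new ToyStream();
const versionSeenByA = toy.currentVersion(); // writer A reads the stream at version 0
const versionSeenByB = toy.currentVersion(); // writer B reads the same version

toy.append(['user.created'], versionSeenByA); // A wins; the stream is now at version 1

let conflictDetected = false;
try {
  toy.append(['user.renamed'], versionSeenByB); // B's expectation (0) is stale
} catch {
  conflictDetected = true; // B must re-read the stream and decide again
}
```

The second writer is rejected rather than silently overwriting the first, which is the behaviour the option is meant to provide.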

Read events from a stream.

interface ReadStreamOptions {
  direction?: 'forward' | 'backward';
  fromStreamPosition?: number;
  toStreamPosition?: number;
  limit?: number; // Max 1000
}
const events = await eventStore.readStream<UserEvent>('user-123', {
  direction: 'forward',
  limit: 100
});
// Returns array of ReadEvent<T>
interface ReadEvent<T> {
  streamId: string;
  streamPosition: number;
  globalPosition: number;
  eventId: string;
  type: string;
  data: T;
  metadata?: any;
  schemaVersion: string;
  transactionId: string;
  createdAt: string;
}

aggregateStream<State, Event>(streamId, options)

Build state by applying events to an aggregate.

interface AggregateOptions<State, Event> {
  initialState: () => State;
  evolve: (state: State, event: ReadEvent<Event>) => State;
  fromStreamPosition?: number;
  toStreamPosition?: number;
}
const result = await eventStore.aggregateStream<User, UserEvent>('user-123', {
  initialState: () => ({ id: '', email: '', name: '', active: false }),
  evolve: (user, event) => {
    switch (event.type) {
      case 'user.created':
        return { ...user, ...event.data, active: true };
      case 'user.deactivated':
        return { ...user, active: false };
      default:
        return user;
    }
  }
});
// Result type
interface AggregateResult<State> {
  state: State;
  version: number;
  lastEventPosition: number;
}
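
To make the `initialState` / `evolve` contract concrete, the sketch below folds a small in-memory list of events into state without touching the network. The names `AccountEvent`, `AccountState`, `evolveAccount`, and `foldEvents` are hypothetical and exist only for this illustration; the events here are plain objects rather than the `ReadEvent` wrapper the SDK passes to `evolve`.

```typescript
// Hypothetical event and state shapes, used only for this illustration.
type AccountEvent =
  | { type: 'account.opened'; data: { owner: string } }
  | { type: 'account.deposited'; data: { amount: number } }
  | { type: 'account.closed'; data: { reason: string } };

interface AccountState {
  owner: string;
  balance: number;
  open: boolean;
}

const initialAccount = (): AccountState => ({ owner: '', balance: 0, open: false });

// Pure reducer: given the current state and one event, return the next state.
function evolveAccount(state: AccountState, ev: AccountEvent): AccountState {
  switch (ev.type) {
    case 'account.opened':
      return { ...state, owner: ev.data.owner, open: true };
    case 'account.deposited':
      return { ...state, balance: state.balance + ev.data.amount };
    case 'account.closed':
      return { ...state, open: false };
    default:
      return state;
  }
}

// Local stand-in for the fold: start from the initial state and apply events in order.
function foldEvents<S, E>(initial: () => S, evolve: (s: S, e: E) => S, evts: E[]): S {
  return evts.reduce(evolve, initial());
}

const accountHistory: AccountEvent[] = [
  { type: 'account.opened', data: { owner: 'ada' } },
  { type: 'account.deposited', data: { amount: 40 } },
  { type: 'account.deposited', data: { amount: 2 } },
];
const account = foldEvents(initialAccount, evolveAccount, accountHistory);
// account is { owner: 'ada', balance: 42, open: true }
```

Keeping `evolve` pure, with no I/O and no mutation of its inputs, means the same reducer can be unit-tested this way and then handed to `aggregateStream` unchanged apart from the event wrapper.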

Query events across all streams with filtering.

interface QueryEventsOptions {
  types?: string[]; // Filter by event types
  streams?: string[]; // Filter by stream IDs
  after?: string; // ISO timestamp
  before?: string; // ISO timestamp
  fromGlobalPosition?: number;
  limit?: number;
  offset?: number;
  direction?: 'forward' | 'backward';
}
const events = await eventStore.queryEvents<UserEvent>({
  types: ['user.created', 'user.updated'],
  after: '2024-01-01T00:00:00Z',
  limit: 50
});
// Returns QueryResult<ReadEvent<T>>
interface QueryResult<T> {
  data: T[];
  total?: number;
  hasMore: boolean;
  nextOffset?: number;
}
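
The `hasMore` and `nextOffset` fields suggest offset-based paging. The helper below is a sketch of draining every page. `collectAllPages`, `PageFetcher`, and `fakeFetch` are hypothetical names, not SDK exports, and the code assumes that `nextOffset` is the value to pass as `offset` on the following call.

```typescript
// Same shape as the QueryResult<T> interface documented above.
interface PagedResult<T> {
  data: T[];
  total?: number;
  hasMore: boolean;
  nextOffset?: number;
}

// Hypothetical signature: fetch one page starting at the given offset.
type PageFetcher<T> = (offset: number) => Promise<PagedResult<T>>;

// Keep requesting pages until the source reports there is nothing more.
async function collectAllPages<T>(fetchPage: PageFetcher<T>): Promise<T[]> {
  const items: T[] = [];
  let offset = 0;
  while (true) {
    const page = await fetchPage(offset);
    items.push(...page.data);
    // Stop when there are no more pages, or when no next offset is supplied
    // (guards against looping forever on an unexpected response).
    if (!page.hasMore || page.nextOffset === undefined) {
      return items;
    }
    offset = page.nextOffset;
  }
}

// In-memory stand-in for a real query, serving 5 items in pages of 2.
const allItems = ['a', 'b', 'c', 'd', 'e'];
const fakeFetch: PageFetcher<string> = async (offset) => {
  const data = allItems.slice(offset, offset + 2);
  const next = offset + data.length;
  const hasMore = next < allItems.length;
  return { data, hasMore, nextOffset: hasMore ? next : undefined };
};
```

With the SDK, the fetcher would presumably wrap a call such as `eventStore.queryEvents({ types, limit, offset })`. For large result sets, processing each page as it arrives is kinder to memory than collecting everything first.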

Query stream information.

interface QueryStreamsOptions {
  streamType?: string; // Filter by stream type prefix
  hasEvents?: boolean; // Only streams with events
  limit?: number;
  offset?: number;
}
const streams = await eventStore.queryStreams({
  streamType: 'user',
  limit: 100
});
// Returns QueryResult<StreamInfo>
interface StreamInfo {
  streamId: string;
  streamType: string;
  eventCount: number;
  firstEventPosition: number;
  lastEventPosition: number;
  createdAt: string;
  updatedAt: string;
}

Get a simple list of stream IDs.

const streamIds = await eventStore.listStreams({
  limit: 1000
});
// Returns string[]

Client for managing event subscriptions.

Create a subscription with full control over the event filter, subscriber type, subscriber configuration, and retry policy.

interface SubscribeOptions {
  eventFilter: string; // Pattern like 'user.*', '*.created', etc.
  subscriberType: SubscriberType;
  subscriberConfig: SubscriberConfig;
  retryPolicy?: RetryPolicy;
  position?: 'latest' | 'earliest';
}
enum SubscriberType {
  Webhook = 'webhook',
  WebSocket = 'websocket'
}
interface WebhookConfig {
  url: string;
  headers?: Record<string, string>;
  timeout?: number;
}
interface RetryPolicy {
  maxRetries?: number; // Default: 3
  initialBackoffMs?: number; // Default: 1000
  backoffMultiplier?: number; // Default: 2
}
const subscription = await eventBus.subscribe({
  eventFilter: 'user.*',
  subscriberType: SubscriberType.Webhook,
  subscriberConfig: {
    url: 'https://myapp.com/webhooks/deltabase',
    headers: { 'Authorization': 'Bearer secret' }
  },
  retryPolicy: {
    maxRetries: 5,
    backoffMultiplier: 2
  }
});
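
As a rough guide to what a given `retryPolicy` implies, the sketch below lists the wait before each retry. It assumes plain exponential backoff, that is `initialBackoffMs * backoffMultiplier^attempt`, using the documented defaults. Whether the service adds jitter or caps the delay is not stated here, so treat the numbers as an estimate. `backoffSchedule` and `RetryPolicyShape` are hypothetical names, not part of the SDK.

```typescript
// Mirrors the RetryPolicy interface documented above.
interface RetryPolicyShape {
  maxRetries?: number;
  initialBackoffMs?: number;
  backoffMultiplier?: number;
}

// Returns the assumed delay (in ms) before retry 1, retry 2, ... retry maxRetries.
function backoffSchedule(policy: RetryPolicyShape = {}): number[] {
  const maxRetries = policy.maxRetries ?? 3;
  const initialBackoffMs = policy.initialBackoffMs ?? 1000;
  const backoffMultiplier = policy.backoffMultiplier ?? 2;
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(initialBackoffMs * Math.pow(backoffMultiplier, attempt));
  }
  return delays;
}

const defaultDelays = backoffSchedule();                  // [1000, 2000, 4000]
const customDelays = backoffSchedule({ maxRetries: 5 }); // [1000, 2000, 4000, 8000, 16000]
```

Under these assumptions, the policy in the example above (`maxRetries: 5`, multiplier 2) would keep retrying for about 31 seconds in total before giving up.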

subscribeWebhook(eventFilter, url, options?)

Convenience method for creating webhook subscriptions.

interface WebhookSubscribeOptions {
  headers?: Record<string, string>;
  retryPolicy?: RetryPolicy;
  position?: 'latest' | 'earliest';
}
const subscription = await eventBus.subscribeWebhook(
  'user.*',
  'https://myapp.com/webhooks/deltabase',
  {
    headers: { 'X-API-Key': 'secret' },
    retryPolicy: { maxRetries: 5 }
  }
);

Get subscription details.

const subscription = await eventBus.getSubscription('sub_abc123');
interface Subscription {
  id: string;
  eventFilter: string;
  subscriberType: string;
  subscriberConfig: any;
  status: 'active' | 'paused' | 'failed';
  retryPolicy?: RetryPolicy;
  position: string;
  createdAt: string;
  lastDeliveryAttempt?: string;
  failedDeliveries: number;
  totalDeliveries: number;
}

List all subscriptions.

interface ListSubscriptionsOptions {
  status?: 'active' | 'paused' | 'failed';
  limit?: number;
  offset?: number;
}
const result = await eventBus.listSubscriptions({
  status: 'active',
  limit: 50
});
// Returns QueryResult<Subscription>

Delete a subscription.

await eventBus.unsubscribe('sub_abc123');

Administrative operations for event stores.

Create a new event store.

interface CreateEventStoreOptions {
  name: string;
  description?: string;
  region?: string;
  settings?: EventStoreSettings;
}
interface EventStoreSettings {
  retentionPeriodDays?: number; // Default: 365
  maxStreamSizeBytes?: number; // Default: 100MB
}
const eventStore = await management.createEventStore({
  name: 'my-new-store',
  description: 'Event store for my application',
  settings: {
    retentionPeriodDays: 730, // 2 years
    maxStreamSizeBytes: 209715200 // 200MB
  }
});
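
The byte count in the example is a binary megabyte figure: 209715200 is 200 × 1024 × 1024. Small helpers like the ones below can keep such settings readable. `mibToBytes` and `yearsToDays` are hypothetical local functions, not SDK utilities, and `yearsToDays` ignores leap days to match the "730 = 2 years" convention used above.

```typescript
const BYTES_PER_MIB = 1024 * 1024;

// Convert a size in binary megabytes (MiB) to bytes.
function mibToBytes(mib: number): number {
  return mib * BYTES_PER_MIB;
}

// Convert whole years to days, ignoring leap days.
function yearsToDays(years: number): number {
  return years * 365;
}

const storeSettings = {
  retentionPeriodDays: yearsToDays(2), // 730
  maxStreamSizeBytes: mibToBytes(200), // 209715200
};
```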

List accessible event stores.

interface ListEventStoresOptions {
  filter?: string; // Filter by name
  limit?: number;
  offset?: number;
}
const stores = await management.listEventStores({
  filter: 'my-app',
  limit: 20
});
// Returns QueryResult<EventStore>

Get detailed event store information.

const store = await management.getEventStore('es_abc123');
interface EventStore {
  id: string;
  name: string;
  description?: string;
  region?: string;
  status: 'active' | 'suspended' | 'deleted';
  createdAt: string;
  updatedAt: string;
  statistics?: {
    totalStreams: number;
    totalEvents: number;
    databaseSizeBytes: number;
    oldestEventTimestamp?: string;
    newestEventTimestamp?: string;
  };
  settings?: EventStoreSettings;
}

Update event store settings.

await management.updateEventStore('es_abc123', {
  retentionPeriodDays: 1095 // 3 years
});

Delete an event store (irreversible).

await management.deleteEventStore('es_abc123');

All methods can throw DeltaBaseError with structured information:

import { DeltaBaseError } from '@delta-base/server';
try {
  await eventStore.appendToStream('stream-1', events, {
    expectedStreamVersion: 5
  });
} catch (error) {
  if (error instanceof DeltaBaseError) {
    console.log('Status:', error.status); // HTTP status code
    console.log('Code:', error.code); // Error code
    console.log('Message:', error.message); // Human-readable message
    console.log('Details:', error.details); // Additional context
    if (error.status === 409) {
      // Handle version conflict
      console.log('Stream was modified by someone else');
    }
  }
}

Common Error Codes:

  • UNAUTHORIZED (401): Invalid API key
  • FORBIDDEN (403): Insufficient permissions
  • NOT_FOUND (404): Resource doesn’t exist
  • VERSION_CONFLICT (409): Stream version mismatch
  • RATE_LIMITED (429): Rate limit exceeded
  • INTERNAL_ERROR (500): Server error
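
One way to act on these codes is to decide up front which failures are worth retrying. The sketch below is one possible policy, not a recommendation from the SDK. `ErrorLike`, `RecoveryAction`, and `recoveryFor` are hypothetical names, and `ErrorLike` is a structural stand-in for the `status` and `code` fields of `DeltaBaseError`, so the snippet runs without the package installed.

```typescript
// Structural stand-in for the fields of DeltaBaseError used here.
interface ErrorLike {
  status: number;
  code: string;
}

type RecoveryAction = 'reload-and-retry' | 'retry-after-delay' | 'give-up';

// One possible policy: which failures are worth retrying, and how.
function recoveryFor(err: ErrorLike): RecoveryAction {
  switch (err.status) {
    case 409: // VERSION_CONFLICT: re-read the stream, rebuild the change, append again
      return 'reload-and-retry';
    case 429: // RATE_LIMITED
    case 500: // INTERNAL_ERROR: may be transient
      return 'retry-after-delay';
    default: // 401, 403, 404 and anything else: repeating the same call will not help
      return 'give-up';
  }
}

const onConflict = recoveryFor({ status: 409, code: 'VERSION_CONFLICT' });
const onRateLimit = recoveryFor({ status: 429, code: 'RATE_LIMITED' });
const onForbidden = recoveryFor({ status: 403, code: 'FORBIDDEN' });
```

A version conflict differs from the other retryable cases: waiting and resending the same request would fail again, because the expected version is still stale. The caller has to reload the stream first.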

// Input event for appending
interface Event<Type = string, Data = any, Metadata = any> {
  type: Type;
  data: Data;
  metadata?: Metadata;
}
// Output event from reading
interface ReadEvent<Data = any> {
  streamId: string;
  streamPosition: number;
  globalPosition: number;
  eventId: string;
  type: string;
  data: Data;
  metadata?: any;
  schemaVersion: string;
  transactionId: string;
  createdAt: string;
}
// Strongly typed events
interface UserCreated {
  type: 'user.created';
  data: {
    id: string;
    email: string;
    name: string;
  };
}
interface UserUpdated {
  type: 'user.updated';
  data: {
    id: string;
    email?: string;
    name?: string;
  };
}
type UserEvent = UserCreated | UserUpdated;
// Usage
await eventStore.appendToStream<UserEvent>('user-123', [{
  type: 'user.created',
  data: { id: 'user-123', email: 'john@example.com', name: 'John' }
}]);
const events = await eventStore.readStream<UserEvent>('user-123');
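
Modelling events as a union discriminated by `type` lets the compiler check that every event kind is handled. The sketch below shows the pattern with the same event shapes, redeclared under different names (`UserCreatedEvt`, `UserUpdatedEvt`, `UserEvt`) so that it stands alone. `assertNever` and `describeUserEvent` are hypothetical helpers, not SDK exports.

```typescript
// Same event shapes as above, repeated so the snippet stands alone.
interface UserCreatedEvt {
  type: 'user.created';
  data: { id: string; email: string; name: string };
}
interface UserUpdatedEvt {
  type: 'user.updated';
  data: { id: string; email?: string; name?: string };
}
type UserEvt = UserCreatedEvt | UserUpdatedEvt;

// Compile-time guard: if a new event type is added to the union and not handled,
// the call below stops type-checking.
function assertNever(value: never): never {
  throw new Error('Unhandled event: ' + JSON.stringify(value));
}

function describeUserEvent(ev: UserEvt): string {
  switch (ev.type) {
    case 'user.created':
      // ev.data is narrowed to the created payload, so email is a required string.
      return 'created ' + ev.data.email;
    case 'user.updated':
      // name is optional on updates, so fall back to the id.
      return 'updated ' + (ev.data.name ?? ev.data.id);
    default:
      return assertNever(ev);
  }
}

const summary = describeUserEvent({
  type: 'user.created',
  data: { id: 'user-123', email: 'john@example.com', name: 'John' },
});
// summary === 'created john@example.com'
```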

This SDK provides a complete, type-safe interface for building event-sourced applications with DeltaBase.