Skip to content

ChainGraph API Documentation / @badaitech/chaingraph-channel / dbos / DBOSChannel

Class: DBOSChannel<T>

Defined in: dbos/DBOSChannel.d.ts:83

DBOS-backed distributed channel implementation.

REVISED: Based on actual DBOS SDK patterns from chaingraph-executor.

Features:

  • ✅ Distributed across multiple servers via PostgreSQL
  • ✅ Persistent (survives crashes and restarts)
  • ✅ Real-time via PostgreSQL LISTEN/NOTIFY
  • ✅ Two read modes: 'history' (with optional lastN) and 'realtime'
  • ✅ Automatic deduplication between history and real-time
  • ✅ Cross-execution communication
  • ✅ Yields batches (T[]) for performance (1000x faster than item-by-item)

Architecture:

Server 1 → DBOSChannel.send() → DBOS.writeStream() → PostgreSQL

                                  LISTEN/NOTIFY

Server 2 ← DBOSChannel[Symbol.asyncIterator] ← PostgreSQL SELECT

Best For:

  • Multi-server distributed execution
  • Production deployments requiring HA
  • Cross-execution communication
  • Persistent streaming pipelines

Requirements:

  • Node.js environment (not browser-compatible)
  • DBOS SDK initialized
  • PostgreSQL database connection

Examples

typescript
import { DBOSChannel } from '@badaitech/chaingraph-channel/dbos'

// Server 1: Producer
const channel = new DBOSChannel<Message>('workflow-123', 'messages', {
  pgConnectionString: process.env.DATABASE_URL!,
  queryPool: pool
})
await channel.send({ type: 'data', value: 42 })

// Server 2: Consumer (different server!)
const channel = new DBOSChannel<Message>('workflow-123', 'messages', {
  pgConnectionString: process.env.DATABASE_URL!,
  queryPool: pool
})
for await (const batch of channel) {
  for (const msg of batch) {
    console.log('Received from Server 1:', msg)
  }
}
typescript
const channel = new DBOSChannel('exec-123', 'events', {
  pgConnectionString: process.env.DATABASE_URL!,
  queryPool: pool,
  readMode: 'history',
  lastN: 50,           // ← Read last 50 only
  deduplication: true, // ← Prevent overlap
})

for await (const batch of channel) {
  // First batches: Last 50 from history
  // Then: Real-time updates as they arrive
  console.log(batch)
}

Type Parameters

T

T

The type of values transmitted through the channel

Implements

Constructors

Constructor

new DBOSChannel<T>(workflowId, streamKey, config): DBOSChannel<T>

Defined in: dbos/DBOSChannel.d.ts:100

Parameters

workflowId

string

streamKey

string

config

Partial<Omit<DBOSChannelConfig, "workflowId" | "streamKey">> & object

Returns

DBOSChannel<T>

Methods

[asyncIterator]()

[asyncIterator](): AsyncIterableIterator<T[]>

Defined in: dbos/DBOSChannel.d.ts:133

Async iteration over DBOS stream (YIELDS BATCHES).

SIMPLIFIED: Only 2 read modes:

  • 'history': Read history (all or last N), then real-time
  • 'realtime': Skip history, start from now

Returns

AsyncIterableIterator<T[]>

Implementation of

IChannel.[asyncIterator]


clone()

clone(): IChannel<T>

Defined in: dbos/DBOSChannel.d.ts:162

Creates a deep copy of the channel with independent state.

The clone will:

  • Have the same configuration
  • Contain copied buffer values (not shared references)
  • Be completely independent from the original

Used for:

  • Node cloning in flow editor
  • Creating execution instances from flow templates

Note: Subscribers and error state are not cloned

Returns

IChannel<T>

New independent channel instance

Example

typescript
const original = new InMemoryChannel<string>()
await original.send('data')

const clone = original.clone()
// Clone has independent state
// Modifying clone doesn't affect original

Implementation of

IChannel.clone


close()

close(): Promise<void>

Defined in: dbos/DBOSChannel.d.ts:119

Close the channel (local only).

Note: DBOS stream lives in database, other servers may still use it.

Returns

Promise<void>

Implementation of

IChannel.close


getBuffer()

getBuffer(): readonly T[]

Defined in: dbos/DBOSChannel.d.ts:170

Get buffer for inspection (DBOSChannel has no in-memory buffer).

Returns empty array since data is in PostgreSQL, not memory.

Returns

readonly T[]

Implementation of

IChannel.getBuffer


getError()

getError(): Error | null

Defined in: dbos/DBOSChannel.d.ts:159

Retrieves any error that was set on the channel.

Returns

Error | null

The error if one was set, otherwise null

Example

typescript
for await (const value of channel) {
  // Process values
}

const error = channel.getError()
if (error) {
  console.error('Stream had error:', error.message)
}

Implementation of

IChannel.getError


getStats()

getStats(): ChannelStats

Defined in: dbos/DBOSChannel.d.ts:158

Returns channel statistics for monitoring and debugging.

Stats include:

  • Buffer size (in-memory implementations)
  • Subscriber count
  • Closed state
  • Implementation type
  • Implementation-specific metrics

Returns

ChannelStats

Channel statistics object

Examples

typescript
const stats = channel.getStats()
console.log(`Buffer: ${stats.bufferSize}, Subscribers: ${stats.subscriberCount}`)
typescript
const stats = channel.getStats()
if (stats.bufferSize > 1000) {
  await channel.send(value)  // Wait
} else {
  channel.send(value)  // Continue
}

Implementation of

IChannel.getStats


isChannelClosed()

isChannelClosed(): boolean

Defined in: dbos/DBOSChannel.d.ts:157

Checks whether the channel has been closed.

Returns

boolean

true if channel is closed, false otherwise

Example

typescript
if (!channel.isChannelClosed()) {
  await channel.send(value)
}

Implementation of

IChannel.isChannelClosed


send()

send(value): Promise<void>

Defined in: dbos/DBOSChannel.d.ts:109

Send value to DBOS stream (writes to PostgreSQL).

Pattern from ExecutionWorkflow.ts:196

Parameters

value

T

Returns

Promise<void>

Implementation of

IChannel.send


sendBatch()

sendBatch(values): Promise<void>

Defined in: dbos/DBOSChannel.d.ts:113

Send batch to DBOS stream.

Parameters

values

T[]

Returns

Promise<void>

Implementation of

IChannel.sendBatch


serialize()

serialize(): unknown

Defined in: dbos/DBOSChannel.d.ts:161

Serializes the channel state to a JSON-compatible object.

InMemoryChannel: Includes buffer and closed state DBOSChannel: Includes workflow ID, stream key, position

Used for:

  • Node cloning in flow graphs
  • Persisting execution state
  • Debugging and inspection

Note: Active subscribers are not serialized

Returns

unknown

JSON-compatible representation of channel state

Example

typescript
const state = channel.serialize()
await db.saveChannelState(nodeId, portId, state)

Implementation of

IChannel.serialize


setError()

setError(error): void

Defined in: dbos/DBOSChannel.d.ts:160

Sets an error state on the channel.

This will:

  • Store the error for later retrieval
  • Close the channel (no more sends allowed)
  • Notify all subscribers of closure

Parameters

error

Error

The error that occurred

Returns

void

Example

typescript
try {
  for await (const chunk of externalStream) {
    await channel.send(chunk)
  }
} catch (error) {
  channel.setError(error)
  // Channel automatically closed, subscribers notified
}

Implementation of

IChannel.setError


setLogger()

setLogger(logger): void

Defined in: dbos/DBOSChannel.d.ts:164

Attaches an external logger for persistence and observability.

Logger receives callbacks on:

  • onSend: Every value sent (for full traceability)
  • onPrune: When buffer items are cleaned up (memory management)

Enables:

  • Database persistence for replay capability
  • Metrics and monitoring
  • Audit trails
  • Debugging

Parameters

logger

ChannelLogger<T>

Logger implementation

Returns

void

Examples

typescript
const logger: ChannelLogger<Message> = {
  async onSend(value, position) {
    await db.insert(stream_logs).values({
      executionId,
      nodeId,
      portId,
      sequenceNumber: position,
      value: JSON.stringify(value),
      timestamp: new Date()
    })
  },
  async onPrune(items, reason) {
    console.log(`Pruned ${items.length} items (${reason})`)
  }
}

channel.setLogger(logger)

// All sends now logged to database
await channel.send({ data: 'important' })
typescript
channel.setLogger({
  onSend(value, position) {
    metrics.increment('channel.sends', 1)
    metrics.gauge('channel.position', position)
  },
  onPrune(items, reason) {
    metrics.increment('channel.prunes', items.length, { reason })
  }
})

Implementation of

IChannel.setLogger


subscribe()

subscribe(handler, onError?, onComplete?): () => void

Defined in: dbos/DBOSChannel.d.ts:125

Subscribe with callback (not implemented yet for DBOS).

TODO: Implement using background iterator

Parameters

handler

(value) => void | Promise<void>

onError?

(error) => void

onComplete?

() => void | Promise<void>

Returns

(): void

Returns

void

Implementation of

IChannel.subscribe


deserialize()

static deserialize<T>(data): DBOSChannel<T>

Defined in: dbos/DBOSChannel.d.ts:163

Type Parameters

T

T

Parameters

data

unknown

Returns

DBOSChannel<T>

Licensed under BUSL-1.1