Class: DBOSChannel<T>
Defined in: dbos/DBOSChannel.d.ts:83
DBOS-backed distributed channel implementation.
REVISED: Based on actual DBOS SDK patterns from chaingraph-executor.
Features:
- ✅ Distributed across multiple servers via PostgreSQL
- ✅ Persistent (survives crashes and restarts)
- ✅ Real-time via PostgreSQL LISTEN/NOTIFY
- ✅ Two read modes: 'history' (with optional lastN) and 'realtime'
- ✅ Automatic deduplication between history and real-time
- ✅ Cross-execution communication
- ✅ Yields batches (T[]) for performance (1000x faster than item-by-item)
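The deduplication between history and real-time reads can be pictured as an offset check: anything at or below the last delivered offset is dropped. The sketch below is only an illustration, not the library's implementation. `StreamItem`, `offset` and `createDeduplicator` are hypothetical names, and it assumes every stored item carries a monotonically increasing offset and that batches arrive in offset order.

```typescript
// Hypothetical item shape: assumes every stored value has a monotonically
// increasing offset. The real DBOSChannel internals may differ.
interface StreamItem<T> {
  offset: number
  value: T
}

// Returns a filter that drops any item whose offset was already delivered.
// Assumes items inside and across batches arrive in ascending offset order.
function createDeduplicator<T>(): (batch: StreamItem<T>[]) => StreamItem<T>[] {
  let lastDelivered = -1
  return (batch) => {
    const fresh: StreamItem<T>[] = []
    for (const item of batch) {
      if (item.offset > lastDelivered) {
        fresh.push(item)
        lastDelivered = item.offset
      }
    }
    return fresh
  }
}

// The history read returns offsets 0..2. A notification that raced with the
// history query then redelivers offset 2 together with the new offset 3.
const dedupe = createDeduplicator<string>()
const historyBatch = dedupe([
  { offset: 0, value: 'a' },
  { offset: 1, value: 'b' },
  { offset: 2, value: 'c' },
])
const realtimeBatch = dedupe([
  { offset: 2, value: 'c' },
  { offset: 3, value: 'd' },
])
console.log(historyBatch.map((item) => item.value)) // logs [ 'a', 'b', 'c' ]
console.log(realtimeBatch.map((item) => item.value)) // logs [ 'd' ]
```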
Architecture:
Server 1 → DBOSChannel.send() → DBOS.writeStream() → PostgreSQL
                                                          ↓
                                                    LISTEN/NOTIFY
                                                          ↓
Server 2 ← DBOSChannel[Symbol.asyncIterator] ← PostgreSQL SELECT
Best For:
- Multi-server distributed execution
- Production deployments requiring HA
- Cross-execution communication
- Persistent streaming pipelines
Requirements:
- Node.js environment (not browser-compatible)
- DBOS SDK initialized
- PostgreSQL database connection
Examples
import { DBOSChannel } from '@badaitech/chaingraph-channel/dbos'

// Server 1: Producer
const channel = new DBOSChannel<Message>('workflow-123', 'messages', {
  pgConnectionString: process.env.DATABASE_URL!,
  queryPool: pool
})
await channel.send({ type: 'data', value: 42 })

// Server 2: Consumer (different server!)
const channel = new DBOSChannel<Message>('workflow-123', 'messages', {
  pgConnectionString: process.env.DATABASE_URL!,
  queryPool: pool
})
for await (const batch of channel) {
  for (const msg of batch) {
    console.log('Received from Server 1:', msg)
  }
}

const channel = new DBOSChannel('exec-123', 'events', {
  pgConnectionString: process.env.DATABASE_URL!,
  queryPool: pool,
  readMode: 'history',
  lastN: 50, // ← Read last 50 only
  deduplication: true, // ← Prevent overlap
})
for await (const batch of channel) {
  // First batches: Last 50 from history
  // Then: Real-time updates as they arrive
  console.log(batch)
}
Type Parameters
T
T
The type of values transmitted through the channel
Implements
IChannel<T>
Constructors
Constructor
new DBOSChannel<T>(workflowId, streamKey, config): DBOSChannel<T>
Defined in: dbos/DBOSChannel.d.ts:100
Parameters
workflowId
string
streamKey
string
config
Partial<Omit<DBOSChannelConfig, "workflowId" | "streamKey">> & object
Returns
DBOSChannel<T>
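The `config` type reads as: every field of `DBOSChannelConfig` except `workflowId` and `streamKey` becomes optional, because those two are passed positionally. The sketch below illustrates that pattern with a hypothetical `ExampleConfig` whose fields are taken from the examples on this page. The real `DBOSChannelConfig` may contain other fields, the defaults shown are assumptions, and the trailing `& object` part is not expanded in this reference, so it is left out.

```typescript
// Hypothetical stand-in for DBOSChannelConfig. Field names are taken from the
// examples on this page; the real interface may differ.
interface ExampleConfig {
  workflowId: string
  streamKey: string
  pgConnectionString: string
  readMode: 'history' | 'realtime'
  lastN?: number
  deduplication: boolean
}

// Same shape as the documented constructor parameter (minus `& object`).
type ExampleOptions = Partial<Omit<ExampleConfig, 'workflowId' | 'streamKey'>>

// Assumed defaults, for illustration only.
const DEFAULTS: Omit<ExampleConfig, 'workflowId' | 'streamKey'> = {
  pgConnectionString: '',
  readMode: 'history',
  deduplication: true,
}

// Hypothetical helper showing how positional arguments and options combine.
function resolveConfig(
  workflowId: string,
  streamKey: string,
  options: ExampleOptions = {},
): ExampleConfig {
  // Positional arguments always win over anything in `options`.
  return { ...DEFAULTS, ...options, workflowId, streamKey }
}

const resolved = resolveConfig('workflow-123', 'messages', { readMode: 'realtime' })
console.log(resolved.readMode, resolved.deduplication) // logs: realtime true
```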
Methods
[asyncIterator]()
[asyncIterator](): AsyncIterableIterator<T[]>
Defined in: dbos/DBOSChannel.d.ts:133
Asynchronously iterates over the DBOS stream. Each iteration yields a batch (T[]), not a single item.
Two read modes are supported:
- 'history': Read history (all or last N), then real-time
- 'realtime': Skip history, start from now
Returns
AsyncIterableIterator<T[]>
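The difference between the two read modes comes down to which already-stored items are replayed before live updates begin. The following is a minimal sketch of that selection only. `initialReplay` is a hypothetical helper, `stored` stands in for rows already in PostgreSQL, and the treatment of `lastN <= 0` as "replay nothing" is an assumption.

```typescript
type ReadMode = 'history' | 'realtime'

// Picks which already-stored items are replayed before live updates start.
// Hypothetical helper: `stored` stands in for rows already in PostgreSQL.
function initialReplay<T>(stored: readonly T[], mode: ReadMode, lastN?: number): T[] {
  if (mode === 'realtime') return [] // skip history entirely
  if (lastN === undefined) return [...stored] // full history
  // Guard against slice(-0), which would return the whole array.
  return lastN > 0 ? stored.slice(-lastN) : []
}

const stored = [1, 2, 3, 4, 5]
console.log(initialReplay(stored, 'history')) // logs [ 1, 2, 3, 4, 5 ]
console.log(initialReplay(stored, 'history', 2)) // logs [ 4, 5 ]
console.log(initialReplay(stored, 'realtime')) // logs []
```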
Implementation of
IChannel.[asyncIterator]
clone()
clone(): IChannel<T>
Defined in: dbos/DBOSChannel.d.ts:162
Creates a deep copy of the channel with independent state.
The clone will:
- Have the same configuration
- Contain copied buffer values (not shared references)
- Be completely independent from the original
Used for:
- Node cloning in flow editor
- Creating execution instances from flow templates
Note: Subscribers and error state are not cloned
Returns
IChannel<T>
New independent channel instance
Example
const original = new InMemoryChannel<string>()
await original.send('data')
const clone = original.clone()
// Clone has independent state
// Modifying clone doesn't affect original
Implementation of
IChannel.clone
close()
close(): Promise<void>
Defined in: dbos/DBOSChannel.d.ts:119
Closes this channel instance only (local close).
Note: The DBOS stream lives in the database, so other servers may still be using it.
Returns
Promise<void>
Implementation of
IChannel.close
getBuffer()
getBuffer(): readonly T[]
Defined in: dbos/DBOSChannel.d.ts:170
Get buffer for inspection (DBOSChannel has no in-memory buffer).
Returns empty array since data is in PostgreSQL, not memory.
Returns
readonly T[]
Implementation of
IChannel.getBuffer
getError()
getError(): Error | null
Defined in: dbos/DBOSChannel.d.ts:159
Retrieves any error that was set on the channel.
Returns
Error | null
The error if one was set, otherwise null
Example
for await (const value of channel) {
  // Process values
}
const error = channel.getError()
if (error) {
  console.error('Stream had error:', error.message)
}
Implementation of
IChannel.getError
getStats()
getStats(): ChannelStats
Defined in: dbos/DBOSChannel.d.ts:158
Returns channel statistics for monitoring and debugging.
Stats include:
- Buffer size (in-memory implementations)
- Subscriber count
- Closed state
- Implementation type
- Implementation-specific metrics
Returns
ChannelStats
Channel statistics object
Examples
const stats = channel.getStats()
console.log(`Buffer: ${stats.bufferSize}, Subscribers: ${stats.subscriberCount}`)

const stats = channel.getStats()
if (stats.bufferSize > 1000) {
  await channel.send(value) // Wait
} else {
  channel.send(value) // Continue
}
Implementation of
IChannel.getStats
isChannelClosed()
isChannelClosed(): boolean
Defined in: dbos/DBOSChannel.d.ts:157
Checks whether the channel has been closed.
Returns
boolean
true if channel is closed, false otherwise
Example
if (!channel.isChannelClosed()) {
  await channel.send(value)
}
Implementation of
IChannel.isChannelClosed
send()
send(value): Promise<void>
Defined in: dbos/DBOSChannel.d.ts:109
Send value to DBOS stream (writes to PostgreSQL).
Pattern from ExecutionWorkflow.ts:196
Parameters
value
T
Returns
Promise<void>
Implementation of
IChannel.send
sendBatch()
sendBatch(values): Promise<void>
Defined in: dbos/DBOSChannel.d.ts:113
Send batch to DBOS stream.
Parameters
values
T[]
Returns
Promise<void>
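When a producer holds a large array, one way to use `sendBatch()` is to split the array into bounded chunks and send them one after another. The sketch below assumes nothing about DBOSChannel beyond the `sendBatch(values)` signature documented here. `BatchSink`, `chunk`, `sendInChunks` and the recording sink are hypothetical names introduced for illustration.

```typescript
// Minimal structural type: anything exposing sendBatch, such as a DBOSChannel.
interface BatchSink<T> {
  sendBatch(values: T[]): Promise<void>
}

// Splits `values` into consecutive chunks of at most `size` items.
function chunk<T>(values: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer')
  }
  const chunks: T[][] = []
  for (let i = 0; i < values.length; i += size) {
    chunks.push(values.slice(i, i + size))
  }
  return chunks
}

// Awaits each sendBatch call before issuing the next one, so the calls are
// made in the original order.
async function sendInChunks<T>(
  sink: BatchSink<T>,
  values: readonly T[],
  size: number,
): Promise<void> {
  for (const part of chunk(values, size)) {
    await sink.sendBatch(part)
  }
}

// In-memory stand-in for a channel, used only to show the call pattern.
const sent: number[][] = []
const recordingSink: BatchSink<number> = {
  async sendBatch(values) {
    sent.push(values)
  },
}
const sending = sendInChunks(recordingSink, [1, 2, 3, 4, 5], 2)
```

With a real channel, `recordingSink` would be replaced by the channel instance; a suitable chunk size depends on payload size and is not specified by this reference.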
Implementation of
IChannel.sendBatch
serialize()
serialize(): unknown
Defined in: dbos/DBOSChannel.d.ts:161
Serializes the channel state to a JSON-compatible object.
- InMemoryChannel: Includes buffer and closed state
- DBOSChannel: Includes workflow ID, stream key, position
Used for:
- Node cloning in flow graphs
- Persisting execution state
- Debugging and inspection
Note: Active subscribers are not serialized
Returns
unknown
JSON-compatible representation of channel state
Example
const state = channel.serialize()
await db.saveChannelState(nodeId, portId, state)
Implementation of
IChannel.serialize
setError()
setError(error): void
Defined in: dbos/DBOSChannel.d.ts:160
Sets an error state on the channel.
This will:
- Store the error for later retrieval
- Close the channel (no more sends allowed)
- Notify all subscribers of closure
Parameters
error
Error
The error that occurred
Returns
void
Example
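try {
  for await (const chunk of externalStream) {
    await channel.send(chunk)
  }
} catch (error) {
  // setError expects an Error, but a caught value is typed as unknown
  channel.setError(error instanceof Error ? error : new Error(String(error)))
  // Channel automatically closed, subscribers notified
}
Implementation of
IChannel.setError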
setLogger()
setLogger(logger): void
Defined in: dbos/DBOSChannel.d.ts:164
Attaches an external logger for persistence and observability.
Logger receives callbacks on:
- onSend: Every value sent (for full traceability)
- onPrune: When buffer items are cleaned up (memory management)
Enables:
- Database persistence for replay capability
- Metrics and monitoring
- Audit trails
- Debugging
Parameters
logger
Logger implementation
Returns
void
Examples
const logger: ChannelLogger<Message> = {
  async onSend(value, position) {
    await db.insert(stream_logs).values({
      executionId,
      nodeId,
      portId,
      sequenceNumber: position,
      value: JSON.stringify(value),
      timestamp: new Date()
    })
  },
  async onPrune(items, reason) {
    console.log(`Pruned ${items.length} items (${reason})`)
  }
}
channel.setLogger(logger)
// All sends now logged to database
await channel.send({ data: 'important' })

channel.setLogger({
  onSend(value, position) {
    metrics.increment('channel.sends', 1)
    metrics.gauge('channel.position', position)
  },
  onPrune(items, reason) {
    metrics.increment('channel.prunes', items.length, { reason })
  }
})
Implementation of
IChannel.setLogger
subscribe()
subscribe(handler, onError?, onComplete?): () => void
Defined in: dbos/DBOSChannel.d.ts:125
Subscribe with callback (not implemented yet for DBOS).
TODO: Implement using background iterator
Parameters
handler
(value) => void | Promise<void>
onError?
(error) => void
onComplete?
() => void | Promise<void>
Returns
(): void
Returns
void
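Since callback subscription is not implemented yet for DBOS, one possible workaround is to drive the async iterator in the background and forward each item to a handler, which is the approach the TODO above hints at. The sketch below is not part of the package. `subscribeViaIterator` is a hypothetical helper that works with any `AsyncIterable<T[]>`, and skipping `onComplete` after an unsubscribe is a design choice made here, not documented behavior.

```typescript
type Handler<T> = (value: T) => void | Promise<void>

// Hypothetical helper, not part of the package: consumes a batch-yielding
// async iterable in the background and returns an unsubscribe function.
function subscribeViaIterator<T>(
  source: AsyncIterable<T[]>,
  handler: Handler<T>,
  onError?: (error: Error) => void,
  onComplete?: () => void | Promise<void>,
): () => void {
  let active = true
  const iterator = source[Symbol.asyncIterator]()

  const run = async (): Promise<void> => {
    try {
      while (active) {
        const next = await iterator.next()
        if (next.done) break
        for (const value of next.value) {
          if (!active) break // stop mid-batch once unsubscribed
          await handler(value)
        }
      }
      // Only report completion if the source ended on its own.
      if (active) await onComplete?.()
    } catch (err) {
      if (active) onError?.(err instanceof Error ? err : new Error(String(err)))
    }
  }
  void run()

  return () => {
    active = false
    // Ask the source to release its resources; ignore cleanup failures.
    void iterator.return?.()?.catch(() => undefined)
  }
}

// Stand-in source that yields two batches and then ends.
async function* demoSource(): AsyncGenerator<number[]> {
  yield [1, 2]
  yield [3]
}

const received: number[] = []
const completed = new Promise<void>((resolve) => {
  subscribeViaIterator(
    demoSource(),
    (value) => {
      received.push(value)
    },
    undefined,
    () => resolve(),
  )
})
```

With a real channel, `demoSource()` would be replaced by the channel itself, because it implements `[Symbol.asyncIterator]` and yields `T[]` batches.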
Implementation of
IChannel.subscribe
deserialize()
static deserialize<T>(data): DBOSChannel<T>
Defined in: dbos/DBOSChannel.d.ts:163
Type Parameters
T
T
Parameters
data
unknown
Returns
DBOSChannel<T>
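Because `data` is typed as `unknown`, a caller that loads persisted state may want to validate its shape before handing it to `deserialize()`. The sketch below shows one way to do that with a type guard. The `SerializedChannelState` shape and its field names are assumptions derived from the `serialize()` description (workflow ID, stream key, position), not the documented format.

```typescript
// Hypothetical serialized shape. The field names are assumptions based on the
// serialize() description (workflow ID, stream key, position).
interface SerializedChannelState {
  workflowId: string
  streamKey: string
  position: number
}

// Type guard: narrows `unknown` to the assumed shape.
function isSerializedChannelState(data: unknown): data is SerializedChannelState {
  if (typeof data !== 'object' || data === null) return false
  const record = data as Record<string, unknown>
  const position = record['position']
  return (
    typeof record['workflowId'] === 'string' &&
    typeof record['streamKey'] === 'string' &&
    typeof position === 'number' &&
    Number.isInteger(position) &&
    position >= 0
  )
}

// Simulates state that was stored as JSON and loaded back.
const roundTripped: unknown = JSON.parse(
  JSON.stringify({ workflowId: 'exec-123', streamKey: 'events', position: 7 }),
)
console.log(isSerializedChannelState(roundTripped)) // logs true
console.log(isSerializedChannelState({ streamKey: 'events' })) // logs false
```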