ChainGraph API Documentation / @badaitech/chaingraph-channel / types / IChannel
Interface: IChannel<T>
Defined in: types/IChannel.d.ts:74
Universal channel interface for streaming data across different backends.
IChannel provides a unified abstraction for multi-subscriber async streaming that works with:
- InMemoryChannel: Fast in-memory buffering (browser + Node.js)
- DBOSChannel: PostgreSQL-backed distributed streaming (Node.js only)
- TRPCChannel: tRPC-based streaming over HTTP (future)
Design Philosophy:
- All send operations return Promise for consistency and composability
- Caller controls await strategy (fire-and-forget, immediate, batch, etc.)
- Multiple independent subscribers can consume at different rates
- Bounded buffers prevent memory leaks in long-running streams
- Serializable for persistence and node cloning
Key Features:
- Async iteration via
for await...of - Callback-based subscription via
subscribe() - External logging hooks for database persistence
- Error propagation to all subscribers
- Graceful shutdown with pending operation tracking
Examples
import type { IChannel } from '@badaitech/chaingraph-channel/types'
import { InMemoryChannel } from '@badaitech/chaingraph-channel/memory'
const channel: IChannel<string> = new InMemoryChannel()
await channel.send('hello')
await channel.send('world')
await channel.close()
for await (const value of channel) {
console.log(value)
}import { DBOSChannel } from '@badaitech/chaingraph-channel/dbos'
// Server 1: Producer
const channel = new DBOSChannel('workflow-123', 'messages')
await channel.send({ type: 'data', value: 42 })
// Server 2: Consumer (receives data from Server 1!)
const channel = new DBOSChannel('workflow-123', 'messages')
for await (const msg of channel) {
console.log('Received from Server 1:', msg)
}const pending: Promise<void>[] = []
// High-throughput sending (don't wait)
for (const item of largeDataset) {
const promise = channel.send(item)
pending.push(promise)
}
// Graceful shutdown (wait for all pending)
await Promise.all(pending)
await channel.close()Type Parameters
T
T
The type of values transmitted through the channel
Properties
[asyncIterator]()
[asyncIterator]: () =>
AsyncIterableIterator<T[]>
Defined in: types/IChannel.d.ts:300
Returns an async iterator for consuming values (pull-style).
IMPORTANT: Yields BATCHES of values (T[]), not individual items.
Rationale:
- Database queries return 1000 items at once (PostgreSQL)
- 1000x performance improvement vs item-by-item iteration
- Matches existing DBOS stream implementation patterns
Enables for await...of loops. Each iterator creates an independent subscriber that maintains its own position in the stream.
The iterator will:
- Yield batches of buffered values
- Wait for new values if channel not closed
- Complete when channel closes and all values consumed
Returns
AsyncIterableIterator<T[]>
AsyncIterableIterator yielding BATCHES of channel values
Examples
for await (const batch of channel) {
for (const value of batch) {
console.log('Received:', value)
}
}async function* flatten<T>(channel: IChannel<T>): AsyncIterableIterator<T> {
for await (const batch of channel) {
for (const item of batch) {
yield item
}
}
}
for await (const value of flatten(channel)) {
console.log('Received:', value) // Single item API
}for await (const batch of channel) {
for (const value of batch) {
if (value === 'stop') {
break // Subscriber automatically cleaned up
}
processValue(value)
}
}// Both iterate independently
const consumer1 = (async () => {
for await (const batch of channel) {
await slowProcess(batch)
}
})()
const consumer2 = (async () => {
for await (const batch of channel) {
await fastProcess(batch)
}
})()
await Promise.all([consumer1, consumer2])clone()
clone: () =>
IChannel<T>
Defined in: types/IChannel.d.ts:432
Creates a deep copy of the channel with independent state.
The clone will:
- Have the same configuration
- Contain copied buffer values (not shared references)
- Be completely independent from the original
Used for:
- Node cloning in flow editor
- Creating execution instances from flow templates
Note: Subscribers and error state are not cloned
Returns
IChannel<T>
New independent channel instance
Example
const original = new InMemoryChannel<string>()
await original.send('data')
const clone = original.clone()
// Clone has independent state
// Modifying clone doesn't affect originalclose()
close: () =>
Promise<void>
Defined in: types/IChannel.d.ts:183
Closes the channel, preventing any new values from being sent.
Once closed:
- All active subscribers will be notified
- Subscribers will receive all buffered values before completion
- Attempting to send will throw an error
- Multiple close() calls are safe (idempotent)
The promise resolves when:
- All pending sends complete
- All subscribers are notified
- Cleanup operations finish
Returns
Promise<void>
Promise that resolves when channel is fully closed
Example
// Send some data
channel.send(value1)
channel.send(value2)
// Wait for sends to complete
await channel.close()
console.log('All pending sends completed, channel closed')getBuffer()?
optionalgetBuffer: () => readonlyT[]
Defined in: types/IChannel.d.ts:507
Returns readonly reference to internal buffer (for inspection/testing).
Implementation-specific: Only available on InMemoryChannel. DBOSChannel returns empty array (database-backed, no in-memory buffer).
Used for:
- Testing and debugging
- Inspecting buffered values
- Understanding channel state
Returns
readonly T[]
Readonly array of buffered values
Example
const buffer = channel.getBuffer?.()
if (buffer) {
console.log(`Buffer has ${buffer.length} items`)
}getError()
getError: () =>
Error|null
Defined in: types/IChannel.d.ts:360
Retrieves any error that was set on the channel.
Returns
Error | null
The error if one was set, otherwise null
Example
for await (const value of channel) {
// Process values
}
const error = channel.getError()
if (error) {
console.error('Stream had error:', error.message)
}getStats()
getStats: () =>
ChannelStats
Defined in: types/IChannel.d.ts:342
Returns channel statistics for monitoring and debugging.
Stats include:
- Buffer size (in-memory implementations)
- Subscriber count
- Closed state
- Implementation type
- Implementation-specific metrics
Returns
Channel statistics object
Examples
const stats = channel.getStats()
console.log(`Buffer: ${stats.bufferSize}, Subscribers: ${stats.subscriberCount}`)const stats = channel.getStats()
if (stats.bufferSize > 1000) {
await channel.send(value) // Wait
} else {
channel.send(value) // Continue
}isChannelClosed()
isChannelClosed: () =>
boolean
Defined in: types/IChannel.d.ts:313
Checks whether the channel has been closed.
Returns
boolean
true if channel is closed, false otherwise
Example
if (!channel.isChannelClosed()) {
await channel.send(value)
}send()
send: (
value) =>Promise<void>
Defined in: types/IChannel.d.ts:130
Sends a single value to the channel.
Always returns Promise to enable:
- Fire-and-forget:
channel.send(value)without await - Immediate wait:
await channel.send(value) - Batch parallel:
await Promise.all(items.map(channel.send)) - Graceful shutdown: Track pending sends
- Backpressure: Conditionally await based on buffer state
- Async loggers: Database writes, network calls, etc.
The promise resolves when:
- Value is added to buffer (or database for DBOSChannel)
- All subscribers are notified
- Logger callbacks complete (if attached)
- Any cleanup operations finish
Parameters
value
T
The value to send through the channel
Returns
Promise<void>
Promise that resolves when send operation completes
Throws
If the channel has been closed
Examples
// Don't wait - sends happen in background
channel.send(chunk1)
channel.send(chunk2)
channel.send(chunk3)
// Later: ensure all completed
await channel.close()await channel.send(criticalData)
console.log('Critical data confirmed sent')const sends = items.map(item => channel.send(item))
await Promise.all(sends)
console.log('All items sent')const stats = channel.getStats()
if (stats.bufferSize > 1000) {
await channel.send(value) // Wait when buffer full
} else {
channel.send(value) // Continue when buffer has space
}sendBatch()
sendBatch: (
values) =>Promise<void>
Defined in: types/IChannel.d.ts:154
Sends multiple values to the channel at once.
More efficient than calling send() multiple times because:
- Single buffer operation (memory implementation)
- Single transaction (database implementations)
- Single subscriber notification batch
- Reduced overhead
Parameters
values
T[]
Array of values to send
Returns
Promise<void>
Promise that resolves when all values are sent
Throws
If the channel has been closed
Examples
await channel.sendBatch([1, 2, 3, 4, 5])await channel.sendBatch([]) // No-op, returns immediatelyserialize()
serialize: () =>
unknown
Defined in: types/IChannel.d.ts:405
Serializes the channel state to a JSON-compatible object.
InMemoryChannel: Includes buffer and closed state DBOSChannel: Includes workflow ID, stream key, position
Used for:
- Node cloning in flow graphs
- Persisting execution state
- Debugging and inspection
Note: Active subscribers are not serialized
Returns
unknown
JSON-compatible representation of channel state
Example
const state = channel.serialize()
await db.saveChannelState(nodeId, portId, state)setError()
setError: (
error) =>void
Defined in: types/IChannel.d.ts:383
Sets an error state on the channel.
This will:
- Store the error for later retrieval
- Close the channel (no more sends allowed)
- Notify all subscribers of closure
Parameters
error
Error
The error that occurred
Returns
void
Example
try {
for await (const chunk of externalStream) {
await channel.send(chunk)
}
} catch (error) {
channel.setError(error)
// Channel automatically closed, subscribers notified
}setLogger()?
optionalsetLogger: (logger) =>void
Defined in: types/IChannel.d.ts:485
Attaches an external logger for persistence and observability.
Logger receives callbacks on:
onSend: Every value sent (for full traceability)onPrune: When buffer items are cleaned up (memory management)
Enables:
- Database persistence for replay capability
- Metrics and monitoring
- Audit trails
- Debugging
Parameters
logger
Logger implementation
Returns
void
Examples
const logger: ChannelLogger<Message> = {
async onSend(value, position) {
await db.insert(stream_logs).values({
executionId,
nodeId,
portId,
sequenceNumber: position,
value: JSON.stringify(value),
timestamp: new Date()
})
},
async onPrune(items, reason) {
console.log(`Pruned ${items.length} items (${reason})`)
}
}
channel.setLogger(logger)
// All sends now logged to database
await channel.send({ data: 'important' })channel.setLogger({
onSend(value, position) {
metrics.increment('channel.sends', 1)
metrics.gauge('channel.position', position)
},
onPrune(items, reason) {
metrics.increment('channel.prunes', items.length, { reason })
}
})subscribe()
subscribe: (
handler,onError?,onComplete?) => () =>void
Defined in: types/IChannel.d.ts:225
Subscribe to channel with callback-based consumption (push-style).
Each subscription is independent - subscribers maintain their own position and consume at their own rate. Useful for event-driven architectures.
Parameters
handler
(value) => void | Promise<void>
Called for each value received (can be async)
onError?
(error) => void
Called if an error occurs (optional)
onComplete?
() => void | Promise<void>
Called when channel closes (optional)
Returns
Unsubscribe function to stop receiving values
():
void
Returns
void
Examples
const unsubscribe = channel.subscribe((value) => {
console.log('Received:', value)
})
// Later: stop receiving
unsubscribe()channel.subscribe(
(value) => processValue(value),
(error) => console.error('Stream error:', error),
() => console.log('Stream complete')
)// Subscriber 1: logs to console
channel.subscribe(value => console.log(value))
// Subscriber 2: sends to metrics
channel.subscribe(value => metrics.track(value))
// Both receive all values independently