Class: MultiChannel<T>
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:128
A type-safe, multi-subscriber asynchronous channel for streaming data.
MultiChannel implements a pub-sub pattern where multiple consumers can independently iterate over the same stream of values. Each subscriber maintains its own position in the buffer, allowing concurrent consumption at different rates.
Key Features:
- Async Iteration: Supports for await...of loops via Symbol.asyncIterator
- Multiple Subscribers: Each subscriber reads independently with its own position
- Buffering: All sent values are stored, allowing late subscribers to read from the beginning
- Backpressure-free: No blocking - values are buffered until consumed
- Serializable: Can be serialized to JSON for persistence and restored later
- Error Handling: Supports error propagation to consumers
Common Use Cases:
- Streaming LLM responses in real-time to frontend (see: LLMCallNode)
- Broadcasting execution events to multiple listeners (see: DBOSEventBus)
- Bridging DBOS PostgreSQL streams to application logic (see: StreamBridge)
- Port-to-port streaming data in flow execution (see: StreamPort)
Examples
const channel = new MultiChannel<string>()
// Producer sends values
channel.send('hello')
channel.send('world')
channel.close()
// Consumer iterates
for await (const value of channel) {
  console.log(value) // 'hello', 'world'
}

const channel = new MultiChannel<number>()
// Start two subscribers concurrently
const consumer1 = (async () => {
  for await (const value of channel) {
    console.log('Consumer 1:', value)
  }
})()
const consumer2 = (async () => {
  for await (const value of channel) {
    console.log('Consumer 2:', value)
  }
})()
// Producer sends values (both consumers receive them)
channel.send(1)
channel.send(2)
channel.close()
await Promise.all([consumer1, consumer2])

const channel = new MultiChannel<string>()
channel.sendBatch(['a', 'b', 'c'])
channel.close()
for await (const value of channel) {
  console.log(value) // 'a', 'b', 'c'
}

const channel = new MultiChannel<string>()
// A producer elsewhere is expected to send values and close the channel;
// the loop below only finishes once the channel is closed
try {
  for await (const value of channel) {
    // Process value
  }
  // Check for errors after iteration completes
  if (channel.getError()) {
    console.error('Stream error:', channel.getError())
  }
} catch (error) {
  console.error('Iteration error:', error)
}

const original = new MultiChannel<string>()
original.send('data')
original.close()
// Serialize to JSON
const json = original.serialize()
// Later, deserialize and restore
const restored = MultiChannel.deserialize(json)
console.log(restored.getBuffer()) // ['data']
console.log(restored.isChannelClosed()) // true

Type Parameters
T
The type of values transmitted through the channel
Constructors
Constructor
new MultiChannel<T>(): MultiChannel<T>
Returns
MultiChannel<T>
Methods
[asyncIterator]()
[asyncIterator](): AsyncIterableIterator<T>
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:304
Returns an async iterator for consuming values from the channel.
This enables the use of for await...of loops. Each call creates a new independent subscriber that starts reading from the beginning of the buffer. Multiple subscribers can iterate concurrently, each at their own pace.
The iterator will:
- Yield all values currently in the buffer
- Wait for new values if the channel is not yet closed
- Complete when the channel is closed and all values have been consumed
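The three behaviours listed above map onto a small loop. What follows is a minimal sketch, not the library's implementation: `CursorChannelSketch`, `collectAll`, and `cursorDemo` are hypothetical names, and the waiter list is only one assumed way to park idle subscribers. It illustrates why every iterator call replays from index 0 and why an early and a late subscriber end up with the same values.

```typescript
// Hypothetical stand-in, NOT the library's MultiChannel: it only illustrates
// a shared buffer with one read cursor per subscriber.
class CursorChannelSketch<T> {
  private readonly buffer: T[] = []
  private closed = false
  // Wake-up callbacks for subscribers parked while waiting for new data.
  private waiters: Array<() => void> = []

  send(value: T): void {
    if (this.closed) throw new Error('Cannot send to a closed channel')
    this.buffer.push(value)
    this.wakeAll()
  }

  close(): void {
    this.closed = true
    this.wakeAll()
  }

  private wakeAll(): void {
    const parked = this.waiters
    this.waiters = []
    for (const wake of parked) wake()
  }

  // Each call gets its own `position`, so subscribers never take values
  // from each other and a late subscriber still starts at index 0.
  async *[Symbol.asyncIterator](): AsyncGenerator<T, void, undefined> {
    let position = 0
    while (true) {
      if (position < this.buffer.length) {
        yield this.buffer[position++] as T // 1. drain what is buffered
      } else if (this.closed) {
        return // 3. closed and fully consumed
      } else {
        // 2. open but empty: park until send() or close() wakes us
        await new Promise<void>((resolve) => {
          this.waiters.push(() => resolve())
        })
      }
    }
  }
}

// Reads an async iterable to completion and returns everything it yielded.
async function collectAll<T>(source: AsyncIterable<T>): Promise<T[]> {
  const seen: T[] = []
  for await (const value of source) seen.push(value)
  return seen
}

async function cursorDemo(): Promise<{ early: number[], late: number[] }> {
  const channel = new CursorChannelSketch<number>()
  const earlyReader = collectAll(channel) // subscribes before any send
  channel.send(1)
  channel.send(2)
  channel.close()
  const late = await collectAll(channel) // subscribes after close
  return { early: await earlyReader, late }
}
```

Under these assumptions, `cursorDemo()` resolves with `early` and `late` both equal to `[1, 2]`.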
Returns
AsyncIterableIterator<T>
An async iterable iterator of channel values
Example
const channel = new MultiChannel<number>()
// Start consuming in background
const consumer = (async () => {
  for await (const value of channel) {
    console.log(value)
  }
})()
// Producer sends values
channel.send(1)
channel.send(2)
channel.close()
await consumer // Wait for consumer to finish

clone()
clone(): MultiChannel<T>
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:431
Creates a deep copy of the channel with all its buffered values and closed state.
The clone will contain the same values but will be a completely independent instance with no shared state. Subscribers and error state are not cloned.
Returns
MultiChannel<T>
A new MultiChannel instance with copied state
Example
const original = new MultiChannel<string>()
original.send('data')
original.close()
const copy = original.clone()
console.log(copy.getBuffer()) // ['data']
console.log(copy.isChannelClosed()) // true

close()
close(): void
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:211
Closes the channel, preventing any new values from being sent.
Once closed, all active subscribers will be notified. Subscribers will receive all buffered values before their iterators complete. Attempting to send values after closing will throw an error.
This method is idempotent - calling it multiple times has no additional effect.
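The closing rules described above can be sketched in isolation. This is an illustrative stand-in under stated assumptions, not the library's code: `ClosableBufferSketch`, `closeNotifications`, and `closeDemo` are hypothetical, subscribers are replaced by a counter, and the error message is copied from the example comment in this section.

```typescript
// Hypothetical sketch of the closing rules only; real subscribers are omitted.
class ClosableBufferSketch<T> {
  private readonly values: T[] = []
  private closedFlag = false
  // Counts how many times subscribers would have been told about the close.
  closeNotifications = 0

  send(value: T): void {
    if (this.closedFlag) throw new Error('Cannot send to a closed channel')
    this.values.push(value)
  }

  close(): void {
    if (this.closedFlag) return // idempotent: repeat calls are no-ops
    this.closedFlag = true
    this.closeNotifications += 1
  }

  snapshot(): readonly T[] {
    return [...this.values]
  }
}

function closeDemo(): { notifications: number, rejected: boolean, kept: readonly string[] } {
  const sketch = new ClosableBufferSketch<string>()
  sketch.send('data')
  sketch.close()
  sketch.close() // second call changes nothing
  let rejected = false
  try {
    sketch.send('more data')
  } catch {
    rejected = true // send after close throws
  }
  return { notifications: sketch.closeNotifications, rejected, kept: sketch.snapshot() }
}
```

In this sketch `closeDemo()` returns one notification, `rejected: true`, and a buffer that still holds only `'data'`.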
Returns
void
Example
const channel = new MultiChannel<string>()
channel.send('data')
channel.close()
// This will throw an error
channel.send('more data') // Error: Cannot send to a closed channel

getBuffer()
getBuffer(): readonly T[]
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:379
Returns a direct reference to the internal buffer containing all sent values.
Warning: Modifying this array directly will affect the channel state. Use this for read-only inspection or when you need to access the raw buffer.
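The warning matters because `readonly T[]` is a compile-time restriction only; at runtime the caller holds the same array object the channel keeps appending to. The sketch below does not use the library: `liveBuffer` and `getBufferSketch` are hypothetical stand-ins for the internal buffer and for `getBuffer()`. It shows the difference between holding the live reference and taking a copy.

```typescript
// `liveBuffer` stands in for the channel's internal array (an assumption).
const liveBuffer: number[] = [1, 2, 3]

// Mirrors the documented return type: same array object, readonly static type.
function getBufferSketch(): readonly number[] {
  return liveBuffer
}

const view = getBufferSketch() // live reference
const frozenCopy = [...view] // independent snapshot, safe to sort or mutate

liveBuffer.push(4) // simulates a later send()

console.log(view.length) // 4
console.log(frozenCopy.length) // 3
```

Taking a copy is the safer choice when the values need to be sorted, filtered in place, or kept stable while producers are still sending.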
Returns
readonly T[]
Array containing all values sent through the channel
Example
const channel = new MultiChannel<number>()
channel.sendBatch([1, 2, 3])
const buffer = channel.getBuffer()
console.log(buffer) // [1, 2, 3]

getError()
getError(): Error | null
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:267
Retrieves any error that was set on the channel.
Returns
Error | null
The error if one was set, otherwise null
Example
const error = channel.getError()
if (error) {
  console.error('Stream error:', error.message)
}

getSubscriberCount()
getSubscriberCount(): number
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:340
Returns the number of active subscribers currently consuming from the channel.
Useful for monitoring and debugging to see how many consumers are attached.
Returns
number
The count of active subscribers
Example
const channel = new MultiChannel<string>()
const iter1 = channel[Symbol.asyncIterator]()
const iter2 = channel[Symbol.asyncIterator]()
console.log(channel.getSubscriberCount()) // 2

isChannelClosed()
isChannelClosed(): boolean
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:358
Checks whether the channel has been closed.
Returns
boolean
true if the channel is closed, false otherwise
Example
const channel = new MultiChannel<string>()
console.log(channel.isChannelClosed()) // false
channel.close()
console.log(channel.isChannelClosed()) // true

removeSubscriber()
removeSubscriber(subscriber): void
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:319
Internal
Removes a subscriber from the active subscribers set.
This is called automatically when a subscriber completes iteration or is explicitly closed. Typically, you don't need to call this manually.
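One common way to get this kind of automatic cleanup is a `finally` block inside an async generator, which runs both on normal completion and when a consumer leaves a `for await` loop early. The sketch below is an assumption about the general technique, not a description of the library's internals: `CountingSourceSketch` and `countDemo` are hypothetical, and unlike the `getSubscriberCount()` example in this section, the sketch registers a subscriber on the first read rather than when the iterator is created.

```typescript
// Hypothetical source that tracks active iterations with a Set of tokens.
class CountingSourceSketch {
  private readonly active = new Set<symbol>()

  get subscriberCount(): number {
    return this.active.size
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<number, void, undefined> {
    const token = Symbol('subscriber')
    this.active.add(token) // registered on first read in this sketch
    try {
      for (let i = 0; i < 3; i++) yield i
    } finally {
      this.active.delete(token) // runs on completion and on early `break`
    }
  }
}

async function countDemo(): Promise<number[]> {
  const source = new CountingSourceSketch()
  const counts: number[] = []
  for await (const value of source) {
    counts.push(source.subscriberCount) // one active subscriber while iterating
    if (value === 0) break // early exit calls the iterator's return()
  }
  counts.push(source.subscriberCount) // cleanup has already run here
  return counts
}
```

In this sketch `countDemo()` resolves to `[1, 0]`: one subscriber during iteration, none after the early `break`.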
Parameters
subscriber
Subscriber<T>
The subscriber to remove
Returns
void
send()
send(value): void
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:157
Sends a single value to the channel.
The value is added to the internal buffer and all active subscribers are notified that new data is available.
Parameters
value
T
The value to send through the channel
Returns
void
Throws
If the channel has been closed
Example
const channel = new MultiChannel<string>()
channel.send('hello')
channel.send('world')

sendBatch()
sendBatch(values): void
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:182
Sends multiple values to the channel at once.
This is more efficient than calling send() multiple times as it adds all values to the buffer in one operation and notifies subscribers once.
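The saving comes from the number of wake-ups rather than from the buffer writes. As a rough illustration only (`NotifyCountingSketch` is hypothetical and counts notifications instead of waking real subscribers), the difference looks like this:

```typescript
// Hypothetical sketch: a counter replaces the real subscriber wake-up.
class NotifyCountingSketch<T> {
  readonly items: T[] = []
  notifications = 0

  send(value: T): void {
    this.items.push(value)
    this.notifications += 1 // one wake-up per value
  }

  sendBatch(values: readonly T[]): void {
    this.items.push(...values)
    this.notifications += 1 // one wake-up for the whole batch
  }
}

const oneByOne = new NotifyCountingSketch<number>()
for (const n of [1, 2, 3, 4, 5]) oneByOne.send(n)

const batched = new NotifyCountingSketch<number>()
batched.sendBatch([1, 2, 3, 4, 5])

console.log(oneByOne.notifications) // 5
console.log(batched.notifications) // 1
```

Both instances end up with the same five buffered items; only the notification count differs.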
Parameters
values
readonly T[]
Array of values to send through the channel
Returns
void
Throws
If the channel has been closed
Example
const channel = new MultiChannel<number>()
channel.sendBatch([1, 2, 3, 4, 5])

serialize()
serialize(): unknown
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:405
Serializes the channel state to a JSON-compatible object.
The serialized representation includes:
- buffer: All values currently in the channel
- isClosed: The closed state of the channel
Note: Active subscribers and error state are not serialized.
Returns
unknown
JSON-compatible object representing the channel state
Example
const channel = new MultiChannel<string>()
channel.send('data1')
channel.send('data2')
channel.close()
const json = channel.serialize()
// { buffer: ['data1', 'data2'], isClosed: true }

setError()
setError(error): void
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:250
Sets an error state on the channel.
This does not close the channel or stop iteration, but allows consumers to check if an error occurred during channel operations via getError(). Useful for signaling that data may be incomplete or invalid.
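Because the error does not interrupt iteration, a consumer has to check for it explicitly once the loop ends. One way to avoid forgetting that check is a small helper. The sketch below is hypothetical throughout: `ErrorAwareStream`, `drainChecked`, and `fakeStream` are not part of the library, and the interface only assumes the two members documented here, async iteration and `getError()`.

```typescript
// Structural type for anything that iterates and exposes getError(),
// as MultiChannel is documented to do. All names here are hypothetical.
interface ErrorAwareStream<T> extends AsyncIterable<T> {
  getError(): Error | null
}

// Reads the stream to completion, then turns a recorded error into a throw.
async function drainChecked<T>(stream: ErrorAwareStream<T>): Promise<T[]> {
  const received: T[] = []
  for await (const value of stream) received.push(value)
  // The error does not stop iteration, so the check comes afterwards.
  const failure = stream.getError()
  if (failure) {
    throw new Error(`Stream ended with error after ${received.length} value(s): ${failure.message}`)
  }
  return received
}

// Tiny fake stream used only to exercise the helper.
function fakeStream(values: string[], failure: Error | null): ErrorAwareStream<string> {
  return {
    async *[Symbol.asyncIterator]() {
      yield* values
    },
    getError: () => failure,
  }
}
```

Whether partial data should be discarded or kept when an error is set is a design choice; this sketch discards it by throwing, which suits callers that need all-or-nothing results.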
Parameters
error
Error
The error that occurred
Returns
void
Example
const channel = new MultiChannel<string>()
try {
  // ... some operation that might fail
  throw new Error('Processing failed')
} catch (err) {
  // `err` is `unknown` under strict TypeScript, so normalize it to an Error
  channel.setError(err instanceof Error ? err : new Error(String(err)))
  channel.close()
}
// Later, consumers can check
for await (const value of channel) {
  // ...
}
if (channel.getError()) {
  console.error('Channel had error:', channel.getError())
}

deserialize()
static deserialize(value): MultiChannel<any>
Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:459
Deserializes a JSON object back into a MultiChannel instance.
The JSON object must conform to the MultiChannelSchema structure with buffer and isClosed properties.
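When the serialized object has been through storage or a network hop, it can help to check its shape before handing it to `deserialize()`. The exact definition of MultiChannelSchema is not shown in this section, so the guard below is a hypothetical approximation based only on the two documented properties; `ChannelSnapshotSketch`, `isChannelSnapshot`, and `roundTrip` are illustrative names.

```typescript
// Shape taken from the serialize() description; the guard is a hypothetical
// stand-in for the library's own MultiChannelSchema validation.
interface ChannelSnapshotSketch {
  buffer: unknown[]
  isClosed: boolean
}

function isChannelSnapshot(value: unknown): value is ChannelSnapshotSketch {
  if (typeof value !== 'object' || value === null) return false
  const candidate = value as Record<string, unknown>
  return Array.isArray(candidate['buffer']) && typeof candidate['isClosed'] === 'boolean'
}

// Simulates persisting a snapshot as text and reading it back.
function roundTrip(snapshot: ChannelSnapshotSketch): ChannelSnapshotSketch {
  const text = JSON.stringify(snapshot)
  const parsed: unknown = JSON.parse(text)
  if (!isChannelSnapshot(parsed)) throw new Error('Not a channel snapshot')
  return parsed
}

const restoredSnapshot = roundTrip({ buffer: ['data1', 'data2'], isClosed: true })
console.log(restoredSnapshot.buffer.length) // 2
console.log(isChannelSnapshot({ buffer: 'oops' })) // false
```

Note that values in `buffer` must themselves survive JSON encoding; items such as `Date` or `Map` would come back in a different form after a round trip like this.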
Parameters
value
unknown
JSON-compatible object from serialize()
Returns
MultiChannel<any>
A new MultiChannel instance with restored state
Throws
If the input doesn't match the expected schema
Example
const json = { buffer: ['a', 'b', 'c'], isClosed: true }
const channel = MultiChannel.deserialize(json)
console.log(channel.getBuffer()) // ['a', 'b', 'c']
console.log(channel.isChannelClosed()) // true