Skip to content

ChainGraph API Documentation / @badaitech/chaingraph-types / MultiChannel

Class: MultiChannel<T>

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:128

A type-safe, multi-subscriber asynchronous channel for streaming data.

MultiChannel implements a pub-sub pattern where multiple consumers can independently iterate over the same stream of values. Each subscriber maintains its own position in the buffer, allowing concurrent consumption at different rates.

Key Features:

  • Async Iteration: Supports for await...of loops via Symbol.asyncIterator
  • Multiple Subscribers: Each subscriber reads independently with its own position
  • Buffering: All sent values are stored, allowing late subscribers to read from the beginning
  • Backpressure-free: No blocking - values are buffered until consumed
  • Serializable: Can be serialized to JSON for persistence and restored later
  • Error Handling: Supports error propagation to consumers

Common Use Cases:

  • Streaming LLM responses in real-time to frontend (see: LLMCallNode)
  • Broadcasting execution events to multiple listeners (see: DBOSEventBus)
  • Bridging DBOS PostgreSQL streams to application logic (see: StreamBridge)
  • Port-to-port streaming data in flow execution (see: StreamPort)

Examples

typescript
const channel = new MultiChannel<string>()

// Producer sends values
channel.send('hello')
channel.send('world')
channel.close()

// Consumer iterates
for await (const value of channel) {
  console.log(value) // 'hello', 'world'
}
typescript
const channel = new MultiChannel<number>()

// Start two subscribers concurrently
const consumer1 = (async () => {
  for await (const value of channel) {
    console.log('Consumer 1:', value)
  }
})()

const consumer2 = (async () => {
  for await (const value of channel) {
    console.log('Consumer 2:', value)
  }
})()

// Producer sends values (both consumers receive them)
channel.send(1)
channel.send(2)
channel.close()

await Promise.all([consumer1, consumer2])
typescript
const channel = new MultiChannel<string>()

channel.sendBatch(['a', 'b', 'c'])
channel.close()

for await (const value of channel) {
  console.log(value) // 'a', 'b', 'c'
}
typescript
const channel = new MultiChannel<string>()

try {
  for await (const value of channel) {
    // Process value
  }
  // Check for errors after iteration completes
  if (channel.getError()) {
    console.error('Stream error:', channel.getError())
  }
} catch (error) {
  console.error('Iteration error:', error)
}
typescript
const original = new MultiChannel<string>()
original.send('data')
original.close()

// Serialize to JSON
const json = original.serialize()

// Later, deserialize and restore
const restored = MultiChannel.deserialize(json)
console.log(restored.getBuffer()) // ['data']
console.log(restored.isChannelClosed()) // true

Type Parameters

T

T

The type of values transmitted through the channel

Constructors

Constructor

new MultiChannel<T>(): MultiChannel<T>

Returns

MultiChannel<T>

Methods

[asyncIterator]()

[asyncIterator](): AsyncIterableIterator<T>

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:304

Returns an async iterator for consuming values from the channel.

This enables the use of for await...of loops. Each call creates a new independent subscriber that starts reading from the beginning of the buffer. Multiple subscribers can iterate concurrently, each at their own pace.

The iterator will:

  • Yield all values currently in the buffer
  • Wait for new values if the channel is not yet closed
  • Complete when the channel is closed and all values have been consumed

Returns

AsyncIterableIterator<T>

An async iterable iterator of channel values

Example

typescript
const channel = new MultiChannel<number>()

// Start consuming in background
const consumer = (async () => {
  for await (const value of channel) {
    console.log(value)
  }
})()

// Producer sends values
channel.send(1)
channel.send(2)
channel.close()

await consumer // Wait for consumer to finish

clone()

clone(): MultiChannel<T>

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:431

Creates a deep copy of the channel with all its buffered values and closed state.

The clone will contain the same values but will be a completely independent instance with no shared state. Subscribers and error state are not cloned.

Returns

MultiChannel<T>

A new MultiChannel instance with copied state

Example

typescript
const original = new MultiChannel<string>()
original.send('data')
original.close()

const copy = original.clone()
console.log(copy.getBuffer()) // ['data']
console.log(copy.isChannelClosed()) // true

close()

close(): void

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:211

Closes the channel, preventing any new values from being sent.

Once closed, all active subscribers will be notified. Subscribers will receive all buffered values before their iterators complete. Attempting to send values after closing will throw an error.

This method is idempotent - calling it multiple times has no additional effect.

Returns

void

Example

typescript
const channel = new MultiChannel<string>()
channel.send('data')
channel.close()

// This will throw an error
channel.send('more data') // Error: Cannot send to a closed channel

getBuffer()

getBuffer(): readonly T[]

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:379

Returns a direct reference to the internal buffer containing all sent values.

Warning: Modifying this array directly will affect the channel state. Use this for read-only inspection or when you need to access the raw buffer.

Returns

readonly T[]

Array containing all values sent through the channel

Example

typescript
const channel = new MultiChannel<number>()
channel.sendBatch([1, 2, 3])

const buffer = channel.getBuffer()
console.log(buffer) // [1, 2, 3]

getError()

getError(): Error | null

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:267

Retrieves any error that was set on the channel.

Returns

Error | null

The error if one was set, otherwise null

Example

typescript
const error = channel.getError()
if (error) {
  console.error('Stream error:', error.message)
}

getSubscriberCount()

getSubscriberCount(): number

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:340

Returns the number of active subscribers currently consuming from the channel.

Useful for monitoring and debugging to see how many consumers are attached.

Returns

number

The count of active subscribers

Example

typescript
const channel = new MultiChannel<string>()

const iter1 = channel[Symbol.asyncIterator]()
const iter2 = channel[Symbol.asyncIterator]()

console.log(channel.getSubscriberCount()) // 2

isChannelClosed()

isChannelClosed(): boolean

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:358

Checks whether the channel has been closed.

Returns

boolean

true if the channel is closed, false otherwise

Example

typescript
const channel = new MultiChannel<string>()
console.log(channel.isChannelClosed()) // false

channel.close()
console.log(channel.isChannelClosed()) // true

removeSubscriber()

removeSubscriber(subscriber): void

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:319

Internal

Removes a subscriber from the active subscribers set.

This is called automatically when a subscriber completes iteration or is explicitly closed. Typically, you don't need to call this manually.

Parameters

subscriber

Subscriber<T>

The subscriber to remove

Returns

void


send()

send(value): void

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:157

Sends a single value to the channel.

The value is added to the internal buffer and all active subscribers are notified that new data is available.

Parameters

value

T

The value to send through the channel

Returns

void

Throws

If the channel has been closed

Example

typescript
const channel = new MultiChannel<string>()
channel.send('hello')
channel.send('world')

sendBatch()

sendBatch(values): void

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:182

Sends multiple values to the channel at once.

This is more efficient than calling send() multiple times as it adds all values to the buffer in one operation and notifies subscribers once.

Parameters

values

readonly T[]

Array of values to send through the channel

Returns

void

Throws

If the channel has been closed

Example

typescript
const channel = new MultiChannel<number>()
channel.sendBatch([1, 2, 3, 4, 5])

serialize()

serialize(): unknown

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:405

Serializes the channel state to a JSON-compatible object.

The serialized representation includes:

  • buffer: All values currently in the channel
  • isClosed: The closed state of the channel

Note: Active subscribers and error state are not serialized.

Returns

unknown

JSON-compatible object representing the channel state

Example

typescript
const channel = new MultiChannel<string>()
channel.send('data1')
channel.send('data2')
channel.close()

const json = channel.serialize()
// { buffer: ['data1', 'data2'], isClosed: true }

setError()

setError(error): void

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:250

Sets an error state on the channel.

This does not close the channel or stop iteration, but allows consumers to check if an error occurred during channel operations via getError(). Useful for signaling that data may be incomplete or invalid.

Parameters

error

Error

The error that occurred

Returns

void

Example

typescript
const channel = new MultiChannel<string>()

try {
  // ... some operation that might fail
  throw new Error('Processing failed')
} catch (err) {
  channel.setError(err)
  channel.close()
}

// Later, consumers can check
for await (const value of channel) {
  // ...
}
if (channel.getError()) {
  console.error('Channel had error:', channel.getError())
}

deserialize()

static deserialize(value): MultiChannel<any>

Defined in: packages/chaingraph-types/src/utils/multi-channel.ts:459

Deserializes a JSON object back into a MultiChannel instance.

The JSON object must conform to the MultiChannelSchema structure with buffer and isClosed properties.

Parameters

value

unknown

JSON-compatible object from serialize()

Returns

MultiChannel<any>

A new MultiChannel instance with restored state

Throws

If the input doesn't match the expected schema

Example

typescript
const json = { buffer: ['a', 'b', 'c'], isClosed: true }
const channel = MultiChannel.deserialize(json)

console.log(channel.getBuffer()) // ['a', 'b', 'c']
console.log(channel.isChannelClosed()) // true

Licensed under BUSL-1.1