Skip to content

ChainGraph API Documentation / @badaitech/chaingraph-channel / types / IChannel

Interface: IChannel<T>

Defined in: types/IChannel.d.ts:74

Universal channel interface for streaming data across different backends.

IChannel provides a unified abstraction for multi-subscriber async streaming that works with:

  • InMemoryChannel: Fast in-memory buffering (browser + Node.js)
  • DBOSChannel: PostgreSQL-backed distributed streaming (Node.js only)
  • TRPCChannel: tRPC-based streaming over HTTP (future)

Design Philosophy:

  • All send operations return Promise for consistency and composability
  • Caller controls await strategy (fire-and-forget, immediate, batch, etc.)
  • Multiple independent subscribers can consume at different rates
  • Bounded buffers prevent memory leaks in long-running streams
  • Serializable for persistence and node cloning

Key Features:

  • Async iteration via for await...of
  • Callback-based subscription via subscribe()
  • External logging hooks for database persistence
  • Error propagation to all subscribers
  • Graceful shutdown with pending operation tracking

Examples

typescript
import type { IChannel } from '@badaitech/chaingraph-channel/types'
import { InMemoryChannel } from '@badaitech/chaingraph-channel/memory'

const channel: IChannel<string> = new InMemoryChannel()

await channel.send('hello')
await channel.send('world')
await channel.close()

for await (const value of channel) {
  console.log(value)
}
typescript
import { DBOSChannel } from '@badaitech/chaingraph-channel/dbos'

// Server 1: Producer
const channel = new DBOSChannel('workflow-123', 'messages')
await channel.send({ type: 'data', value: 42 })

// Server 2: Consumer (receives data from Server 1!)
const channel = new DBOSChannel('workflow-123', 'messages')
for await (const msg of channel) {
  console.log('Received from Server 1:', msg)
}
typescript
const pending: Promise<void>[] = []

// High-throughput sending (don't wait)
for (const item of largeDataset) {
  const promise = channel.send(item)
  pending.push(promise)
}

// Graceful shutdown (wait for all pending)
await Promise.all(pending)
await channel.close()

Type Parameters

T

T

The type of values transmitted through the channel

Properties

[asyncIterator]()

[asyncIterator]: () => AsyncIterableIterator<T[]>

Defined in: types/IChannel.d.ts:300

Returns an async iterator for consuming values (pull-style).

IMPORTANT: Yields BATCHES of values (T[]), not individual items.

Rationale:

  • Database queries return 1000 items at once (PostgreSQL)
  • 1000x performance improvement vs item-by-item iteration
  • Matches existing DBOS stream implementation patterns

Enables for await...of loops. Each iterator creates an independent subscriber that maintains its own position in the stream.

The iterator will:

  • Yield batches of buffered values
  • Wait for new values if channel not closed
  • Complete when channel closes and all values consumed

Returns

AsyncIterableIterator<T[]>

AsyncIterableIterator yielding BATCHES of channel values

Examples

typescript
for await (const batch of channel) {
  for (const value of batch) {
    console.log('Received:', value)
  }
}
typescript
async function* flatten<T>(channel: IChannel<T>): AsyncIterableIterator<T> {
  for await (const batch of channel) {
    for (const item of batch) {
      yield item
    }
  }
}

for await (const value of flatten(channel)) {
  console.log('Received:', value) // Single item API
}
typescript
for await (const batch of channel) {
  for (const value of batch) {
    if (value === 'stop') {
      break  // Subscriber automatically cleaned up
    }
    processValue(value)
  }
}
typescript
// Both iterate independently
const consumer1 = (async () => {
  for await (const batch of channel) {
    await slowProcess(batch)
  }
})()

const consumer2 = (async () => {
  for await (const batch of channel) {
    await fastProcess(batch)
  }
})()

await Promise.all([consumer1, consumer2])

clone()

clone: () => IChannel<T>

Defined in: types/IChannel.d.ts:432

Creates a deep copy of the channel with independent state.

The clone will:

  • Have the same configuration
  • Contain copied buffer values (not shared references)
  • Be completely independent from the original

Used for:

  • Node cloning in flow editor
  • Creating execution instances from flow templates

Note: Subscribers and error state are not cloned

Returns

IChannel<T>

New independent channel instance

Example

typescript
const original = new InMemoryChannel<string>()
await original.send('data')

const clone = original.clone()
// Clone has independent state
// Modifying clone doesn't affect original

close()

close: () => Promise<void>

Defined in: types/IChannel.d.ts:183

Closes the channel, preventing any new values from being sent.

Once closed:

  • All active subscribers will be notified
  • Subscribers will receive all buffered values before completion
  • Attempting to send will throw an error
  • Multiple close() calls are safe (idempotent)

The promise resolves when:

  • All pending sends complete
  • All subscribers are notified
  • Cleanup operations finish

Returns

Promise<void>

Promise that resolves when channel is fully closed

Example

typescript
// Send some data
channel.send(value1)
channel.send(value2)

// Wait for sends to complete
await channel.close()

console.log('All pending sends completed, channel closed')

getBuffer()?

optional getBuffer: () => readonly T[]

Defined in: types/IChannel.d.ts:507

Returns readonly reference to internal buffer (for inspection/testing).

Implementation-specific: Only available on InMemoryChannel. DBOSChannel returns empty array (database-backed, no in-memory buffer).

Used for:

  • Testing and debugging
  • Inspecting buffered values
  • Understanding channel state

Returns

readonly T[]

Readonly array of buffered values

Example

typescript
const buffer = channel.getBuffer?.()
if (buffer) {
  console.log(`Buffer has ${buffer.length} items`)
}

getError()

getError: () => Error | null

Defined in: types/IChannel.d.ts:360

Retrieves any error that was set on the channel.

Returns

Error | null

The error if one was set, otherwise null

Example

typescript
for await (const value of channel) {
  // Process values
}

const error = channel.getError()
if (error) {
  console.error('Stream had error:', error.message)
}

getStats()

getStats: () => ChannelStats

Defined in: types/IChannel.d.ts:342

Returns channel statistics for monitoring and debugging.

Stats include:

  • Buffer size (in-memory implementations)
  • Subscriber count
  • Closed state
  • Implementation type
  • Implementation-specific metrics

Returns

ChannelStats

Channel statistics object

Examples

typescript
const stats = channel.getStats()
console.log(`Buffer: ${stats.bufferSize}, Subscribers: ${stats.subscriberCount}`)
typescript
const stats = channel.getStats()
if (stats.bufferSize > 1000) {
  await channel.send(value)  // Wait
} else {
  channel.send(value)  // Continue
}

isChannelClosed()

isChannelClosed: () => boolean

Defined in: types/IChannel.d.ts:313

Checks whether the channel has been closed.

Returns

boolean

true if channel is closed, false otherwise

Example

typescript
if (!channel.isChannelClosed()) {
  await channel.send(value)
}

send()

send: (value) => Promise<void>

Defined in: types/IChannel.d.ts:130

Sends a single value to the channel.

Always returns Promise to enable:

  • Fire-and-forget: channel.send(value) without await
  • Immediate wait: await channel.send(value)
  • Batch parallel: await Promise.all(items.map(channel.send))
  • Graceful shutdown: Track pending sends
  • Backpressure: Conditionally await based on buffer state
  • Async loggers: Database writes, network calls, etc.

The promise resolves when:

  • Value is added to buffer (or database for DBOSChannel)
  • All subscribers are notified
  • Logger callbacks complete (if attached)
  • Any cleanup operations finish

Parameters

value

T

The value to send through the channel

Returns

Promise<void>

Promise that resolves when send operation completes

Throws

If the channel has been closed

Examples

typescript
// Don't wait - sends happen in background
channel.send(chunk1)
channel.send(chunk2)
channel.send(chunk3)

// Later: ensure all completed
await channel.close()
typescript
await channel.send(criticalData)
console.log('Critical data confirmed sent')
typescript
const sends = items.map(item => channel.send(item))
await Promise.all(sends)
console.log('All items sent')
typescript
const stats = channel.getStats()
if (stats.bufferSize > 1000) {
  await channel.send(value)  // Wait when buffer full
} else {
  channel.send(value)  // Continue when buffer has space
}

sendBatch()

sendBatch: (values) => Promise<void>

Defined in: types/IChannel.d.ts:154

Sends multiple values to the channel at once.

More efficient than calling send() multiple times because:

  • Single buffer operation (memory implementation)
  • Single transaction (database implementations)
  • Single subscriber notification batch
  • Reduced overhead

Parameters

values

T[]

Array of values to send

Returns

Promise<void>

Promise that resolves when all values are sent

Throws

If the channel has been closed

Examples

typescript
await channel.sendBatch([1, 2, 3, 4, 5])
typescript
await channel.sendBatch([])  // No-op, returns immediately

serialize()

serialize: () => unknown

Defined in: types/IChannel.d.ts:405

Serializes the channel state to a JSON-compatible object.

InMemoryChannel: Includes buffer and closed state DBOSChannel: Includes workflow ID, stream key, position

Used for:

  • Node cloning in flow graphs
  • Persisting execution state
  • Debugging and inspection

Note: Active subscribers are not serialized

Returns

unknown

JSON-compatible representation of channel state

Example

typescript
const state = channel.serialize()
await db.saveChannelState(nodeId, portId, state)

setError()

setError: (error) => void

Defined in: types/IChannel.d.ts:383

Sets an error state on the channel.

This will:

  • Store the error for later retrieval
  • Close the channel (no more sends allowed)
  • Notify all subscribers of closure

Parameters

error

Error

The error that occurred

Returns

void

Example

typescript
try {
  for await (const chunk of externalStream) {
    await channel.send(chunk)
  }
} catch (error) {
  channel.setError(error)
  // Channel automatically closed, subscribers notified
}

setLogger()?

optional setLogger: (logger) => void

Defined in: types/IChannel.d.ts:485

Attaches an external logger for persistence and observability.

Logger receives callbacks on:

  • onSend: Every value sent (for full traceability)
  • onPrune: When buffer items are cleaned up (memory management)

Enables:

  • Database persistence for replay capability
  • Metrics and monitoring
  • Audit trails
  • Debugging

Parameters

logger

ChannelLogger<T>

Logger implementation

Returns

void

Examples

typescript
const logger: ChannelLogger<Message> = {
  async onSend(value, position) {
    await db.insert(stream_logs).values({
      executionId,
      nodeId,
      portId,
      sequenceNumber: position,
      value: JSON.stringify(value),
      timestamp: new Date()
    })
  },
  async onPrune(items, reason) {
    console.log(`Pruned ${items.length} items (${reason})`)
  }
}

channel.setLogger(logger)

// All sends now logged to database
await channel.send({ data: 'important' })
typescript
channel.setLogger({
  onSend(value, position) {
    metrics.increment('channel.sends', 1)
    metrics.gauge('channel.position', position)
  },
  onPrune(items, reason) {
    metrics.increment('channel.prunes', items.length, { reason })
  }
})

subscribe()

subscribe: (handler, onError?, onComplete?) => () => void

Defined in: types/IChannel.d.ts:225

Subscribe to channel with callback-based consumption (push-style).

Each subscription is independent - subscribers maintain their own position and consume at their own rate. Useful for event-driven architectures.

Parameters

handler

(value) => void | Promise<void>

Called for each value received (can be async)

onError?

(error) => void

Called if an error occurs (optional)

onComplete?

() => void | Promise<void>

Called when channel closes (optional)

Returns

Unsubscribe function to stop receiving values

(): void

Returns

void

Examples

typescript
const unsubscribe = channel.subscribe((value) => {
  console.log('Received:', value)
})

// Later: stop receiving
unsubscribe()
typescript
channel.subscribe(
  (value) => processValue(value),
  (error) => console.error('Stream error:', error),
  () => console.log('Stream complete')
)
typescript
// Subscriber 1: logs to console
channel.subscribe(value => console.log(value))

// Subscriber 2: sends to metrics
channel.subscribe(value => metrics.track(value))

// Both receive all values independently

Licensed under BUSL-1.1