@badaitech/chaingraph-channel

Unified channel abstractions for ChainGraph with multiple backend implementations.

Features

  • 🌐 Universal Interface: Generic IChannel interface works across all implementations
  • 💾 Multiple Backends: Memory (browser + Node), DBOS (distributed), tRPC (future)
  • 🎯 Environment-Aware: Tree-shakeable exports for optimal bundle size
  • 🔄 Async First: All operations return a Promise for flexible await patterns
  • 📊 Observable: Logger hooks for database persistence and metrics
  • 🚀 High Performance: Bounded buffers, batch operations, minimal overhead
  • 🧪 Well Tested: Comprehensive test suite with 90%+ coverage

Installation

Basic (Browser + Node.js)

bash
npm install @badaitech/chaingraph-channel

With DBOS Support (Node.js Only)

bash
npm install @badaitech/chaingraph-channel @dbos-inc/dbos-sdk pg pg-listen

Exports

Export    Environment  Description                            Bundle Size
/types    Universal    Core types and interfaces              ~5 KB
/memory   Universal    In-memory channel (browser + Node)     ~50 KB
/dbos     Node only    PostgreSQL-backed distributed channel  ~500 KB

Usage

Basic Usage (Browser or Node.js)

typescript
import { InMemoryChannel } from '@badaitech/chaingraph-channel/memory'

const channel = new InMemoryChannel<string>({ maxBufferSize: 1000 })

// Send values
await channel.send('hello')
await channel.send('world')
await channel.close()

// Consume values
for await (const value of channel) {
  console.log(value)
}

Advanced: Fire-and-Forget Pattern

typescript
// High-throughput streaming (don't wait for each send)
const pending: Promise<void>[] = []

for (const item of largeDataset) {
  const promise = channel.send(item)
  pending.push(promise)
}

// Graceful shutdown: wait for all pending
await Promise.all(pending)
await channel.close()

Distributed Execution (Node.js with DBOS)

typescript
import { DBOSChannel } from '@badaitech/chaingraph-channel/dbos'

// Server 1: Producer
const producer = new DBOSChannel('workflow-123', 'messages')
await producer.send({ type: 'data', value: 42 })

// Server 2: Consumer (separate process; the same workflow ID and stream key
// mean it receives what Server 1 sent)
const consumer = new DBOSChannel('workflow-123', 'messages')
for await (const msg of consumer) {
  console.log('Received:', msg)
}

Database Logging for Traceability

typescript
import { InMemoryChannel } from '@badaitech/chaingraph-channel/memory'

const channel = new InMemoryChannel<Message>({ maxBufferSize: 1000 })

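// Attach logger for blockchain-like (append-only, ordered) traceability.
// `db`, `stream_logs`, `executionId`, `nodeId`, and `portId` come from the application.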
channel.setLogger({
  async onSend(value, position) {
    await db.insert(stream_logs).values({
      executionId,
      nodeId,
      portId,
      sequenceNumber: position,
      value: JSON.stringify(value),
      timestamp: new Date()
    })
  },
  async onPrune(items, reason) {
    console.log(`Pruned ${items.length} items (${reason})`)
  }
})

// All sends logged to database, memory stays bounded
for (const item of largeDataset) {
  await channel.send(item)
}

API Reference

IChannel

Core generic interface implemented by all channel types.

Producer Methods:

  • send(value) - Send single value (returns Promise)
  • sendBatch(values) - Send multiple values (returns Promise)
  • close() - Close channel (returns Promise)

Consumer Methods:

  • subscribe(handler, onError?, onComplete?) - Callback-based subscription
  • Symbol.asyncIterator() - Async iterator for for-await loops

Monitoring:

  • getStats() - Get channel statistics
  • isChannelClosed() - Check if channel is closed
  • getError() - Get error state if any
  • setError(error) - Set error and close channel

Persistence:

  • serialize() - Serialize channel state to JSON
  • clone() - Create deep copy of channel
  • setLogger(logger) - Attach logger for observability
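
To show how the producer and consumer methods above fit together, here is a toy stand-in written for this README. It is not the library's implementation: the method names come from the list above, but the parameter types, the return types, and the rule that setError() skips onComplete are assumptions.

```typescript
// Toy stand-in for illustration only; it is NOT the library's implementation.
// Method names follow the list above; parameter and return types are assumptions.
type TinySubscriber<T> = {
  handler: (value: T) => void
  onError?: (error: Error) => void
  onComplete?: () => void
}

class TinyChannel<T> {
  private subscribers: TinySubscriber<T>[] = []
  private closed = false
  private error: Error | null = null

  subscribe(handler: (value: T) => void, onError?: (error: Error) => void, onComplete?: () => void): void {
    this.subscribers.push({ handler, onError, onComplete })
  }

  // Deliver one value to every subscriber; rejects once the channel is closed.
  async send(value: T): Promise<void> {
    if (this.closed) throw new Error('channel is closed')
    for (const s of this.subscribers) s.handler(value)
  }

  async sendBatch(values: T[]): Promise<void> {
    for (const value of values) await this.send(value)
  }

  // Mark the channel closed and notify subscribers that no more values follow.
  async close(): Promise<void> {
    if (this.closed) return
    this.closed = true
    for (const s of this.subscribers) {
      if (s.onComplete) s.onComplete()
    }
  }

  // Record the error, notify subscribers, and close without calling onComplete (assumed).
  setError(error: Error): void {
    if (this.closed) return
    this.error = error
    this.closed = true
    for (const s of this.subscribers) {
      if (s.onError) s.onError(error)
    }
  }

  getError(): Error | null { return this.error }
  isChannelClosed(): boolean { return this.closed }
}

// Usage: the callbacks record what they observe, in order.
const events: string[] = []
const demo = new TinyChannel<string>()
demo.subscribe(
  (value) => { events.push('value:' + value) },
  (error) => { events.push('error:' + error.message) },
  () => { events.push('complete') }
)
const demoDone = demo.sendBatch(['a', 'b']).then(() => demo.close())
```

Once demoDone settles, events holds the two values followed by 'complete'. The real channels add buffering, async iteration, and persistence on top of this shape.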

Configuration

InMemoryChannel

typescript
new InMemoryChannel<T>({
  maxBufferSize: 1000,          // Buffer limit (0 = unbounded)
  cleanupStrategy: 'min-position',  // Cleanup strategy
  maxBufferAge: 60000,          // Max age for time-based cleanup
  initialCapacity: 100          // Pre-allocate buffer
})

DBOSChannel

typescript
new DBOSChannel<T>('workflow-id', 'stream-key', {
  fromOffset: 0,                // Start position
  queryBatchSize: 1000,         // DB query batch size
  consumerBatchSize: 100,       // Consumer batch size
  batchTimeoutMs: 25,           // Batch timeout
  retry: {
    maxAttempts: 3,             // Retry attempts
    initialDelay: 100,          // Initial retry delay
    backoffMultiplier: 2,       // Exponential backoff
    maxDelay: 5000              // Max retry delay
  }
})
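
The retry options above read like a standard capped exponential backoff. The sketch below shows the schedule such settings would produce, assuming the delays are in milliseconds and that each delay is initialDelay multiplied by backoffMultiplier once per previous attempt, capped at maxDelay. backoffSchedule is a hypothetical helper, not part of the package, and DBOSChannel may combine these values differently.

```typescript
// Hypothetical helper, not part of the package. Field names mirror the `retry` option above.
interface RetryOptions {
  maxAttempts: number
  initialDelay: number // assumed to be milliseconds
  backoffMultiplier: number
  maxDelay: number // assumed to be milliseconds
}

// Assumed formula: delay(n) = min(initialDelay * backoffMultiplier^n, maxDelay) for n = 0, 1, 2, ...
function backoffSchedule(options: RetryOptions): number[] {
  const delays: number[] = []
  for (let attempt = 0; attempt < options.maxAttempts; attempt++) {
    const uncapped = options.initialDelay * Math.pow(options.backoffMultiplier, attempt)
    delays.push(Math.min(uncapped, options.maxDelay))
  }
  return delays
}

console.log(backoffSchedule({ maxAttempts: 3, initialDelay: 100, backoffMultiplier: 2, maxDelay: 5000 }))
// [ 100, 200, 400 ]
```

With the values shown in the configuration, maxDelay only starts to matter from the seventh attempt onward (100 * 2^6 = 6400, capped to 5000).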

Performance

Async Send Overhead

Modern JS engines optimize async functions extremely well:

1 million send() operations:
- Sync (old):   about 10ms
- Async (new):  about 12ms
- Overhead:     about 0.000002ms per operation

Negligible for real-world usage!

Buffer Management

  • Bounded: Prevents memory leaks in long-running streams
  • Smart Cleanup: Based on slowest subscriber position
  • Batch Operations: Reduce notification overhead
  • Pre-allocation: Optional initial capacity for performance
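
Cleanup based on the slowest subscriber position can be pictured with a small model: every subscriber keeps a read position, and anything below the minimum position is safe to drop. PrunableBuffer below is a hypothetical class written for this explanation, not the package's buffer, and it leaves out maxBufferSize and age-based cleanup.

```typescript
// Illustrative model only; not the package's buffer implementation.
class PrunableBuffer<T> {
  private items: T[] = []
  private base = 0 // absolute position of items[0]
  private cursors: Record<string, number> = {} // subscriber id -> next position to read

  push(value: T): void {
    this.items.push(value)
  }

  // New subscribers start at the oldest item still buffered.
  subscribe(id: string): void {
    this.cursors[id] = this.base
  }

  // Return the subscriber's next item, or undefined when it has caught up.
  read(id: string): T | undefined {
    const position = this.cursors[id]
    if (position === undefined) throw new Error('unknown subscriber: ' + id)
    const index = position - this.base
    if (index >= this.items.length) return undefined
    this.cursors[id] = position + 1
    return this.items[index]
  }

  // Drop every item below the slowest subscriber's position; returns how many were dropped.
  prune(): number {
    const positions = Object.keys(this.cursors).map((id) => this.cursors[id])
    if (positions.length === 0) return 0
    const slowest = Math.min(...positions)
    const dropped = slowest - this.base
    this.items.splice(0, dropped)
    this.base = slowest
    return dropped
  }

  get size(): number {
    return this.items.length
  }
}

const buf = new PrunableBuffer<string>()
buf.subscribe('fast')
buf.subscribe('slow')
for (const value of ['a', 'b', 'c']) buf.push(value)
for (let i = 0; i < 3; i++) buf.read('fast') // fast has read everything
buf.read('slow')                             // slow has read only 'a'
console.log(buf.prune(), buf.size)           // 1 2
```

Only 'a' is dropped, because the slow subscriber still needs 'b' and 'c'. A bounded buffer needs an extra policy for the case where the slowest subscriber falls further behind than the limit allows.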

Why Always Return Promise?

Making send() always return Promise enables powerful patterns:

  • ✅ Fire-and-forget: channel.send(value) without await
  • ✅ Immediate wait: await channel.send(value)
  • ✅ Batch parallel: await Promise.all(items.map(item => channel.send(item)))
  • ✅ Graceful shutdown: Track pending sends
  • ✅ Backpressure: Conditionally await based on buffer state
  • ✅ Async loggers: Database writes, network calls, etc.
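
The graceful-shutdown and backpressure patterns can be combined by capping the number of sends in flight. The sketch below relies only on send() returning a Promise. ChannelSender and sendWithLimit are hypothetical names, not package exports, and the cap here is on pending promises rather than on the channel's real buffer state.

```typescript
// Hypothetical helper, not a package export. `ChannelSender` models only the
// documented `send(value)` method that returns a Promise.
interface ChannelSender<T> {
  send(value: T): Promise<void>
}

async function sendWithLimit<T>(channel: ChannelSender<T>, items: T[], maxPending: number): Promise<void> {
  const pending = new Set<Promise<void>>()
  for (const item of items) {
    // Each tracked promise removes itself from the set once its send completes.
    const tracked: Promise<void> = channel.send(item).then(() => {
      pending.delete(tracked)
    })
    pending.add(tracked)
    // Backpressure: at the limit, wait for at least one send to finish.
    if (pending.size >= maxPending) {
      await Promise.race(pending)
    }
  }
  // Graceful shutdown: wait for whatever is still in flight.
  await Promise.all(pending)
}
```

Typical use would be await sendWithLimit(channel, largeDataset, 100) followed by await channel.close(). If any send rejects, the helper rejects as well, and sends already in flight are not cancelled.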


Migration from MultiChannel

typescript
// OLD: MultiChannel from chaingraph-types
import { MultiChannel } from '@badaitech/chaingraph-types'
const channel = new MultiChannel<string>()
channel.send(value)  // void return

// NEW: InMemoryChannel from chaingraph-channel
import { InMemoryChannel } from '@badaitech/chaingraph-channel/memory'
const channel = new InMemoryChannel<string>()
await channel.send(value)  // Promise<void> return

Breaking Changes:

  • send() now returns a Promise instead of void (await it, or handle the returned Promise)
  • Buffer is bounded by default (was unbounded)
  • getBuffer() returns readonly array
  • clone() deep copies buffer contents

Examples

See /examples directory for:

  • LLM streaming
  • Distributed pipeline
  • Backpressure control
  • Graceful shutdown
  • Database logging

Testing

bash
# Run all tests
pnpm test

# Test memory implementation only
pnpm test:memory

# Test DBOS implementation
pnpm test:dbos

# Coverage
pnpm test:coverage

License

Business Source License 1.1 (BUSL-1.1)


Contributing

See main ChainGraph repository for contribution guidelines.
