ChainGraph API Documentation / @badaitech/chaingraph-channel
@badaitech/chaingraph-channel
Unified channel abstractions for ChainGraph with multiple backend implementations.
Features
- 🌐 Universal Interface: Generic IChannel interface works across all implementations
- 💾 Multiple Backends: Memory (browser + Node), DBOS (distributed), tRPC (future)
- 🎯 Environment-Aware: Tree-shakeable exports for optimal bundle size
- 🔄 Async First: All operations return Promise for flexible await patterns
- 📊 Observable: Logger hooks for database persistence and metrics
- 🚀 High Performance: Bounded buffers, batch operations, minimal overhead
- 🧪 Well Tested: Comprehensive test suite with 90%+ coverage
Installation
Basic (Browser + Node.js)
npm install @badaitech/chaingraph-channelWith DBOS Support (Node.js Only)
npm install @badaitech/chaingraph-channel @dbos-inc/dbos-sdk pg pg-listenExports
| Export | Environment | Description | Bundle Size |
|---|---|---|---|
/types | Universal | Core types and interfaces | ~5 KB |
/memory | Universal | In-memory channel (browser + Node) | ~50 KB |
/dbos | Node only | PostgreSQL-backed distributed channel | ~500 KB |
Usage
Basic Usage (Browser or Node.js)
import { InMemoryChannel } from '@badaitech/chaingraph-channel/memory'
const channel = new InMemoryChannel<string>({ maxBufferSize: 1000 })
// Send values
await channel.send('hello')
await channel.send('world')
await channel.close()
// Consume values
for await (const value of channel) {
console.log(value)
}Advanced: Fire-and-Forget Pattern
// High-throughput streaming (don't wait for each send)
const pending: Promise<void>[] = []
for (const item of largeDataset) {
const promise = channel.send(item)
pending.push(promise)
}
// Graceful shutdown: wait for all pending
await Promise.all(pending)
await channel.close()Distributed Execution (Node.js with DBOS)
import { DBOSChannel } from '@badaitech/chaingraph-channel/dbos'
// Server 1: Producer
const channel = new DBOSChannel('workflow-123', 'messages')
await channel.send({ type: 'data', value: 42 })
// Server 2: Consumer (receives from Server 1!)
const channel = new DBOSChannel('workflow-123', 'messages')
for await (const msg of channel) {
console.log('Received:', msg)
}Database Logging for Traceability
import { InMemoryChannel } from '@badaitech/chaingraph-channel/memory'
const channel = new InMemoryChannel<Message>({ maxBufferSize: 1000 })
// Attach logger for blockchain-like traceability
channel.setLogger({
async onSend(value, position) {
await db.insert(stream_logs).values({
executionId,
nodeId,
portId,
sequenceNumber: position,
value: JSON.stringify(value),
timestamp: new Date()
})
},
async onPrune(items, reason) {
console.log(`Pruned ${items.length} items (${reason})`)
}
})
// All sends logged to database, memory stays bounded
for (const item of largeDataset) {
await channel.send(item)
}API Reference
IChannel
Core generic interface implemented by all channel types.
Producer Methods:
send(value)- Send single value (returns Promise)sendBatch(values)- Send multiple values (returns Promise)close()- Close channel (returns Promise)
Consumer Methods:
subscribe(handler, onError?, onComplete?)- Callback-based subscriptionSymbol.asyncIterator()- Async iterator for for-await loops
Monitoring:
getStats()- Get channel statisticsisChannelClosed()- Check if channel is closedgetError()- Get error state if anysetError(error)- Set error and close channel
Persistence:
serialize()- Serialize channel state to JSONclone()- Create deep copy of channelsetLogger(logger)- Attach logger for observability
Configuration
InMemoryChannel
new InMemoryChannel<T>({
maxBufferSize: 1000, // Buffer limit (0 = unbounded)
cleanupStrategy: 'min-position', // Cleanup strategy
maxBufferAge: 60000, // Max age for time-based cleanup
initialCapacity: 100 // Pre-allocate buffer
})DBOSChannel
new DBOSChannel<T>('workflow-id', 'stream-key', {
fromOffset: 0, // Start position
queryBatchSize: 1000, // DB query batch size
consumerBatchSize: 100, // Consumer batch size
batchTimeoutMs: 25, // Batch timeout
retry: {
maxAttempts: 3, // Retry attempts
initialDelay: 100, // Initial retry delay
backoffMultiplier: 2, // Exponential backoff
maxDelay: 5000 // Max retry delay
}
})Performance
Async Send Overhead
Modern JS engines optimize async functions extremely well:
1 million send() operations:
- Sync (old): about 10ms
- Async (new): about 12ms
- Overhead: about 0.000002ms per operation
Negligible for real-world usage!Buffer Management
- Bounded: Prevents memory leaks in long-running streams
- Smart Cleanup: Based on slowest subscriber position
- Batch Operations: Reduce notification overhead
- Pre-allocation: Optional initial capacity for performance
Why Always Return Promise?
Making send() always return Promise enables powerful patterns:
✅ Fire-and-forget: channel.send(value) without await ✅ Immediate wait: await channel.send(value) ✅ Batch parallel: await Promise.all(items.map(channel.send)) ✅ Graceful shutdown: Track pending sends ✅ Backpressure: Conditionally await based on buffer state ✅ Async loggers: Database writes, network calls, etc.
Migration from MultiChannel
// OLD: MultiChannel from chaingraph-types
import { MultiChannel } from '@badaitech/chaingraph-types'
const channel = new MultiChannel<string>()
channel.send(value) // void return
// NEW: InMemoryChannel from chaingraph-channel
import { InMemoryChannel } from '@badaitech/chaingraph-channel/memory'
const channel = new InMemoryChannel<string>()
await channel.send(value) // Promise<void> returnBreaking Changes:
send()now returns Promise instead of void (must await or handle Promise)- Buffer is bounded by default (was unbounded)
getBuffer()returns readonly arrayclone()deep copies buffer contents
Examples
See /examples directory for:
- LLM streaming
- Distributed pipeline
- Backpressure control
- Graceful shutdown
- Database logging
Testing
# Run all tests
pnpm test
# Test memory implementation only
pnpm test:memory
# Test DBOS implementation
pnpm test:dbos
# Coverage
pnpm test:coverageLicense
Business Source License 1.1 (BUSL-1.1)
Contributing
See main ChainGraph repository for contribution guidelines.