ChainGraph API Documentation / @badaitech/chaingraph-executor / server / DBOSEventBus
Class: DBOSEventBus
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:31
DBOS-based implementation of IEventBus
Simplified wrapper around StreamBridge for execution event streaming. Uses generic DBOS stream infrastructure with MultiChannel pattern.
Features:
- Real-time streaming via PostgreSQL LISTEN/NOTIFY
- Automatic sharding across PGListener pool
- MultiChannel for efficient fan-out
- Backward compatible with IEventBus interface
Implements
Constructors
Constructor
new DBOSEventBus(
streamBridge):DBOSEventBus
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:35
Parameters
streamBridge
StreamBridge
Returns
DBOSEventBus
Methods
close()
close():
Promise<void>
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:120
Close the event bus
Returns
Promise<void>
Implementation of
closeStream()
closeStream(
executionId):Promise<void>
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:130
Close stream for specific execution (no-op)
DBOS automatically closes streams when workflow terminates
Parameters
executionId
string
Returns
Promise<void>
publishEvent()
publishEvent(
executionId,event):Promise<void>
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:49
Publish an execution event to DBOS stream
Note: This continues to use DBOS.writeStream() directly to maintain existing execution flow (no changes to ExecutionWorkflow)
Parameters
executionId
string
Execution ID (also workflow ID)
event
Event to publish
Returns
Promise<void>
Implementation of
subscribeToEvents()
subscribeToEvents(
executionId,fromIndex,batchConfig?):AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:78
Subscribe to execution events
Parameters
executionId
string
Execution ID
fromIndex
number = 0
Starting event index (0-based)
batchConfig?
EventBatchConfig
Optional batching configuration
Returns
AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>
Async iterable of event batches
Implementation of
unsubscribe()
unsubscribe(
executionId):Promise<void>
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:112
Unsubscribe from events
Parameters
executionId
string
Returns
Promise<void>