Class: DBOSEventBus

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:31

DBOS-based implementation of IEventBus

A thin wrapper around StreamBridge for streaming execution events. It uses the generic DBOS stream infrastructure with the MultiChannel pattern.

Features:

  • Real-time streaming via PostgreSQL LISTEN/NOTIFY
  • Automatic sharding across PGListener pool
  • MultiChannel for efficient fan-out
  • Backward compatible with IEventBus interface
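To make the fan-out idea concrete, here is a generic sketch in which one upstream value is delivered to every registered listener. It is not the library's MultiChannel, whose API is not shown on this page; `DemoFanOut` and all other names below are hypothetical and only illustrate the general pattern.

```typescript
// Hypothetical illustration only: this is NOT the library's MultiChannel.
type DemoListener<T> = (value: T) => void

class DemoFanOut<T> {
  private readonly listeners = new Set<DemoListener<T>>()

  // Register a listener; the returned function removes it again.
  subscribe(listener: DemoListener<T>): () => void {
    this.listeners.add(listener)
    return () => {
      this.listeners.delete(listener)
    }
  }

  // Deliver one upstream value to every currently registered listener.
  emit(value: T): void {
    this.listeners.forEach(listener => listener(value))
  }
}

const fanOut = new DemoFanOut<string>()
const receivedA: string[] = []
const receivedB: string[] = []

const stopA = fanOut.subscribe((value) => { receivedA.push(value) })
fanOut.subscribe((value) => { receivedB.push(value) })

fanOut.emit('node-started') // reaches both listeners
stopA() // listener A leaves
fanOut.emit('node-finished') // reaches listener B only
```

The upstream source emits each value once regardless of how many consumers are attached, which is the property that makes fan-out efficient.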

Implements

IEventBus

Constructors

Constructor

new DBOSEventBus(streamBridge): DBOSEventBus

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:35

Parameters

streamBridge

StreamBridge

Returns

DBOSEventBus

Methods

close()

close(): Promise<void>

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:120

Close the event bus

Returns

Promise<void>

Implementation of

IEventBus.close


closeStream()

closeStream(executionId): Promise<void>

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:130

Close the stream for a specific execution. This is a no-op.

DBOS automatically closes streams when the workflow terminates.

Parameters

executionId

string

Returns

Promise<void>


publishEvent()

publishEvent(executionId, event): Promise<void>

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:49

Publish an execution event to the DBOS stream

Note: This method continues to call DBOS.writeStream() directly so that the existing execution flow is preserved (ExecutionWorkflow is unchanged).

Parameters

executionId

string

Execution ID (also workflow ID)

event

ExecutionEventImpl

Event to publish

Returns

Promise<void>

Implementation of

IEventBus.publishEvent
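The delegation described in the note can be sketched roughly as follows. This is an illustration under stated assumptions, not the class's actual source: `DemoStreamWriter` stands in for a writeStream-style function, and the `'events'` stream key, `DemoExecutionEvent`, and `DemoPublisher` are hypothetical names that do not come from the library.

```typescript
// Hypothetical stand-in for a writeStream-style function: (key, value) => Promise<void>.
type DemoStreamWriter = (streamKey: string, value: unknown) => Promise<void>

// Hypothetical event shape; the real method takes an ExecutionEventImpl.
interface DemoExecutionEvent {
  index: number
  type: string
}

class DemoPublisher {
  private readonly write: DemoStreamWriter

  constructor(write: DemoStreamWriter) {
    this.write = write
  }

  // Mirrors the documented call shape: publishEvent(executionId, event).
  // The 'events' stream key is an assumption made for this sketch.
  async publishEvent(executionId: string, event: DemoExecutionEvent): Promise<void> {
    await this.write('events', { executionId, event })
  }
}

// A recording writer makes the delegation observable without a database.
const writes: Array<{ streamKey: string, value: unknown }> = []
const recordingWriter: DemoStreamWriter = async (streamKey, value) => {
  writes.push({ streamKey, value })
}

const publisher = new DemoPublisher(recordingWriter)
```

Injecting the writer keeps the sketch runnable in isolation; per the note above, the real class calls DBOS.writeStream() directly instead.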


subscribeToEvents()

subscribeToEvents(executionId, fromIndex, batchConfig?): AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:78

Subscribe to execution events

Parameters

executionId

string

Execution ID

fromIndex

number = 0

Starting event index (0-based)

batchConfig?

EventBatchConfig

Optional batching configuration

Returns

AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>

Async iterable of event batches

Implementation of

IEventBus.subscribeToEvents
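A minimal sketch of consuming the batches this method returns, using an in-memory stand-in rather than a real DBOSEventBus. `DemoEvent`, `DemoEventSource`, `InMemoryEventSource`, and `collectFrom` are hypothetical names introduced for illustration; only the call shape (an execution ID, a 0-based `fromIndex`, and an async iterable of event arrays) is taken from the signature above.

```typescript
// Hypothetical stand-in types: the real method yields ExecutionEventImpl[] batches.
interface DemoEvent {
  index: number
  type: string
}

interface DemoEventSource {
  subscribeToEvents(executionId: string, fromIndex?: number): AsyncIterable<DemoEvent[]>
}

// In-memory source that replays stored events in fixed-size batches.
class InMemoryEventSource implements DemoEventSource {
  private readonly events: Record<string, DemoEvent[]>
  private readonly batchSize: number

  constructor(events: Record<string, DemoEvent[]>, batchSize = 2) {
    this.events = events
    this.batchSize = batchSize
  }

  async* subscribeToEvents(executionId: string, fromIndex = 0): AsyncGenerator<DemoEvent[]> {
    const stored = this.events[executionId] ?? []
    for (let start = fromIndex; start < stored.length; start += this.batchSize) {
      yield stored.slice(start, start + this.batchSize)
    }
  }
}

// Flatten every batch into one array, starting at fromIndex (0-based).
async function collectFrom(
  source: DemoEventSource,
  executionId: string,
  fromIndex: number,
): Promise<DemoEvent[]> {
  const collected: DemoEvent[] = []
  for await (const batch of source.subscribeToEvents(executionId, fromIndex)) {
    collected.push(...batch)
  }
  return collected
}

const demoSource = new InMemoryEventSource({
  'exec-1': [0, 1, 2, 3, 4].map(index => ({ index, type: 'demo' })),
})
```

Because each yielded value is an array, a consumer iterates twice: once over batches with `for await`, then over the events inside each batch. Passing the index of the first event not yet seen as `fromIndex` lets a reconnecting consumer resume without replaying earlier events.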


unsubscribe()

unsubscribe(executionId): Promise<void>

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSEventBus.ts:112

Unsubscribe from events

Parameters

executionId

string

Returns

Promise<void>

Implementation of

IEventBus.unsubscribe
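One way a caller might pair subscription with cleanup is a try/finally block, sketched below. This assumes the caller is responsible for calling `unsubscribe` once it stops reading, which this page does not state explicitly; `DemoBus`, `consumeUntil`, and the string event payloads are hypothetical stand-ins.

```typescript
// Hypothetical subset of the bus: only the two methods this sketch needs.
interface DemoBus {
  subscribeToEvents(executionId: string, fromIndex?: number): AsyncIterable<string[]>
  unsubscribe(executionId: string): Promise<void>
}

// Read batches until one contains stopAt, then always unsubscribe.
async function consumeUntil(bus: DemoBus, executionId: string, stopAt: string): Promise<string[]> {
  const seen: string[] = []
  try {
    for await (const batch of bus.subscribeToEvents(executionId)) {
      seen.push(...batch)
      if (batch.indexOf(stopAt) !== -1) {
        break
      }
    }
  } finally {
    // Runs on normal completion, early break, and thrown errors alike.
    await bus.unsubscribe(executionId)
  }
  return seen
}

// Stand-in bus that records which executions were unsubscribed.
const unsubscribed: string[] = []
const demoBus: DemoBus = {
  async* subscribeToEvents() {
    yield ['started']
    yield ['finished']
    yield ['never-read']
  },
  async unsubscribe(executionId) {
    unsubscribed.push(executionId)
  },
}
```

Placing the call in `finally` means the subscription is released even if event handling throws partway through a batch.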

Licensed under BUSL-1.1