ChainGraph API Documentation / @badaitech/chaingraph-executor / server / DBOSExecutionWorker
Class: DBOSExecutionWorker
Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:64
DBOS-based execution worker for chaingraph flows
This worker replaces the Kafka-based ExecutionWorker with a DBOS-powered implementation that provides:
- Durable execution with automatic recovery
- Exactly-once semantics through idempotency
- Built-in concurrency control
- PostgreSQL-backed queue (no Kafka needed for task distribution)
Architecture:
tRPC API → queue.enqueue(task)
↓
PostgreSQL (DBOS queue)
↓
DBOS Worker (auto-consumes)
↓
ExecutionWorkflow:
1. Update status to "running"
2. Execute flow atomically
3. Update status to "completed"Key Differences from Kafka Worker:
- No manual claim management (DBOS handles it)
- No manual recovery service (DBOS handles it)
- No manual offset commits (DBOS handles it)
- Simpler code (~500 lines removed)
Usage:
const worker = new DBOSExecutionWorker(store, eventBus, taskQueue, {
concurrency: 100,
workerConcurrency: 5
});
await worker.start();
// Worker automatically processes queued executionsConstructors
Constructor
new DBOSExecutionWorker(
store,executionService,options?):DBOSExecutionWorker
Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:76
Create a new DBOS execution worker
Parameters
store
Execution store for database operations
executionService
Execution service for flow execution (optional, can be set via initializeExecuteFlowStep)
ExecutionService | null
options?
Worker configuration options
Returns
DBOSExecutionWorker
Methods
getQueue()
getQueue():
ExecutionQueue
Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:172
Get the execution queue
This allows external code (e.g., tRPC API) to enqueue executions
Returns
The execution queue instance
isWorkerRunning()
isWorkerRunning():
boolean
Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:180
Check if worker is running
Returns
boolean
True if worker is running
start()
start():
Promise<void>
Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:93
Start the DBOS execution worker
This initializes DBOS, registers workflows and steps, and starts processing queued executions automatically.
Returns
Promise<void>
stop()
stop():
Promise<void>
Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:147
Stop the DBOS execution worker gracefully
This shuts down DBOS, which will:
- Stop accepting new workflow executions
- Wait for in-progress workflows to complete (graceful)
- Close database connections
- Clean up resources
Returns
Promise<void>