Class: DBOSExecutionWorker

Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:64

DBOS-based execution worker for chaingraph flows

This worker replaces the Kafka-based ExecutionWorker with a DBOS-powered implementation that provides:

  • Durable execution with automatic recovery
  • Exactly-once semantics through idempotency
  • Built-in concurrency control
  • PostgreSQL-backed queue (no Kafka needed for task distribution)
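The idempotency idea behind the exactly-once guarantee can be illustrated with a small in-memory sketch. This is not the package's or DBOS's implementation: `IdempotentRunner` is a hypothetical helper, and a durable system would keep this record in the database rather than in process memory.

```typescript
// Hypothetical helper: remembers the outcome of each execution ID so that a
// retried or duplicated task does not run the underlying work a second time.
class IdempotentRunner<T> {
  private results = new Map<string, Promise<T>>();

  run(executionId: string, work: () => Promise<T>): Promise<T> {
    const existing = this.results.get(executionId);
    if (existing) {
      // Duplicate delivery: reuse the recorded result instead of re-running.
      return existing;
    }
    const started = work();
    this.results.set(executionId, started);
    return started;
  }
}
```

Calling `run` twice with the same execution ID invokes `work` once and returns the same result to both callers.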

Architecture:

tRPC API → queue.enqueue(task)
             ↓
         PostgreSQL (DBOS queue)
             ↓
         DBOS Worker (auto-consumes)
             ↓
         ExecutionWorkflow:
           1. Update status to "running"
           2. Execute flow atomically
           3. Update status to "completed"
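The three workflow steps can be sketched as follows. This is a minimal in-memory illustration under stated assumptions, not the real ExecutionWorkflow: `InMemoryStore`, `runExecution`, and the `failed` status are hypothetical names, and the actual worker runs these steps inside a DBOS workflow.

```typescript
// Hypothetical status values; only "running" and "completed" appear in this page.
type Status = "running" | "completed" | "failed";

// Hypothetical stand-in for the execution store (not the real IExecutionStore).
class InMemoryStore {
  readonly history: Array<[string, Status]> = [];

  setStatus(executionId: string, status: Status): void {
    this.history.push([executionId, status]);
  }
}

// Mirrors the three steps: mark running, execute the flow, record the outcome.
async function runExecution(
  store: InMemoryStore,
  executionId: string,
  executeFlow: (id: string) => Promise<void>,
): Promise<Status> {
  store.setStatus(executionId, "running");
  let outcome: Status = "completed";
  try {
    await executeFlow(executionId);
  } catch {
    // Assumption: a failing flow is recorded as "failed" rather than left "running".
    outcome = "failed";
  }
  store.setStatus(executionId, outcome);
  return outcome;
}
```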

Key Differences from Kafka Worker:

  • No manual claim management (DBOS handles it)
  • No manual recovery service (DBOS handles it)
  • No manual offset commits (DBOS handles it)
  • Simpler code (~500 lines removed)

Usage:

typescript
const worker = new DBOSExecutionWorker(store, executionService, {
  concurrency: 100,
  workerConcurrency: 5
});

await worker.start();
// Worker automatically processes queued executions

Constructors

Constructor

new DBOSExecutionWorker(store, executionService, options?): DBOSExecutionWorker

Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:76

Create a new DBOS execution worker

Parameters

store

IExecutionStore

Execution store for database operations

executionService

ExecutionService | null

Execution service for flow execution. May be null and set later via initializeExecuteFlowStep.

options?

DBOSWorkerOptions

Worker configuration options

Returns

DBOSExecutionWorker

Methods

getQueue()

getQueue(): ExecutionQueue

Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:172

Get the execution queue

This allows external code (e.g., tRPC API) to enqueue executions

Returns

ExecutionQueue

The execution queue instance
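A hedged sketch of how an API handler might use the queue. The `enqueue(task)` call comes from the architecture diagram; the task shape, the `Promise<void>` return type, and the `startExecution` helper are assumptions made for illustration, and the interfaces below are stand-ins rather than the real ExecutionQueue type.

```typescript
// Hypothetical task shape; the real task type is not shown on this page.
interface ExecutionTask {
  executionId: string;
  flowId: string;
}

// Stand-in exposing only the enqueue(task) call; the return type is assumed.
interface QueueLike {
  enqueue(task: ExecutionTask): Promise<void>;
}

// Stand-in for the part of the worker used here.
interface WorkerLike {
  getQueue(): QueueLike;
}

// Hypothetical handler body: build a task and hand it to the worker's queue.
async function startExecution(
  worker: WorkerLike,
  flowId: string,
  executionId: string,
): Promise<ExecutionTask> {
  const task: ExecutionTask = { executionId, flowId };
  await worker.getQueue().enqueue(task);
  return task;
}
```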


isWorkerRunning()

isWorkerRunning(): boolean

Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:180

Check if worker is running

Returns

boolean

True if worker is running


start()

start(): Promise<void>

Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:93

Start the DBOS execution worker

This initializes DBOS, registers workflows and steps, and starts processing queued executions automatically.

Returns

Promise<void>


stop()

stop(): Promise<void>

Defined in: packages/chaingraph-executor/server/dbos/DBOSExecutionWorker.ts:147

Stop the DBOS execution worker gracefully

This shuts down DBOS, which will:

  • Stop accepting new workflow executions
  • Wait for in-progress workflows to complete (graceful)
  • Close database connections
  • Clean up resources

Returns

Promise<void>
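One way to wire `stop()` into process shutdown is sketched below. `createShutdown` is a hypothetical helper, and `WorkerLifecycle` lists only the methods documented on this page; the sketch makes sure `stop()` is called at most once even if several shutdown signals arrive.

```typescript
// Subset of the worker surface documented on this page.
interface WorkerLifecycle {
  start(): Promise<void>;
  stop(): Promise<void>;
  isWorkerRunning(): boolean;
}

// Hypothetical helper: returns a function that stops the worker once and
// hands every later caller the same in-flight promise.
function createShutdown(worker: WorkerLifecycle): () => Promise<void> {
  let stopping: Promise<void> | null = null;
  return () => {
    if (stopping) return stopping; // shutdown already requested
    if (!worker.isWorkerRunning()) return Promise.resolve(); // nothing to stop
    stopping = worker.stop();
    return stopping;
  };
}
```

In a Node.js process the returned function could be registered with `process.once("SIGTERM", ...)` and `process.once("SIGINT", ...)`.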

Licensed under BUSL-1.1