Skip to content

ChainGraph API Documentation / @badaitech/chaingraph-executor / server / ExecutionQueue

Class: ExecutionQueue

Defined in: packages/chaingraph-executor/server/dbos/queues/ExecutionQueue.ts:42

DBOS execution queue for managing chaingraph execution workflows

This class wraps DBOS WorkflowQueue to provide a typed interface for enqueuing and managing chaingraph executions. The queue is:

  • Durable: Backed by PostgreSQL
  • Distributed: Works across multiple workers
  • Concurrent: Supports global and per-worker concurrency limits
  • Reliable: Guarantees at-least-once execution

Key Features:

  • Global concurrency limit (across all workers)
  • Per-worker concurrency limit (per process)
  • FIFO ordering
  • Idempotency through execution IDs
  • Automatic retry on failure

Example Usage:

typescript
const queue = new ExecutionQueue({ concurrency: 100, workerConcurrency: 5 });
const handle = await queue.enqueue(task);
const result = await handle.getResult();

Constructors

Constructor

new ExecutionQueue(options?): ExecutionQueue

Defined in: packages/chaingraph-executor/server/dbos/queues/ExecutionQueue.ts:52

Create a new execution queue

Parameters

options?

DBOSQueueOptions

Queue configuration options

Returns

ExecutionQueue

Methods

enqueue()

enqueue(task): Promise<WorkflowHandle<ExecutionResult>>

Defined in: packages/chaingraph-executor/server/dbos/queues/ExecutionQueue.ts:78

Enqueue an execution task for processing

This method durably enqueues a chaingraph execution task. The execution is guaranteed to eventually run even if the system crashes.

Idempotency: Uses executionId as both the workflow ID and idempotency key, ensuring that duplicate enqueue attempts for the same execution are safely ignored.

Timeout: Executions have a 35-minute timeout to handle long-running workflows.

Parameters

task

ExecutionTask

Execution task containing executionId and flowId

Returns

Promise<WorkflowHandle<ExecutionResult>>

Workflow handle for tracking execution status and result


getHandle()

getHandle(executionId): Promise<WorkflowHandle<ExecutionResult> | null>

Defined in: packages/chaingraph-executor/server/dbos/queues/ExecutionQueue.ts:112

Retrieve workflow handle for an existing execution

This allows you to reconnect to a running or completed workflow and check its status or retrieve its result.

Parameters

executionId

string

The execution ID (also used as workflow ID)

Returns

Promise<WorkflowHandle<ExecutionResult> | null>

Workflow handle or null if not found


getQueueName()

getQueueName(): string

Defined in: packages/chaingraph-executor/server/dbos/queues/ExecutionQueue.ts:169

Get the queue name

Returns

string

The name of this queue


getResult()

getResult(executionId): Promise<ExecutionResult | null>

Defined in: packages/chaingraph-executor/server/dbos/queues/ExecutionQueue.ts:151

Get the result of a completed execution

This will wait for the execution to complete if it's still running.

Parameters

executionId

string

The execution ID

Returns

Promise<ExecutionResult | null>

Execution result or null if not found


getStatus()

getStatus(executionId): Promise<string | null>

Defined in: packages/chaingraph-executor/server/dbos/queues/ExecutionQueue.ts:134

Get the current status of an execution workflow

Possible statuses:

  • PENDING: Queued but not yet started
  • ENQUEUED: In queue waiting for worker capacity
  • RUNNING: Currently executing
  • SUCCESS: Completed successfully
  • ERROR: Failed with error

Parameters

executionId

string

The execution ID

Returns

Promise<string | null>

Status string or null if not found

Licensed under BUSL-1.1