Class: DBOSTaskQueue

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:40

DBOS-based implementation of the ITaskQueue interface.

This implementation uses DBOS Durable Queues instead of Kafka for task distribution. It exposes the same interface as the existing ITaskQueue, so it can be used as a drop-in replacement in the system.

Key Differences from Kafka Implementation:

  • No manual offset management (DBOS handles it)
  • No consumer groups (DBOS handles distribution)
  • consumeTasks() is a no-op (DBOS auto-consumes via workflow registration)
  • Built-in exactly-once semantics through idempotency (execution itself is at-least-once; see publishTask())

Usage:

```typescript
const queue = worker.getQueue();
const taskQueue = new DBOSTaskQueue(queue);

// Publish task
await taskQueue.publishTask(task);

// Consumption is automatic - no need to call consumeTasks()
```
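The drop-in claim can be illustrated with a minimal sketch. The ITaskQueue shape below is reconstructed only from the method signatures listed on this page. The ExecutionTask field, the TaskHandler signature, the InMemoryTaskQueue class, and the submitExecution helper are hypothetical stand-ins, not part of the package.

```typescript
// Hypothetical minimal task shape; the real ExecutionTask defines its own fields.
interface ExecutionTask {
  executionId: string;
}

// Assumed handler signature; the real TaskHandler type may differ.
type TaskHandler = (task: ExecutionTask) => Promise<void>;

// Interface shape inferred from the methods documented on this page.
interface ITaskQueue {
  publishTask(task: ExecutionTask): Promise<void>;
  consumeTasks(handler: TaskHandler): Promise<void>;
  stopConsuming(): Promise<void>;
  getPendingCount?(): Promise<number>; // documented as optional
  close(): Promise<void>;
}

// Stand-in implementation so the sketch runs without DBOS or Kafka.
class InMemoryTaskQueue implements ITaskQueue {
  readonly published: ExecutionTask[] = [];

  async publishTask(task: ExecutionTask): Promise<void> {
    this.published.push(task);
  }
  async consumeTasks(_handler: TaskHandler): Promise<void> {}
  async stopConsuming(): Promise<void> {}
  async close(): Promise<void> {}
}

// Caller code depends only on ITaskQueue, so implementations can be swapped.
async function submitExecution(queue: ITaskQueue, executionId: string): Promise<void> {
  await queue.publishTask({ executionId });
}
```

Because submitExecution accepts any ITaskQueue, passing a DBOSTaskQueue instead of a Kafka-backed queue should require no changes at the call site, assuming the real interface matches the shape sketched here.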

Implements

  • ITaskQueue

Constructors

Constructor

new DBOSTaskQueue(executionQueue): DBOSTaskQueue

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:46

Create a DBOS task queue

Parameters

executionQueue

ExecutionQueue

The DBOS execution queue instance

Returns

DBOSTaskQueue

Methods

close()

close(): Promise<void>

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:122

Close the queue and clean up resources

NOTE: This is handled by DBOSExecutionWorker.stop(). No cleanup needed at the queue level.

Returns

Promise<void>

Implementation of

ITaskQueue.close


consumeTasks()

consumeTasks(_handler): Promise<void>

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:86

Consume tasks from the queue

NOTE: This is a no-op for the DBOS implementation because DBOS automatically consumes from the queue through workflow registration. Workers don't need to subscribe manually; DBOS handles consumption internally.

The handler parameter is ignored because DBOS workflows are registered separately (see DBOSExecutionWorker.start()).
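The effect of the no-op on existing startup code can be sketched as follows. This is an illustration under assumptions, not the package's source: the ExecutionTask field, the TaskHandler signature, the NoOpConsumerQueue class, and the startWorker helper are all hypothetical.

```typescript
// Hypothetical minimal types; the real ones are defined in the package.
interface ExecutionTask {
  executionId: string;
}
type TaskHandler = (task: ExecutionTask) => Promise<void>;

// Sketch of a queue whose consumption is driven elsewhere (as DBOS does via
// workflow registration), so consumeTasks() only satisfies the interface.
class NoOpConsumerQueue {
  async consumeTasks(_handler: TaskHandler): Promise<void> {
    // Intentionally empty: the handler is neither stored nor called.
  }
}

// Startup code written for a subscribe-style queue keeps working unchanged.
async function startWorker(queue: NoOpConsumerQueue): Promise<number> {
  let handled = 0;
  await queue.consumeTasks(async () => {
    handled += 1;
  });
  return handled; // stays 0 because the handler is ignored
}
```

The practical consequence is that any logic placed in the handler passed to consumeTasks() will not run with this implementation; it has to live in the workflow that is registered separately.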

Parameters

_handler

TaskHandler

Task handler (unused in DBOS implementation)

Returns

Promise<void>

Implementation of

ITaskQueue.consumeTasks


getPendingCount()

getPendingCount(): Promise<number>

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:111

Get the number of pending tasks in the queue

NOTE: This is optional in the ITaskQueue interface and not currently implemented for DBOS. DBOS workflows are tracked in system tables, but there's no simple API to count pending tasks.
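Since the method is optional and may yield undefined here, callers may want a defensive read. The sketch below is one way to do that; PendingCountSource and readPendingCount are hypothetical names introduced for illustration.

```typescript
// Only the optional method matters for this sketch.
interface PendingCountSource {
  getPendingCount?(): Promise<number>;
}

// Returns a count when one is available, otherwise null, so callers can
// distinguish "unknown" from "zero pending tasks".
async function readPendingCount(queue: PendingCountSource): Promise<number | null> {
  // Optional call: evaluates to undefined if the method is absent.
  const count: unknown = await queue.getPendingCount?.();
  // An undefined result (not implemented) is also mapped to null.
  return typeof count === "number" ? count : null;
}
```

Mapping both cases to null keeps monitoring code from reporting an empty queue when the count is simply unavailable.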

Returns

Promise<number>

Undefined (not implemented)

Implementation of

ITaskQueue.getPendingCount


publishTask()

publishTask(task): Promise<void>

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:58

Publish an execution task to the DBOS queue

The task will be durably stored in PostgreSQL and eventually processed by a worker. DBOS guarantees at-least-once execution.
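Under at-least-once execution a task may run more than once, so task logic is usually written to be idempotent. The sketch below shows one generic way to deduplicate by task identifier. It is not how DBOS or this package implements idempotency; the executionId field and the makeIdempotent helper are assumptions made for illustration.

```typescript
// Hypothetical task shape; executionId is assumed to identify a task uniquely.
interface ExecutionTask {
  executionId: string;
}

// Wraps a handler so that a task redelivered under at-least-once semantics
// runs its side effects only once per executionId.
function makeIdempotent(
  handler: (task: ExecutionTask) => Promise<void>,
): (task: ExecutionTask) => Promise<void> {
  // In-memory for the sketch; a real system would need durable storage.
  const completed = new Set<string>();
  return async (task) => {
    if (completed.has(task.executionId)) {
      return; // duplicate delivery: skip
    }
    await handler(task);
    // Recorded only after success, so a failed run can be retried.
    completed.add(task.executionId);
  };
}
```

This sketch does not guard against two concurrent deliveries of the same task; handling that case would require an atomic check, for example a unique constraint in the database.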

Parameters

task

ExecutionTask

Execution task to publish

Returns

Promise<void>

Implementation of

ITaskQueue.publishTask


stopConsuming()

stopConsuming(): Promise<void>

Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:97

Stop consuming tasks

NOTE: This is handled by DBOSExecutionWorker.stop() which shuts down the entire DBOS runtime. Individual task queues don't need to stop.

Returns

Promise<void>

Implementation of

ITaskQueue.stopConsuming

Licensed under BUSL-1.1