ChainGraph API Documentation / @badaitech/chaingraph-executor / server / DBOSTaskQueue
Class: DBOSTaskQueue
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:40
DBOS-based implementation of ITaskQueue interface
This implementation uses DBOS Durable Queues instead of Kafka for task distribution. It provides a compatible interface with the existing ITaskQueue so it can be used as a drop-in replacement in the system.
Key Differences from Kafka Implementation:
- No manual offset management (DBOS handles it)
- No consumer groups (DBOS handles distribution)
- consumeTasks() is a no-op (DBOS auto-consumes via workflow registration)
- Built-in exactly-once semantics through idempotency
Usage:
const queue = worker.getQueue();
const taskQueue = new DBOSTaskQueue(queue);
// Publish task
await taskQueue.publishTask(task);
// Consumption is automatic - no need to call consumeTasks()Implements
Constructors
Constructor
new DBOSTaskQueue(
executionQueue):DBOSTaskQueue
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:46
Create a DBOS task queue
Parameters
executionQueue
The DBOS execution queue instance
Returns
DBOSTaskQueue
Methods
close()
close():
Promise<void>
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:122
Close the queue and cleanup resources
NOTE: This is handled by DBOSExecutionWorker.stop(). No cleanup needed at the queue level.
Returns
Promise<void>
Implementation of
consumeTasks()
consumeTasks(
_handler):Promise<void>
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:86
Consume tasks from the queue
NOTE: This is a no-op for DBOS implementation because DBOS automatically consumes from the queue through workflow registration. Workers don't need to manually subscribe - DBOS handles consumption internally.
The handler parameter is ignored because DBOS workflows are registered separately (see DBOSExecutionWorker.start()).
Parameters
_handler
TaskHandler
Task handler (unused in DBOS implementation)
Returns
Promise<void>
Implementation of
getPendingCount()
getPendingCount():
Promise<number>
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:111
Get the number of pending tasks in the queue
NOTE: This is optional in the ITaskQueue interface and not currently implemented for DBOS. DBOS workflows are tracked in system tables, but there's no simple API to count pending tasks.
Returns
Promise<number>
Undefined (not implemented)
Implementation of
publishTask()
publishTask(
task):Promise<void>
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:58
Publish an execution task to the DBOS queue
The task will be durably stored in PostgreSQL and eventually processed by a worker. DBOS guarantees at-least-once execution.
Parameters
task
Execution task to publish
Returns
Promise<void>
Implementation of
stopConsuming()
stopConsuming():
Promise<void>
Defined in: packages/chaingraph-executor/server/implementations/dbos/DBOSTaskQueue.ts:97
Stop consuming tasks
NOTE: This is handled by DBOSExecutionWorker.stop() which shuts down the entire DBOS runtime. Individual task queues don't need to stop.
Returns
Promise<void>