@badaitech/chaingraph-executor
Distributed execution engine for ChainGraph flows with built-in fault tolerance, recovery, and horizontal scaling.
Overview
chaingraph-executor is a production-grade distributed execution system that runs ChainGraph flows reliably at scale. It uses the DBOS durable-execution framework for workflow orchestration and PostgreSQL for distributed coordination, providing at-least-once execution semantics with automatic failure recovery.
Key Features:
- Durable Workflows: DBOS orchestration with automatic checkpointing and recovery
- Distributed Execution: Horizontally scalable worker pool via PostgreSQL + DBOS
- Automatic Recovery: Detects and recovers stuck/failed executions within 30-60 seconds
- Fault Tolerant: Handles worker crashes, database failures, and network issues
- Event Streaming: Real-time execution events via PostgreSQL LISTEN/NOTIFY
- Comprehensive Metrics: Built-in observability and performance tracking
Installation
From GitHub Packages
```bash
npm install @badaitech/chaingraph-executor
```
Authentication: Requires GitHub Packages credentials in .npmrc:
```
@badaitech:registry=https://npm.pkg.github.com
//npm.pkg.github.com/:_authToken=YOUR_GITHUB_TOKEN
```
Peer Dependencies
```bash
npm install superjson
```
Quick Start
Basic Setup
```typescript
import { getExecutionStore, getEventBus, getTaskQueue } from '@badaitech/chaingraph-executor/server'
import { ExecutionService } from '@badaitech/chaingraph-executor/server'
// Initialize components
const store = getExecutionStore()
const eventBus = getEventBus()
const taskQueue = getTaskQueue()
// Create execution service
const executionService = new ExecutionService(store, eventBus, taskQueue)
// Start execution
const execution = await executionService.createExecutionInstance({
task: { executionId: 'EX123', flowId: 'flow-1' },
flow: yourFlow,
executionRow: executionRecord,
abortController: new AbortController(),
})
// Listen for events
executionService.getEventBus().subscribe((event) => {
console.log('Execution event:', event)
})
```
Running Workers
```bash
# Development mode (in-memory execution)
npm run dev:worker
# Production mode (DBOS with PostgreSQL)
EXECUTION_MODE=dbos npm run start:worker
# With custom configuration
WORKER_ID=worker-1 \
WORKER_CONCURRENCY=10 \
DATABASE_URL=postgres://user:pass@localhost/chaingraph \
npm run start:worker
```
Architecture
The executor uses a DBOS-based distributed system:
```
┌─────────────┐
│ Clients │ (tRPC API)
└──────┬──────┘
│ Create execution
▼
┌─────────────────────────────────────────┐
│ PostgreSQL (Source of Truth) │
│ • chaingraph_executions │
│ • chaingraph_execution_claims │
│ • chaingraph_execution_recovery │
└──────┬──────────────────────────────────┘
│ Workflow stored
▼
┌─────────────────────────────────────────┐
│ DBOS Durable Queue │
│ • Orchestrates workflow execution │
│ • Handles recovery & retries │
└──────┬──────────────────────────────────┘
│ Distribute tasks
▼
┌─────────────────────────────────────────┐
│ Worker Pool (Horizontally Scalable) │
│ • Execute flows │
│ • Emit events │
│ • Report status │
└─────────────────────────────────────────┘
```
Task Processing Flow
- Task Creation: Flow execution request stored in PostgreSQL
- Workflow Submission: DBOS workflow created for the execution
- Worker Claim: Worker claims execution and begins processing
- Execution: Flow nodes execute, ports updated, events emitted
- Completion: Status marked 'completed' or 'failed'
- Recovery: If a worker crashes, DBOS automatically resumes from the last checkpoint (see the sketch below)
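The flow above implies a small status state machine. A sketch of the statuses and transitions named in this document (the package's actual internal states may be richer):
```typescript
// Statuses referenced throughout this document.
type ExecutionStatus = 'created' | 'running' | 'completed' | 'failed'

// Transitions implied by the task processing flow; recovery re-runs a stuck
// 'running' execution rather than introducing a separate status.
const transitions: Record<ExecutionStatus, ExecutionStatus[]> = {
  created: ['running'],
  running: ['completed', 'failed', 'running'],
  completed: [],
  failed: [],
}

function canTransition(from: ExecutionStatus, to: ExecutionStatus): boolean {
  return transitions[from].includes(to)
}
```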
PostgreSQL Setup
Database Initialization
```bash
# Create database
createdb chaingraph
# Run migrations
npm run migrate:push
# Or, generate migrations from schema
npm run migrate:generate
```
Required Tables
The executor creates these tables automatically via migrations:
- chaingraph_executions: Core execution records with status tracking
- chaingraph_execution_claims: DBOS workflow claims for distributed coordination
- chaingraph_execution_recovery: Audit trail of recovery actions
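For orientation, the claim columns queried later in this document correspond to roughly the following row shape (illustrative; the generated migrations are the authoritative schema):
```typescript
// Inferred from the troubleshooting queries below; not the full schema.
interface ExecutionClaimRow {
  execution_id: string
  worker_id: string // worker currently holding the claim
  status: string    // e.g. 'released' once the worker is done with it
  expires_at: Date  // the claim is considered stale past this timestamp
}
```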
Connection String
```bash
# Local development
DATABASE_URL=postgres://postgres:password@localhost:5432/chaingraph
# Separate execution database (optional)
DATABASE_URL_EXECUTIONS=postgres://user:pass@db.example.com/chaingraph_executions
```
DBOS Workflows
Execution Workflow Pattern
The executor uses DBOS workflows for reliable execution:
```typescript
async function executeChainGraph(task: ExecutionTask): Promise<ExecutionResult> {
// Step 1: Initialize execution (checkpointed)
await DBOS.step(async () => {
await executionStore.updateStatus(task.executionId, 'running')
})
// Step 2: Execute flow (checkpointed)
const result = await DBOS.step(async () => {
return await executeFlow(task)
})
// Step 3: Mark complete (checkpointed)
await DBOS.step(async () => {
await executionStore.updateStatus(task.executionId, 'completed', result)
})
return result
}
```
Recovery Guarantee: If a worker crashes at any step, DBOS automatically resumes from the last completed checkpoint on another worker.
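Resuming from a checkpoint yields at-least-once semantics: a step interrupted after its side effect but before its checkpoint commits will run again, so steps should be idempotent. A minimal sketch using node-postgres (column names here are illustrative, not the package's actual schema):
```typescript
import { Pool } from 'pg'

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

// Idempotent status transition: the status guard in the WHERE clause turns
// a replayed step into a no-op instead of a double-applied update.
async function markRunning(executionId: string): Promise<void> {
  await pool.query(
    `UPDATE chaingraph_executions
        SET status = 'running', processing_started_at = NOW()
      WHERE execution_id = $1 AND status = 'created'`,
    [executionId],
  )
}
```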
Event Streaming
Real-time events via PostgreSQL LISTEN:
```typescript
eventBus.subscribe((event) => {
if (event.type === 'PORT_UPDATED') {
console.log(`Port ${event.portId} updated to ${event.value}`)
}
if (event.type === 'EXECUTION_COMPLETED') {
console.log(`Execution ${event.executionId} completed`)
}
})
```
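Under the hood, the bus rides on PostgreSQL LISTEN/NOTIFY. A stripped-down sketch of that mechanism with node-postgres (the channel name and payload shape are assumptions, not the package's actual wire format):
```typescript
import { Client } from 'pg'

const client = new Client({ connectionString: process.env.DATABASE_URL })
await client.connect()

// Subscribers LISTEN on a channel; NOTIFY payloads arrive as events.
await client.query('LISTEN chaingraph_events')
client.on('notification', (msg) => {
  const event = JSON.parse(msg.payload ?? '{}')
  console.log('received', event.type)
})

// Elsewhere, a publisher emits:
await client.query(`NOTIFY chaingraph_events, '{"type":"EXECUTION_COMPLETED"}'`)
```
Configuration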
Environment Variables
| Variable | Default | Purpose |
|---|---|---|
| EXECUTION_MODE | local | local or dbos |
| DATABASE_URL | postgres://... | PostgreSQL connection |
| DATABASE_URL_EXECUTIONS | DATABASE_URL | Separate DB for executions (optional) |
| WORKER_ID | Auto-generated | Unique worker identifier |
| WORKER_CONCURRENCY | 10 | Concurrent executions per worker |
| WORKER_CLAIM_TIMEOUT_MS | 30000 | Claim expiration timeout (30s) |
| WORKER_HEARTBEAT_INTERVAL_MS | 5000 | Heartbeat interval (5s) |
| LOG_LEVEL | info | debug, info, warn, or error |
| ENABLE_RECOVERY | true | Enable automatic recovery |
| RECOVERY_SCAN_INTERVAL_MS | 30000 | Recovery scan frequency (30s) |
| RECOVERY_MAX_FAILURE_COUNT | 5 | Max retries before permanent failure |
| MAX_EXECUTION_DEPTH | 100 | Max nested subflow levels |
| MAX_CHILDREN_PER_PARENT | 100 | Max child executions per parent |
| ENABLE_DBOS_EXECUTION | true | Enable DBOS mode |
| DBOS_CONDUCTOR_URL | undefined | Remote DBOS conductor URL |
| DBOS_SYSTEM_DATABASE_URL | DATABASE_URL | Database for DBOS system tables |
| DBOS_ADMIN_PORT | 3022 | DBOS admin server port |
| DBOS_QUEUE_CONCURRENCY | 100 | Global execution limit |
| DBOS_WORKER_CONCURRENCY | 5 | Per-worker execution limit |
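A sketch of how a worker might read these at startup, with the defaults from the table above (the loader itself is hypothetical, not the package's config module):
```typescript
// Hypothetical config loader mirroring the documented defaults.
function intFromEnv(name: string, fallback: number): number {
  const raw = process.env[name]
  return raw === undefined ? fallback : Number.parseInt(raw, 10)
}

const config = {
  executionMode: process.env.EXECUTION_MODE ?? 'local',
  workerConcurrency: intFromEnv('WORKER_CONCURRENCY', 10),
  claimTimeoutMs: intFromEnv('WORKER_CLAIM_TIMEOUT_MS', 30_000),
  heartbeatIntervalMs: intFromEnv('WORKER_HEARTBEAT_INTERVAL_MS', 5_000),
  enableRecovery: (process.env.ENABLE_RECOVERY ?? 'true') === 'true',
}
```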
Example Configuration
```bash
# Production setup
export EXECUTION_MODE=dbos
export DATABASE_URL=postgres://prod_user:pass@prod-db:5432/chaingraph
export WORKER_ID=worker-prod-1
export WORKER_CONCURRENCY=20
export LOG_LEVEL=info
export ENABLE_RECOVERY=true
export DBOS_QUEUE_CONCURRENCY=500
export DBOS_WORKER_CONCURRENCY=10
export MAX_EXECUTION_DEPTH=50
```
Running Workers
Single Worker (Development)
```bash
# In-memory mode
npm run dev:worker
# DBOS mode
EXECUTION_MODE=dbos npm run dev:worker
```
Worker Cluster (Production)
```bash
# Terminal 1: Worker 1
WORKER_ID=worker-1 npm run start:worker
# Terminal 2: Worker 2
WORKER_ID=worker-2 npm run start:worker
# Terminal 3: Worker N
WORKER_ID=worker-n npm run start:worker
```
Docker Deployment
```dockerfile
FROM node:22-alpine
WORKDIR /app
COPY . .
RUN npm ci
ENV EXECUTION_MODE=dbos
ENV WORKER_CONCURRENCY=10
CMD ["npm", "run", "start:worker"]# Run 5 worker replicas
docker run -d --env WORKER_ID=worker-1 --env DATABASE_URL=$DB_URL my-executor:latest
docker run -d --env WORKER_ID=worker-2 --env DATABASE_URL=$DB_URL my-executor:latest
# ... repeat for workers 3-5
```
Development vs Production
| Aspect | Development | Production |
|---|---|---|
| Mode | EXECUTION_MODE=local | EXECUTION_MODE=dbos |
| Storage | In-memory | PostgreSQL + DBOS |
| Recovery | None | Automatic (30s interval) |
| Concurrency | 1-10 | 100+ (horizontally scaled) |
| Durability | None | Checkpointed workflows |
| Use Case | Testing, debugging | 24/7 reliable execution |
Switching to Production
- Set EXECUTION_MODE=dbos
- Configure the PostgreSQL connection
- Run npm run migrate:push
- Deploy multiple worker instances
- Monitor with metrics (ENABLE_METRICS=true)
Troubleshooting
Worker Not Claiming Tasks
Symptoms: Tasks stuck in 'created' status, workers idle
Debug:
```sql
-- Check claim status
SELECT execution_id, worker_id, status, expires_at
FROM chaingraph_execution_claims
WHERE status != 'released';

-- Check worker heartbeat
SELECT * FROM chaingraph_executions
WHERE processing_worker_id IS NOT NULL
  AND processing_started_at < NOW() - INTERVAL '1 minute';
```
Solutions:
- Verify DATABASE_URL is set correctly
- Check worker logs: LOG_LEVEL=debug npm run start:worker
- Ensure PostgreSQL is reachable from the worker
Executions Never Complete
Symptoms: Status stays 'running', events not emitted
Debug:
```sql
-- Check DBOS workflow status
SELECT * FROM dbos_workflows WHERE user_id = 'executionId';

-- Monitor event queue
SELECT COUNT(*) FROM dbos_events;
```
Solutions:
- Verify ENABLE_DBOS_EXECUTION=true in production
- Check PostgreSQL write permissions
- Monitor the DBOS admin server: http://localhost:3022
Recovery Not Triggering
Symptoms: Stuck executions not being recovered
Debug:
```sql
-- Check recovery service status
SELECT * FROM chaingraph_execution_recovery
ORDER BY recovered_at DESC LIMIT 10;

-- Check for expired claims
SELECT * FROM chaingraph_execution_claims
WHERE expires_at < NOW();
```
Solutions:
- Verify ENABLE_RECOVERY=true (the default)
- Increase RECOVERY_SCAN_INTERVAL_MS if scans are too frequent
- Check that at least one worker is running
- Monitor: tail -f logs/executor.log | grep recovery
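If an execution stays stuck after these checks, it can be flagged by hand with the store API (see the Execution Store reference below):
```typescript
import { getExecutionStore } from '@badaitech/chaingraph-executor/server'

// Manually flag a stuck execution so the next recovery scan picks it up.
const store = getExecutionStore()
await store.markForRecovery('EX123')
```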
High Memory Usage
Symptoms: Worker process consuming excessive memory
Solutions:
- Reduce WORKER_CONCURRENCY: WORKER_CONCURRENCY=5
- Reduce DBOS_QUEUE_CONCURRENCY: DBOS_QUEUE_CONCURRENCY=50
- Enable memory metrics: ENABLE_METRICS=true METRICS_INCLUDE_MEMORY=true
- Profile with: node --prof dist/server/workers/index.js
Database Connection Errors
Symptoms: ECONNREFUSED, ENOTFOUND errors
Solutions:
```bash
# Test connection
psql $DATABASE_URL -c "SELECT 1"

# Check DNS (if using a hostname)
nslookup db.example.com

# Verify credentials
grep DATABASE_URL .env

# Check pool settings (optional)
CONNECTION_POOL_MIN=5 CONNECTION_POOL_MAX=20 npm run start:worker
```
Slow Event Processing
Symptoms: Events delayed, UI not updating in real-time
Solutions:
- Check PostgreSQL CPU usage: SELECT * FROM pg_stat_statements WHERE query LIKE '%LISTEN%'
- Reduce batch size: METRICS_BATCH_SIZE=1
- Scale up workers
- Monitor: ENABLE_METRICS=true LOG_LEVEL=debug
API Reference
ExecutionService
```typescript
// Create and execute a flow
const instance = await executionService.createExecutionInstance({
task: ExecutionTask,
flow: Flow,
executionRow: ExecutionRow,
abortController: AbortController,
})
// Subscribe to events
executionService.getEventBus().subscribe((event: FlowEvent) => {
// Handle event
})
// Get execution status
const status = await store.getExecution(executionId)
```
Event Bus
```typescript
// Subscribe to all events
eventBus.subscribe(handler)
// Subscribe to specific type
eventBus.on('EXECUTION_COMPLETED', handler)
// Publish event
await eventBus.emit(event)
```
Execution Store
```typescript
// Query executions
const execution = await store.getExecution(executionId)
const executions = await store.listExecutions({ status: 'running' })
// Update execution
await store.updateStatus(executionId, 'completed', result)
// Recover stuck execution
await store.markForRecovery(executionId)
```
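Putting the store calls together, a hedged sketch of a manual recovery sweep (the staleness rule and the row fields startedAt and id are assumptions about the store's row shape; the built-in recovery service already does this when ENABLE_RECOVERY=true):
```typescript
// Illustrative sweep: mark long-running executions for recovery.
const running = await store.listExecutions({ status: 'running' })
for (const execution of running) {
  const ageMs = Date.now() - execution.startedAt.getTime() // assumed field
  if (ageMs > 60_000) {
    await store.markForRecovery(execution.id) // assumed field
  }
}
```
Related Packages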
- @badaitech/chaingraph-types: Core type system and node definitions
- @badaitech/chaingraph-nodes: Pre-built node library
- @badaitech/chaingraph-trpc: Type-safe API layer
- DBOS Documentation: Durable orchestration framework
Performance Tuning
For 1000+ Executions/Second
```bash
# Increase concurrency
DBOS_QUEUE_CONCURRENCY=500
DBOS_WORKER_CONCURRENCY=20
WORKER_CONCURRENCY=20
# Deploy more workers
# (one process per WORKER_ID: worker-1 ... worker-10)
# Use PostgreSQL connection pooling
CONNECTION_POOL_MAX=50
# Reduce recovery scan frequency
RECOVERY_SCAN_INTERVAL_MS=60000
```
For Real-Time Requirements
```bash
# Reduce timeouts
WORKER_CLAIM_TIMEOUT_MS=15000
WORKER_HEARTBEAT_INTERVAL_MS=2000
# Increase event flush rate
METRICS_FLUSH_INTERVAL=100
# Enable debug logging
LOG_LEVEL=debug
```
License
Licensed under Business Source License 1.1 (BUSL-1.1). See LICENSE file for details.
Documentation: See the Architecture Overview for technical details.
Issues: Report bugs on GitHub