@badaitech/chaingraph-executor

Distributed execution engine for ChainGraph flows with built-in fault tolerance, recovery, and horizontal scaling.

Overview

chaingraph-executor is a production-grade distributed execution system that runs ChainGraph flows reliably at scale. It orchestrates workflows with DBOS, a durable workflow library that checkpoints workflow state in PostgreSQL, and uses PostgreSQL for distributed coordination. Together they provide at-least-once execution semantics with automatic failure recovery.

Key Features:

  • Durable Workflows: DBOS orchestration with automatic checkpointing and recovery
  • Distributed Execution: Horizontally scalable worker pool via PostgreSQL + DBOS
  • Automatic Recovery: Detects and recovers stuck/failed executions within 30-60 seconds with the default timeouts
  • Fault Tolerant: Handles worker crashes, database failures, and network issues
  • Event Streaming: Real-time execution events via PostgreSQL LISTEN/NOTIFY
  • Comprehensive Metrics: Built-in observability and performance tracking

Installation

From GitHub Packages

```bash
npm install @badaitech/chaingraph-executor
```

Authentication: Requires GitHub Packages credentials in .npmrc:

```ini
@badaitech:registry=https://npm.pkg.github.com
//npm.pkg.github.com/:_authToken=YOUR_GITHUB_TOKEN
```

Peer Dependencies

```bash
npm install superjson
```

Quick Start

Basic Setup

```typescript
import {
  ExecutionService,
  getEventBus,
  getExecutionStore,
  getTaskQueue,
} from '@badaitech/chaingraph-executor/server'

// Initialize components
const store = getExecutionStore()
const eventBus = getEventBus()
const taskQueue = getTaskQueue()

// Create execution service
const executionService = new ExecutionService(store, eventBus, taskQueue)

// Start execution
// `yourFlow` (the Flow to run) and `executionRecord` (its execution row)
// are placeholders that you load beforehand.
const execution = await executionService.createExecutionInstance({
  task: { executionId: 'EX123', flowId: 'flow-1' },
  flow: yourFlow,
  executionRow: executionRecord,
  abortController: new AbortController(),
})

// Listen for events
executionService.getEventBus().subscribe((event) => {
  console.log('Execution event:', event)
})
```

Running Workers

```bash
# Development mode (in-memory execution)
npm run dev:worker

# Production mode (DBOS with PostgreSQL)
EXECUTION_MODE=dbos npm run start:worker

# With custom configuration
WORKER_ID=worker-1 \
WORKER_CONCURRENCY=10 \
DATABASE_URL=postgres://user:pass@localhost/chaingraph \
npm run start:worker
```

Architecture

The executor uses a DBOS-based distributed system:

```text
┌─────────────┐
│  Clients    │ (tRPC API)
└──────┬──────┘
       │ Create execution
       ▼
┌─────────────────────────────────────────┐
│  PostgreSQL (Source of Truth)           │
│  • chaingraph_executions                │
│  • chaingraph_execution_claims          │
│  • chaingraph_execution_recovery        │
└──────┬──────────────────────────────────┘
       │ Workflow stored
       ▼
┌─────────────────────────────────────────┐
│  DBOS Durable Queue                     │
│  • Orchestrates workflow execution      │
│  • Handles recovery & retries           │
└──────┬──────────────────────────────────┘
       │ Distribute tasks
       ▼
┌─────────────────────────────────────────┐
│  Worker Pool (Horizontally Scalable)    │
│  • Execute flows                        │
│  • Emit events                          │
│  • Report status                        │
└─────────────────────────────────────────┘
```

Task Processing Flow

  1. Task Creation: Flow execution request stored in PostgreSQL
  2. Workflow Submission: DBOS workflow created for the execution
  3. Worker Claim: Worker claims execution and begins processing
  4. Execution: Flow nodes execute, ports updated, events emitted
  5. Completion: Status marked 'completed' or 'failed'
  6. Recovery: If a worker crashes, DBOS automatically resumes from the last checkpoint
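The status changes in this flow can be modeled as a small state machine. This is a hypothetical sketch. The status names 'created', 'running', 'completed', and 'failed' come from this README. The `ExecutionStatus` type, the transition table, and `transition()` are illustrative and are not the package's API. Recovery is deliberately left out of the model, because this README does not say which status a recovered execution passes through.

```typescript
// Hypothetical model of the execution lifecycle described above.
// Status names come from the README; the transition table is an assumption.
type ExecutionStatus = 'created' | 'running' | 'completed' | 'failed'

const allowedTransitions: Record<ExecutionStatus, readonly ExecutionStatus[]> = {
  created: ['running'],             // a worker claims the execution
  running: ['completed', 'failed'], // the flow finishes or errors out
  completed: [],                    // terminal
  failed: [],                       // terminal
}

function transition(from: ExecutionStatus, to: ExecutionStatus): ExecutionStatus {
  if (!allowedTransitions[from].includes(to)) {
    throw new Error(`Invalid status transition: ${from} -> ${to}`)
  }
  return to
}

function isTerminal(status: ExecutionStatus): boolean {
  return allowedTransitions[status].length === 0
}

// Walk the happy path: created -> running -> completed
let current: ExecutionStatus = 'created'
current = transition(current, 'running')
current = transition(current, 'completed')
console.log(current, isTerminal(current)) // prints "completed true"
```

Encoding the allowed transitions as data keeps the rules in one place, so an invalid update such as `completed -> running` fails loudly instead of silently overwriting a finished record.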

PostgreSQL Setup

Database Initialization

```bash
# Create database
createdb chaingraph

# Run migrations
npm run migrate:push

# Alternatively, generate migration files from the schema
npm run migrate:generate
```

Required Tables

Running the migrations creates these tables:

  • chaingraph_executions: Core execution records with status tracking
  • chaingraph_execution_claims: DBOS workflow claims for distributed coordination
  • chaingraph_execution_recovery: Audit trail of recovery actions
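The claims table coordinates workers through expiring leases. A claim is valid until its `expires_at` time, and the claim timeout is set by `WORKER_CLAIM_TIMEOUT_MS`. The in-memory sketch below illustrates that idea. `ClaimRegistry`, `tryClaim`, and `heartbeat` are hypothetical names. The renewal rule, where each heartbeat extends the lease by the full timeout, is an assumption. The real executor keeps claims in PostgreSQL rows, not in a `Map`.

```typescript
// Hypothetical in-memory model of expiring execution claims (leases).
// The real executor stores claims in PostgreSQL; this only illustrates the rule.
interface Claim {
  executionId: string
  workerId: string
  expiresAt: number // epoch milliseconds
}

class ClaimRegistry {
  private readonly claims = new Map<string, Claim>()
  private readonly claimTimeoutMs: number

  constructor(claimTimeoutMs = 30_000) {
    this.claimTimeoutMs = claimTimeoutMs
  }

  // Succeeds if the execution is unclaimed or the previous claim has expired.
  tryClaim(executionId: string, workerId: string, now: number): boolean {
    const existing = this.claims.get(executionId)
    if (existing && existing.expiresAt > now && existing.workerId !== workerId) {
      return false // another worker holds a live claim
    }
    this.claims.set(executionId, { executionId, workerId, expiresAt: now + this.claimTimeoutMs })
    return true
  }

  // Assumed renewal rule: each heartbeat extends the lease by the full timeout.
  heartbeat(executionId: string, workerId: string, now: number): boolean {
    const existing = this.claims.get(executionId)
    if (!existing || existing.workerId !== workerId || existing.expiresAt <= now) {
      return false // lease lost; the worker should stop processing
    }
    existing.expiresAt = now + this.claimTimeoutMs
    return true
  }
}

const registry = new ClaimRegistry(30_000)
console.log(registry.tryClaim('EX123', 'worker-1', 0))      // true
console.log(registry.tryClaim('EX123', 'worker-2', 10_000)) // false: claim still live
console.log(registry.tryClaim('EX123', 'worker-2', 31_000)) // true: expired, no heartbeat
```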

Connection String

```bash
# Local development
DATABASE_URL=postgres://postgres:password@localhost:5432/chaingraph

# Separate execution database (optional)
DATABASE_URL_EXECUTIONS=postgres://user:pass@db.example.com/chaingraph_executions
```
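The configuration table documents a fallback rule: `DATABASE_URL_EXECUTIONS` and `DBOS_SYSTEM_DATABASE_URL` both default to `DATABASE_URL`. A sketch of that resolution follows, plus a quick sanity check using the standard WHATWG `URL` parser. `resolveDatabaseUrls` and `describeDbUrl` are hypothetical helpers and are not exports of the package.

```typescript
// Hypothetical helper: resolve the executor's database URLs using the
// fallbacks documented in the configuration table.
type Env = Record<string, string | undefined>

interface DatabaseUrls {
  main: string
  executions: string
  dbosSystem: string
}

function resolveDatabaseUrls(env: Env): DatabaseUrls {
  const main = env.DATABASE_URL
  if (!main) throw new Error('DATABASE_URL is not set')
  return {
    main,
    executions: env.DATABASE_URL_EXECUTIONS ?? main, // optional separate DB
    dbosSystem: env.DBOS_SYSTEM_DATABASE_URL ?? main, // DBOS system tables
  }
}

// Show host, port, and database name (assumes 5432 when no port is given).
function describeDbUrl(url: string): string {
  const u = new URL(url)
  return `${u.hostname}:${u.port || '5432'}${u.pathname}`
}

const urls = resolveDatabaseUrls({
  DATABASE_URL: 'postgres://postgres:password@localhost:5432/chaingraph',
})
console.log(describeDbUrl(urls.executions)) // localhost:5432/chaingraph
```

Running a check like this at startup makes a typo in a connection string fail fast, before any worker tries to claim work.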

DBOS Workflows

Execution Workflow Pattern

The executor uses DBOS workflows for reliable execution:

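```typescript
import { DBOS } from '@dbos-inc/dbos-sdk'

// ExecutionTask, ExecutionResult, executionStore, and executeFlow come from
// the executor and are not shown here.

async function executeChainGraphSteps(task: ExecutionTask): Promise<ExecutionResult> {
  // Step 1: Initialize execution (checkpointed)
  await DBOS.runStep(
    () => executionStore.updateStatus(task.executionId, 'running'),
    { name: 'initialize' },
  )

  // Step 2: Execute flow (checkpointed)
  const result = await DBOS.runStep(() => executeFlow(task), { name: 'executeFlow' })

  // Step 3: Mark complete (checkpointed)
  await DBOS.runStep(
    () => executionStore.updateStatus(task.executionId, 'completed', result),
    { name: 'complete' },
  )

  return result
}

// Registering the function makes DBOS run it as a durable workflow
export const executeChainGraph = DBOS.registerWorkflow(executeChainGraphSteps, {
  name: 'executeChainGraph',
})
```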

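Recovery Guarantee: If a worker crashes mid-workflow, DBOS automatically resumes the workflow from the last completed checkpoint on another worker, reusing the recorded results of earlier steps. The interrupted step runs again, which is why execution is at-least-once and steps should be idempotent.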
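To show why completed steps are not repeated after a crash, here is a toy model of step checkpointing. It is not DBOS. `CheckpointLog` and its `runStep` method are hypothetical. They mimic the idea of recording each step's output under a (workflow ID, step index) key and returning the recorded output on replay. The toy is synchronous for simplicity, while real DBOS steps are async.

```typescript
// Toy model of durable step checkpointing (not the DBOS implementation).
// Each step's result is recorded by (workflowId, stepIndex); on replay,
// recorded results are returned instead of re-running the step.
class CheckpointLog {
  private readonly results = new Map<string, unknown>()

  runStep<T>(workflowId: string, stepIndex: number, fn: () => T): T {
    const key = `${workflowId}:${stepIndex}`
    if (this.results.has(key)) return this.results.get(key) as T // replay: reuse output
    const value = fn() // may throw, e.g. the worker "crashes" mid-step
    this.results.set(key, value) // checkpoint only after the step succeeds
    return value
  }
}

const calls: string[] = []
let crashDuringExecute = true

function chainGraphWorkflow(log: CheckpointLog, executionId: string): string {
  log.runStep(executionId, 0, () => { calls.push('init'); return 'running' })
  const output = log.runStep(executionId, 1, () => {
    calls.push('execute')
    if (crashDuringExecute) throw new Error('worker crashed')
    return 'flow-output'
  })
  log.runStep(executionId, 2, () => { calls.push('complete'); return 'completed' })
  return output
}

const log = new CheckpointLog() // stands in for the durable checkpoint store
try {
  chainGraphWorkflow(log, 'EX123') // first attempt crashes inside step 1
} catch (err) {
  crashDuringExecute = false // the worker that recovers it does not crash
}
const result = chainGraphWorkflow(log, 'EX123') // recovery replays the workflow
console.log(result, calls.join(' -> ')) // flow-output init -> execute -> execute -> complete
```

The log shows both halves of the guarantee. `init` runs once because its result was checkpointed. `execute` runs twice because the crash interrupted it before its result was recorded.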

Event Streaming

Real-time events are delivered via PostgreSQL LISTEN/NOTIFY:

```typescript
eventBus.subscribe((event) => {
  if (event.type === 'PORT_UPDATED') {
    console.log(`Port ${event.portId} updated to ${event.value}`)
  }
  if (event.type === 'EXECUTION_COMPLETED') {
    console.log(`Execution ${event.executionId} completed`)
  }
})
```
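Because handlers branch on `event.type`, a discriminated union gives each branch correctly typed fields. The `ExecutorEvent` type below is a hypothetical shape built only from the fields used in the example above (`type`, `portId`, `value`, `executionId`). The package's real `FlowEvent` type likely carries more event types and fields.

```typescript
// Hypothetical event shapes, limited to the fields used in the example above.
type ExecutorEvent =
  | { type: 'PORT_UPDATED'; executionId: string; portId: string; value: unknown }
  | { type: 'EXECUTION_COMPLETED'; executionId: string }

function describeEvent(event: ExecutorEvent): string {
  switch (event.type) {
    case 'PORT_UPDATED':
      // `portId` and `value` are only accessible in this branch.
      // JSON.stringify avoids printing objects as "[object Object]".
      return `Port ${event.portId} updated to ${JSON.stringify(event.value)}`
    case 'EXECUTION_COMPLETED':
      return `Execution ${event.executionId} completed`
    default: {
      // Exhaustiveness check: stops compiling if a new event type is unhandled
      const unhandled: never = event
      return unhandled
    }
  }
}

console.log(describeEvent({ type: 'PORT_UPDATED', executionId: 'EX123', portId: 'out', value: 42 }))
// Port out updated to 42
```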

Configuration

Environment Variables

| Variable | Default | Purpose |
|---|---|---|
| EXECUTION_MODE | local | local or dbos |
| DATABASE_URL | postgres://... | PostgreSQL connection |
| DATABASE_URL_EXECUTIONS | DATABASE_URL | Separate DB for executions (optional) |
| WORKER_ID | Auto-generated | Unique worker identifier |
| WORKER_CONCURRENCY | 10 | Concurrent executions per worker |
| WORKER_CLAIM_TIMEOUT_MS | 30000 | Claim expiration timeout (30 s) |
| WORKER_HEARTBEAT_INTERVAL_MS | 5000 | Heartbeat interval (5 s) |
| LOG_LEVEL | info | debug, info, warn, or error |
| ENABLE_RECOVERY | true | Enable automatic recovery |
| RECOVERY_SCAN_INTERVAL_MS | 30000 | Interval between recovery scans (30 s) |
| RECOVERY_MAX_FAILURE_COUNT | 5 | Max retries before permanent failure |
| MAX_EXECUTION_DEPTH | 100 | Max nested subflow levels |
| MAX_CHILDREN_PER_PARENT | 100 | Max child executions per parent |
| ENABLE_DBOS_EXECUTION | true | Enable DBOS mode |
| DBOS_CONDUCTOR_URL | undefined | Remote DBOS Conductor URL |
| DBOS_SYSTEM_DATABASE_URL | DATABASE_URL | Database for DBOS system tables |
| DBOS_ADMIN_PORT | 3022 | DBOS admin server port |
| DBOS_QUEUE_CONCURRENCY | 100 | Global execution limit |
| DBOS_WORKER_CONCURRENCY | 5 | Per-worker execution limit |

Example Configuration

```bash
# Production setup
export EXECUTION_MODE=dbos
export DATABASE_URL=postgres://prod_user:pass@prod-db:5432/chaingraph
export WORKER_ID=worker-prod-1
export WORKER_CONCURRENCY=20
export LOG_LEVEL=info
export ENABLE_RECOVERY=true
export DBOS_QUEUE_CONCURRENCY=500
export DBOS_WORKER_CONCURRENCY=10
export MAX_EXECUTION_DEPTH=50
```

Running Workers

Single Worker (Development)

```bash
# In-memory mode
npm run dev:worker

# DBOS mode
EXECUTION_MODE=dbos npm run dev:worker
```

Worker Cluster (Production)

```bash
# Terminal 1: Worker 1
WORKER_ID=worker-1 npm run start:worker

# Terminal 2: Worker 2
WORKER_ID=worker-2 npm run start:worker

# Terminal 3: Worker N
WORKER_ID=worker-n npm run start:worker
```

Docker Deployment

```dockerfile
FROM node:22-alpine
WORKDIR /app

COPY . .
RUN npm ci

ENV EXECUTION_MODE=dbos
ENV WORKER_CONCURRENCY=10

CMD ["npm", "run", "start:worker"]
```

```bash
# Build the image
docker build -t my-executor:latest .

# Run 5 worker replicas, each with a unique WORKER_ID
for i in 1 2 3 4 5; do
  docker run -d --env WORKER_ID="worker-$i" --env DATABASE_URL="$DB_URL" my-executor:latest
done
```

Development vs Production

| Aspect | Development | Production |
|---|---|---|
| Mode | EXECUTION_MODE=local | EXECUTION_MODE=dbos |
| Storage | In-memory | PostgreSQL + DBOS |
| Recovery | None | Automatic (30 s scan interval) |
| Concurrency | 1-10 | 100+ (horizontally scaled) |
| Durability | None | Checkpointed workflows |
| Use case | Testing, debugging | 24/7 reliable execution |

Switching to Production

  1. Set EXECUTION_MODE=dbos
  2. Configure PostgreSQL connection
  3. Run npm run migrate:push
  4. Deploy multiple worker instances
  5. Monitor with metrics (ENABLE_METRICS=true)

Troubleshooting

Worker Not Claiming Tasks

Symptoms: Tasks stuck in 'created' status, workers idle

Debug:

```sql
-- Check claim status
SELECT execution_id, worker_id, status, expires_at
FROM chaingraph_execution_claims
WHERE status != 'released';

-- Find executions that a worker picked up more than a minute ago
SELECT * FROM chaingraph_executions
WHERE processing_worker_id IS NOT NULL
AND processing_started_at < NOW() - INTERVAL '1 minute';
```

Solutions:

  • Verify DATABASE_URL is set correctly
  • Check worker logs: LOG_LEVEL=debug npm run start:worker
  • Ensure PostgreSQL is reachable from worker

Executions Never Complete

Symptoms: Status stays 'running', events not emitted

Debug:

```sql
-- Check DBOS workflow status
SELECT * FROM dbos_workflows WHERE user_id = 'executionId';

-- Monitor event queue
SELECT COUNT(*) FROM dbos_events;
```

Solutions:

  • Verify ENABLE_DBOS_EXECUTION=true in production
  • Check PostgreSQL write permissions
  • Monitor DBOS admin server: http://localhost:3022

Recovery Not Triggering

Symptoms: Stuck executions not being recovered

Debug:

```sql
-- Review the most recent recovery actions
SELECT * FROM chaingraph_execution_recovery
ORDER BY recovered_at DESC LIMIT 10;

-- Check for expired claims
SELECT * FROM chaingraph_execution_claims
WHERE expires_at < NOW();
```

Solutions:

  • Verify ENABLE_RECOVERY=true (default)
  • Check RECOVERY_SCAN_INTERVAL_MS: a large value delays recovery (default 30000, i.e. 30 s)
  • Check at least one worker is running
  • Monitor: tail -f logs/executor.log | grep recovery
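The recovery service's job can be summarized as follows: find running executions whose claim has expired, retry them, and give up once `RECOVERY_MAX_FAILURE_COUNT` is reached. The sketch below is a pure decision function under those assumptions. `RecoveryCandidate`, `planRecovery`, and the `failureCount` field are hypothetical names. The real service reads and writes PostgreSQL, including `chaingraph_execution_recovery`.

```typescript
// Hypothetical recovery-scan decision. Assumed rule: a running execution whose
// claim has expired is retried, unless it has already used its failure budget.
type Status = 'created' | 'running' | 'completed' | 'failed'

interface RecoveryCandidate {
  executionId: string
  status: Status
  claimExpiresAt: number | null // epoch ms; null = never claimed
  failureCount: number          // recoveries already attempted
}

interface RecoveryAction {
  executionId: string
  action: 'retry' | 'fail_permanently'
}

function planRecovery(
  candidates: RecoveryCandidate[],
  now: number,
  maxFailureCount = 5, // RECOVERY_MAX_FAILURE_COUNT default
): RecoveryAction[] {
  const actions: RecoveryAction[] = []
  for (const c of candidates) {
    const claimExpired = c.claimExpiresAt !== null && c.claimExpiresAt <= now
    // Skip anything not running, and running executions whose claim is still live
    if (c.status !== 'running' || !claimExpired) continue
    actions.push({
      executionId: c.executionId,
      action: c.failureCount >= maxFailureCount ? 'fail_permanently' : 'retry',
    })
  }
  return actions
}

const plan = planRecovery(
  [
    { executionId: 'EX1', status: 'running', claimExpiresAt: 90_000, failureCount: 0 },
    { executionId: 'EX2', status: 'running', claimExpiresAt: 120_000, failureCount: 0 },
    { executionId: 'EX3', status: 'running', claimExpiresAt: 50_000, failureCount: 5 },
    { executionId: 'EX4', status: 'completed', claimExpiresAt: 10_000, failureCount: 0 },
  ],
  100_000,
)
console.log(plan.map((a) => `${a.executionId}:${a.action}`).join(', '))
// EX1:retry, EX3:fail_permanently
```

If a stuck execution never shows up in `chaingraph_execution_recovery`, the two inputs to check in this model are whether its claim has actually expired and whether its status is still 'running'.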

High Memory Usage

Symptoms: Worker process consuming excessive memory

Solutions:

  • Reduce WORKER_CONCURRENCY: WORKER_CONCURRENCY=5
  • Reduce DBOS_QUEUE_CONCURRENCY: DBOS_QUEUE_CONCURRENCY=50
  • Enable memory metrics: ENABLE_METRICS=true METRICS_INCLUDE_MEMORY=true
  • Profile with: node --prof dist/server/workers/index.js

Database Connection Errors

Symptoms: ECONNREFUSED, ENOTFOUND errors

Solutions:

```bash
# Test connection
psql $DATABASE_URL -c "SELECT 1"

# Check DNS (if using hostname)
nslookup db.example.com

# Verify credentials
grep DATABASE_URL .env

# Check pool settings (optional)
CONNECTION_POOL_MIN=5 CONNECTION_POOL_MAX=20 npm run start:worker
```

Slow Event Processing

Symptoms: Events delayed, UI not updating in real-time

Solutions:

  • Check the load from LISTEN/NOTIFY queries (requires the pg_stat_statements extension): SELECT * FROM pg_stat_statements WHERE query LIKE '%LISTEN%'
  • Reduce batch size: METRICS_BATCH_SIZE=1
  • Scale up workers
  • Monitor: ENABLE_METRICS=true LOG_LEVEL=debug

API Reference

ExecutionService

```typescript
// Create and execute a flow
// (task: ExecutionTask, flow: Flow, executionRow: ExecutionRow, and
//  abortController: AbortController are prepared by the caller)
const instance = await executionService.createExecutionInstance({
  task,
  flow,
  executionRow,
  abortController,
})

// Subscribe to events
executionService.getEventBus().subscribe((event: FlowEvent) => {
  // Handle event
})

// Get the execution record, including its status
const execution = await store.getExecution(executionId)
```

Event Bus

```typescript
// Subscribe to all events
eventBus.subscribe(handler)

// Subscribe to specific type
eventBus.on('EXECUTION_COMPLETED', handler)

// Publish event
await eventBus.emit(event)
```
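The split between subscribing to all events and subscribing by type can be illustrated with a small in-memory bus. This is not the executor's implementation, which is backed by PostgreSQL LISTEN/NOTIFY in production. `InMemoryEventBus` is hypothetical. It returns an unsubscribe function from `subscribe` and `on`, which may differ from what the real methods return. `emit` is async only to match the `await eventBus.emit(event)` call shape; handlers here run synchronously.

```typescript
// Hypothetical in-memory bus mirroring the subscribe / on / emit calls above.
interface BusEvent {
  type: string
  [key: string]: unknown
}
type Handler = (event: BusEvent) => void

class InMemoryEventBus {
  private readonly allHandlers = new Set<Handler>()
  private readonly typedHandlers = new Map<string, Set<Handler>>()

  // Receive every event; returns a function that removes the handler.
  subscribe(handler: Handler): () => void {
    this.allHandlers.add(handler)
    return () => {
      this.allHandlers.delete(handler)
    }
  }

  // Receive only events whose `type` matches.
  on(type: string, handler: Handler): () => void {
    const handlers = this.typedHandlers.get(type) ?? new Set<Handler>()
    handlers.add(handler)
    this.typedHandlers.set(type, handlers)
    return () => {
      handlers.delete(handler)
    }
  }

  // Deliver to catch-all handlers first, then to type-specific ones.
  async emit(event: BusEvent): Promise<void> {
    for (const handler of this.allHandlers) handler(event)
    for (const handler of this.typedHandlers.get(event.type) ?? []) handler(event)
  }
}

const bus = new InMemoryEventBus()
const seen: string[] = []
bus.subscribe((e) => { seen.push(`all:${e.type}`) })
const off = bus.on('EXECUTION_COMPLETED', (e) => { seen.push(`completed:${String(e.executionId)}`) })

void bus.emit({ type: 'PORT_UPDATED', portId: 'out', value: 1 })
void bus.emit({ type: 'EXECUTION_COMPLETED', executionId: 'EX123' })
off() // later completions only reach the catch-all subscriber
void bus.emit({ type: 'EXECUTION_COMPLETED', executionId: 'EX124' })
console.log(seen)
```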

Execution Store

```typescript
// Query executions
const execution = await store.getExecution(executionId)
const executions = await store.listExecutions({ status: 'running' })

// Update execution
await store.updateStatus(executionId, 'completed', result)

// Recover stuck execution
await store.markForRecovery(executionId)
```

Performance Tuning

For 1000+ Executions/Second

```bash
# Increase concurrency
DBOS_QUEUE_CONCURRENCY=500
DBOS_WORKER_CONCURRENCY=20
WORKER_CONCURRENCY=20

# Deploy more workers, each with its own ID (worker-1 through worker-10)
WORKER_ID=worker-1

# Use PostgreSQL connection pooling
CONNECTION_POOL_MAX=50

# Reduce recovery scan frequency
RECOVERY_SCAN_INTERVAL_MS=60000
```
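A rough capacity check helps pick these numbers. The model rests on two assumptions. First, effective concurrency is the smaller of the global DBOS queue limit and workers × per-worker limit. Second, sustained throughput follows Little's law: throughput ≈ concurrency ÷ average execution time. The 200 ms average execution time below is a made-up input, not a measurement.

```typescript
// Back-of-the-envelope capacity model (assumptions stated in the text).
function effectiveConcurrency(workers: number, perWorkerLimit: number, globalLimit: number): number {
  return Math.min(globalLimit, workers * perWorkerLimit)
}

// Little's law: throughput = concurrency / average time in system.
function maxThroughputPerSecond(concurrency: number, avgExecutionMs: number): number {
  return (concurrency * 1000) / avgExecutionMs
}

// Concurrency needed to sustain a target rate at a given average duration.
function requiredConcurrency(targetPerSecond: number, avgExecutionMs: number): number {
  return Math.ceil((targetPerSecond * avgExecutionMs) / 1000)
}

// 10 workers, DBOS_WORKER_CONCURRENCY=20, DBOS_QUEUE_CONCURRENCY=500
const concurrency = effectiveConcurrency(10, 20, 500) // 200: per-worker limits bind first
const throughput = maxThroughputPerSecond(concurrency, 200) // 1000 executions/s at 200 ms each
const needed = requiredConcurrency(1000, 200) // 200 concurrent slots
console.log({ concurrency, throughput, needed })
```

With these settings, the per-worker limits (10 × 20 = 200) cap concurrency well before DBOS_QUEUE_CONCURRENCY=500. About 25 workers at 20 each would be needed to reach the global limit.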

For Real-Time Requirements

```bash
# Reduce timeouts
WORKER_CLAIM_TIMEOUT_MS=15000
WORKER_HEARTBEAT_INTERVAL_MS=2000

# Increase event flush rate
METRICS_FLUSH_INTERVAL=100

# Enable debug logging
LOG_LEVEL=debug
```
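Shorter timeouts mainly reduce how long a crashed worker's execution goes unnoticed. The following is a simple model and an assumption, not documented behavior. The claim expires between `WORKER_CLAIM_TIMEOUT_MS - WORKER_HEARTBEAT_INTERVAL_MS` and `WORKER_CLAIM_TIMEOUT_MS` after the crash, and the next recovery scan then runs within `RECOVERY_SCAN_INTERVAL_MS`. `detectionWindowMs` and `heartbeatsPerLease` are hypothetical helpers.

```typescript
// Simple model of crash-detection latency (an assumption, see text).
interface TimingSettings {
  claimTimeoutMs: number      // WORKER_CLAIM_TIMEOUT_MS
  heartbeatIntervalMs: number // WORKER_HEARTBEAT_INTERVAL_MS
  scanIntervalMs: number      // RECOVERY_SCAN_INTERVAL_MS
}

function detectionWindowMs(s: TimingSettings): { min: number; max: number } {
  return {
    // crash just before the next heartbeat was due, scan runs right at expiry
    min: s.claimTimeoutMs - s.heartbeatIntervalMs,
    // crash right after a heartbeat, and a scan has just finished at expiry
    max: s.claimTimeoutMs + s.scanIntervalMs,
  }
}

// How many heartbeats fit in one lease; values near 1 risk spurious expiry.
function heartbeatsPerLease(s: TimingSettings): number {
  return s.claimTimeoutMs / s.heartbeatIntervalMs
}

const defaults = { claimTimeoutMs: 30_000, heartbeatIntervalMs: 5_000, scanIntervalMs: 30_000 }
const realTime = { claimTimeoutMs: 15_000, heartbeatIntervalMs: 2_000, scanIntervalMs: 30_000 }

console.log(detectionWindowMs(defaults), heartbeatsPerLease(defaults)) // { min: 25000, max: 60000 } 6
console.log(detectionWindowMs(realTime), heartbeatsPerLease(realTime)) // { min: 13000, max: 45000 } 7.5
```

Under this model, the real-time settings above lower the best case. The worst case, however, is dominated by RECOVERY_SCAN_INTERVAL_MS, which that block leaves at its 30 s default.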

License

Licensed under Business Source License 1.1 (BUSL-1.1). See LICENSE file for details.


Documentation: See the Architecture Overview for technical details.

Issues: Report bugs on GitHub
