Executor Client

The ChainGraph Executor Client (@badaitech/chaingraph-executor/client) provides a type-safe tRPC client for executing flows on the ChainGraph execution engine. It enables real-time communication with the execution API via WebSockets, allowing you to trigger flow executions, monitor their progress, and control their lifecycle from your application.

Overview

The executor client acts as a bridge between your application and the distributed ChainGraph execution engine. It handles:

  • Flow Execution Orchestration: Create, start, pause, resume, and stop flow executions
  • Real-Time Event Streaming: Subscribe to execution events via WebSocket for live updates
  • Execution Monitoring: Query execution status, history, and hierarchical execution trees
  • Integration Context: Pass runtime context (chat sessions, wallet connections, custom data) to flows
  • Type Safety: Full TypeScript support with end-to-end type inference

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                      Your Application                           │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │          ChainGraph Executor Client                      │   │
│  │  (createTRPCClient)                                      │   │
│  └──────────┬───────────────────────────────────────────────┘   │
└─────────────┼───────────────────────────────────────────────────┘
              │ WebSocket (wss://)
              │ ├─ Mutations (create, start, stop, pause, resume)
              │ ├─ Queries (getExecutionDetails, getExecutionsTree)
              │ └─ Subscriptions (real-time events)
              │
              ▼
┌─────────────────────────────────────────────────────────────────┐
│              ChainGraph Execution Engine                        │
│   wss://execution-api.chaingraph.io                             │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  tRPC Router                                             │   │
│  │  ├─ Authentication (sessionBadAI)                        │   │
│  │  ├─ Execution Store (PostgreSQL)                         │   │
│  │  └─ Event Bus (DBOS)                                     │   │
│  └──────────┬───────────────────────────────────────────────┘   │
└─────────────┼───────────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────────┐
│                 DBOS Workflow Engine                            │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  Execution Workers (Horizontally Scalable)               │   │
│  │  ├─ Flow Execution (nodes, edges, transfers)             │   │
│  │  ├─ Event Emission (real-time streaming)                 │   │
│  │  └─ Child Execution Spawning                             │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

Key Concepts

Two-Phase Execution Model: Executions are created first (initializing resources and event streams), then explicitly started. This ensures your event subscription is ready before execution begins.

Event-Driven Architecture: All execution progress is communicated via events streamed over WebSocket. Your client subscribes to events and reacts to flow progress in real-time.

Hierarchical Executions: Flows can spawn child executions via event emitters, creating execution trees. The client can query these trees to understand the full execution hierarchy.

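At-Least-Once Delivery: The execution engine guarantees that every event is delivered at least once, so a client may occasionally receive the same event more than once and should handle events idempotently. Failed executions are automatically retried by the recovery service.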

Installation

bash
npm install @badaitech/chaingraph-executor superjson

Why SuperJSON? ChainGraph uses SuperJSON for complex type serialization (Dates, Maps, Sets, custom classes). This ensures data fidelity between client and server.

Execution Lifecycle

Understanding the execution lifecycle is crucial for working with the executor client. Every execution goes through well-defined phases, from creation to completion.

Lifecycle State Machine

                    ┌──────────┐
                    │   idle   │ (Not yet created)
                    └─────┬────┘
                          │
                   create.mutate()
                          │
                          ▼
                    ┌──────────┐
              ┌────▶│ created  │◀────┐ (Initialized, waiting for START_SIGNAL)
              │     └─────┬────┘     │
              │           │          │
              │    start.mutate()    │
              │           │          │
              │           ▼          │
              │     ┌──────────┐     │
              │     │ running  │─────┤ (Actively executing nodes)
              │     └─────┬────┘     │
              │           │          │
              │     pause.mutate()   │
              │           │          │
              │           ▼          │
    resume.   │     ┌──────────┐     │
    mutate()  │     │  paused  │─────┘
              │     └──────────┘
              │
              │
         stop.mutate()
              │
    ┌─────────┴──────────┬───────────────┬──────────────┐
    │                    │               │              │
    ▼                    ▼               ▼              ▼
┌──────────┐      ┌──────────┐   ┌──────────┐   ┌──────────┐
│ stopped  │      │completed │   │  failed  │   │ timeout  │
└──────────┘      └──────────┘   └──────────┘   └──────────┘
(User cancel)     (Success)      (Error)        (Exceeded limit)

Execution Flow Sequence

Here's what happens when you execute a flow from the client perspective:

Client                          Execution API                    DBOS Workflow
  │                                  │                                 │
  │─────create.mutate()─────────────▶│                                 │
  │                                  │                                 │
  │                                  ├──[1] Create execution row       │
  │                                  │    (status: created)            │
  │                                  │                                 │
  │                                  ├──[2] Initialize event stream    │
  │                                  │                                 │
  │                                  ├─────start workflow─────────────▶│
  │                                  │                                 │
  │◀─────{executionId}───────────────┤                                 │
  │                                  │                                 ├─[3] Write EXECUTION_CREATED event
  │                                  │                                 │
  │                                  │                                 ├─[4] Wait for START_SIGNAL...
  │                                  │                                 │
  │──subscribeToExecutionEvents─────▶│                                 │
  │                                  │                                 │
  │                                  ├──connect to event stream───────▶│
  │                                  │                                 │
  │◀─────[event stream active]───────┤◀────[ready to emit]─────────────┤
  │                                  │                                 │
  │─────start.mutate()──────────────▶│                                 │
  │                                  │                                 │
  │                                  ├─────DBOS.send('START_SIGNAL')──▶│
  │                                  │                                 │
  │◀─────{success: true}─────────────┤                                 │
  │                                  │                                 ├─[5] Resume from signal
  │                                  │                                 │
  │                                  │                                 ├─[6] Execute flow nodes
  │                                  │                                 │
  │◀─────FLOW_STARTED event──────────┤◀────[emit]──────────────────────┤
  │◀─────NODE_STARTED events─────────┤◀────[emit]──────────────────────┤
  │◀─────NODE_COMPLETED events───────┤◀────[emit]──────────────────────┤
  │◀─────EDGE_TRANSFER events────────┤◀────[emit]──────────────────────┤
  │◀─────FLOW_COMPLETED event────────┤◀────[emit]──────────────────────┤
  │                                  │                                 │
  │──getExecutionDetails.query()────▶│                                 │
  │◀─────{status: completed}─────────┤                                 │
  │                                  │                                 │
  │──unsubscribe()──────────────────▶│                                 │
  │                                  ├──disconnect event stream───────▶│
  │                                  │                                 │

Key Insights:

  1. Phase 1 (Create): The execution is created and a DBOS workflow starts immediately. However, it waits for a START_SIGNAL before actually executing the flow. This ensures the event stream is initialized.

  2. Phase 2 (Subscribe): You connect to the event stream. The workflow has already written the EXECUTION_CREATED event, so the stream is ready.

  3. Phase 3 (Start): You send the START_SIGNAL. The workflow resumes and begins executing nodes, emitting events in real-time.

  4. Phase 4 (Execute): The flow executes, nodes run, edges transfer data, and all progress is streamed to your client as events.

  5. Phase 5 (Complete): The flow completes (or fails), emits a final event, and the execution transitions to a terminal state.

This design ensures you never miss events because the subscription is established before execution begins.

Quick Start

Initialize the Client

The client requires a WebSocket connection to the execution API. In production, this is wss://execution-api.chaingraph.io.

typescript
import { createTRPCClient } from '@badaitech/chaingraph-executor/client'
import SuperJSON from 'superjson'

const client = createTRPCClient({
  url: 'wss://execution-api.chaingraph.io',
  superjsonCustom: SuperJSON,
  auth: {
    sessionBadAI: 'your-session-token'  // Your authenticated session
  }
})

About the Connection:

  • Protocol: WebSocket (wss:// for production, ws:// for local dev)
  • Authentication: Uses sessionBadAI token passed in connection params
  • Persistence: Connection stays open for subscriptions, auto-reconnects on failure
  • Keep-Alive: Automatic ping/pong every 5 seconds to maintain connection

Configuration Options

Production Setup

typescript
const client = createTRPCClient({
  url: 'wss://execution-api.chaingraph.io',
  superjsonCustom: SuperJSON,
  auth: {
    sessionBadAI: 'your-session-token' // Required for authentication
  },
  wsClientCallbacks: {
    onOpen: () => console.log('Connected to execution engine'),
    onError: (err) => console.error('Connection error:', err),
    onClose: (cause) => console.log('Connection closed:', cause)
  }
})

Local Development

typescript
const client = createTRPCClient({
  url: 'ws://localhost:4021',
  superjsonCustom: SuperJSON,
  auth: {
    sessionBadAI: 'your-session-token'
  }
})

API Reference

1. Create Execution

Creates a new execution instance for a flow. The execution is created in a "created" state and must be explicitly started.

typescript
const result = await client.create.mutate({
  flowId: 'V2-your-flow-id',
  options: {
    execution: {
      maxConcurrency: 10,        // Max parallel node executions (default: 100)
      nodeTimeoutMs: 300000,     // Node timeout in ms (default: 60000)
      flowTimeoutMs: 900000      // Flow timeout in ms (default: 300000)
    },
    debug: false,                // Enable debug mode (default: false)
    breakpoints: []              // Array of node IDs for breakpoints
  },
  integration: {
    archai: {
      agentID: 'agent-123',
      agentSession: 'session-token',
      chatID: 'chat-456',
      messageID: 789
    },
    wallet: {
      isConnected: true,
      address: '0x...',
      chainId: 1,
      providerType: 'metamask'
    }
  },
  events: [
    {
      eventName: 'user-action',
      payload: { data: 'custom-data' }
    }
  ]
})

console.log(result.executionId) // 'exec_abc123'

Input Schema

  • flowId (required): Flow ID to execute (must start with 'V2')
  • options (optional): Execution configuration
    • execution: Runtime limits
      • maxConcurrency: Maximum parallel node executions
      • nodeTimeoutMs: Timeout per node in milliseconds
      • flowTimeoutMs: Timeout for entire flow in milliseconds
    • debug: Enable debugging features
    • breakpoints: Array of node IDs to pause at
  • integration (optional): Integration context
    • archai: ArchAI chat integration context
    • wallet: Web3 wallet integration context
    • external: Custom integration data
  • events (optional): External events to trigger event listeners
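
The schema above can be checked on the client before calling create. The following is a minimal sketch, not part of the ChainGraph packages: `CreateInputSketch` and `checkCreateInput` are hypothetical names, and the "positive integer" rule for the limits is an assumption (the source only states the 'V2' prefix requirement).

```typescript
// Subset of the create input; field names follow the schema list above.
interface CreateInputSketch {
  flowId: string
  options?: {
    execution?: {
      maxConcurrency?: number
      nodeTimeoutMs?: number
      flowTimeoutMs?: number
    }
  }
}

const LIMIT_NAMES = ['maxConcurrency', 'nodeTimeoutMs', 'flowTimeoutMs'] as const

// Hypothetical pre-flight check: returns a list of problems (empty means OK).
function checkCreateInput(input: CreateInputSketch): string[] {
  const problems: string[] = []
  // Documented rule: the flow ID must start with 'V2'.
  if (!input.flowId.startsWith('V2')) {
    problems.push("flowId must start with 'V2'")
  }
  // Assumed rule: every limit that is set must be a positive integer.
  const limits = input.options?.execution
  for (const name of LIMIT_NAMES) {
    const value = limits?.[name]
    if (value !== undefined && (!Number.isInteger(value) || value <= 0)) {
      problems.push(`${name} must be a positive integer`)
    }
  }
  return problems
}

const ok = checkCreateInput({
  flowId: 'V2-my-flow',
  options: { execution: { nodeTimeoutMs: 300000 } },
})
const bad = checkCreateInput({
  flowId: 'my-flow',
  options: { execution: { maxConcurrency: 0 } },
})
console.log(ok)
console.log(bad)
```

Returning a list of problems rather than throwing lets a form or CLI show all issues at once; the server remains the authority on what is actually accepted.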

2. Start Execution

Starts an execution that was previously created.

typescript
const result = await client.start.mutate({
  executionId: 'exec_abc123'
})

console.log(result.success) // true

Note: Execution must be in "created" status. Once started, it transitions to "running" status.

3. Stop Execution

Stops a running execution. This cancels the workflow and all its children.

typescript
const result = await client.stop.mutate({
  executionId: 'exec_abc123',
  reason: 'User requested stop' // Optional
})

console.log(result.success) // true

Allowed States: Can stop executions in "created", "running", or "paused" status.

4. Pause Execution

Pauses a running execution. The execution can be resumed later.

typescript
const result = await client.pause.mutate({
  executionId: 'exec_abc123',
  reason: 'User requested pause' // Optional
})

console.log(result.success) // true

Allowed States: Can only pause executions in "running" status.

5. Resume Execution

Resumes a paused execution.

typescript
const result = await client.resume.mutate({
  executionId: 'exec_abc123'
})

console.log(result.success) // true

Allowed States: Can only resume executions in "paused" status.
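
The "Allowed States" notes for start, stop, pause, and resume can be collected into one lookup table, which is handy for enabling or disabling UI controls before a call is made. This is an illustrative sketch: `allowedFrom` and `canApply` are hypothetical names, and the server still performs its own checks.

```typescript
type Status =
  | 'idle' | 'creating' | 'created' | 'running'
  | 'paused' | 'completed' | 'failed' | 'stopped'

type Action = 'start' | 'stop' | 'pause' | 'resume'

// Statuses from which each control call is accepted, per the notes above.
const allowedFrom: Record<Action, readonly Status[]> = {
  start: ['created'],
  stop: ['created', 'running', 'paused'],
  pause: ['running'],
  resume: ['paused'],
}

// True when the documented rules allow `action` for an execution in `status`.
function canApply(action: Action, status: Status): boolean {
  return allowedFrom[action].includes(status)
}

console.log(canApply('start', 'created'))
console.log(canApply('pause', 'paused'))
console.log(canApply('stop', 'completed'))
```

A status fetched with getExecutionDetails can be passed straight into `canApply`, assuming its string values match the ones listed under Execution Status Values.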

6. Get Execution Details

Retrieves detailed information about an execution.

typescript
const execution = await client.getExecutionDetails.query({
  executionId: 'exec_abc123'
})

console.log(execution.status)        // 'running', 'completed', 'failed', etc.
console.log(execution.createdAt)     // Date
console.log(execution.startedAt)     // Date | null
console.log(execution.completedAt)   // Date | null
console.log(execution.errorMessage)  // string | null
console.log(execution.errorNodeId)   // string | null

Execution Row Properties

typescript
interface ExecutionRow {
  id: string                        // Execution ID
  flowId: string                    // Flow ID
  ownerId: string                   // Owner user ID
  rootExecutionId: string           // Root execution (for nested executions)
  parentExecutionId: string | null  // Parent execution (for child executions)
  status: ExecutionStatus           // Current status
  createdAt: Date
  updatedAt: Date
  startedAt: Date | null
  completedAt: Date | null
  errorMessage: string | null
  errorNodeId: string | null        // Node that caused failure
  executionDepth: number            // Nesting level (0 for root)
  options: ExecutionOptions
  integration: IntegrationContext
  externalEvents: ExecutionExternalEvent[]
  failureCount: number
  lastFailureReason: string | null
  lastFailureAt: Date | null
  processingStartedAt: Date | null
  processingWorkerId: string | null
}

7. Get Execution Tree

Retrieves the entire execution tree (including child executions).

typescript
const tree = await client.getExecutionsTree.query({
  executionId: 'exec_abc123'
})

tree.forEach(node => {
  console.log(`Level ${node.level}: ${node.id}`)
  console.log(`  Parent: ${node.parentId}`)
  console.log(`  Status: ${node.execution.status}`)
})

Execution Tree Node

typescript
interface ExecutionTreeNode {
  id: string                     // Execution ID
  parentId: string | null        // Parent execution ID
  level: number                  // Nesting level (0 for root)
  execution: ExecutionRow        // Full execution details
}
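
Since the query result is iterated as a flat list with `parentId` and `level` on each entry, a nested view has to be built on the client. The sketch below shows one way to do that; `FlatNode`, `NestedNode`, and `nestTree` are hypothetical names, and only the fields needed for nesting are modelled.

```typescript
// Only the fields needed here; the real ExecutionTreeNode also carries `execution`.
interface FlatNode {
  id: string
  parentId: string | null
  level: number
}

interface NestedNode extends FlatNode {
  children: NestedNode[]
}

// Hypothetical helper: links each node to its parent and returns the roots.
function nestTree(flat: FlatNode[]): NestedNode[] {
  const byId = new Map<string, NestedNode>()
  for (const node of flat) {
    byId.set(node.id, { ...node, children: [] })
  }
  const roots: NestedNode[] = []
  for (const node of byId.values()) {
    const parent = node.parentId === null ? undefined : byId.get(node.parentId)
    if (parent) {
      parent.children.push(node)
    } else {
      roots.push(node) // a root, or a node whose parent is not in the list
    }
  }
  return roots
}

const roots = nestTree([
  { id: 'exec_root', parentId: null, level: 0 },
  { id: 'exec_child_a', parentId: 'exec_root', level: 1 },
  { id: 'exec_child_b', parentId: 'exec_root', level: 1 },
  { id: 'exec_grandchild', parentId: 'exec_child_a', level: 2 },
])
console.log(roots.length, roots[0].children.map((child) => child.id))
```

Nodes whose parent is missing from the list are treated as roots so that a partial result still renders instead of silently dropping entries.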

8. Get Root Executions

Lists all root executions for a specific flow.

typescript
const executions = await client.getRootExecutions.query({
  flowId: 'V2-your-flow-id',
  limit: 50,                      // Max 100, default 50
  after: new Date('2024-01-01')   // Optional: only executions after this date
})

executions.forEach(rootExec => {
  console.log(`Execution: ${rootExec.execution.id}`)
  console.log(`  Nested levels: ${rootExec.levels}`)
  console.log(`  Total nested: ${rootExec.totalNested}`)
})

Root Execution

typescript
interface RootExecution {
  execution: ExecutionRow  // Root execution details
  levels: number           // Max nesting depth
  totalNested: number      // Total child executions
}

9. Subscribe to Execution Events

Subscribe to real-time execution events via WebSocket.

typescript
import { createExecutionEventHandler, ExecutionEventEnum } from '@badaitech/chaingraph-types'

// Create type-safe event handler
const handleEvent = createExecutionEventHandler({
  [ExecutionEventEnum.NODE_COMPLETED]: (data) => {
    console.log(`Node completed: ${data.node.id}`)
  },
  [ExecutionEventEnum.FLOW_COMPLETED]: () => {
    console.log('Flow completed!')
  },
})

const subscription = client.subscribeToExecutionEvents.subscribe(
  {
    executionId: 'exec_abc123',
    fromIndex: 0,               // Start from event index (default: 0)
    eventTypes: [],             // Filter by event types (empty = all events)
    batchSize: 100,             // Max events per batch (1-1000, default: 100)
    batchTimeoutMs: 25          // Batch timeout in ms (0-1000, default: 25)
  },
  {
    onData: (events) => {
      events.forEach(event => {
        console.log(`Event ${event.index}: ${event.type}`)
        handleEvent(event)  // Type-safe event handling
      })
    },
    onError: (err) => {
      console.error('Subscription error:', err)
    },
    onComplete: () => {
      console.log('Subscription completed')
    }
  }
)

// Later: unsubscribe
subscription.unsubscribe()

Event Types

Common event types you can filter by (from ExecutionEventEnum):

typescript
import { ExecutionEventEnum } from '@badaitech/chaingraph-types'

// Flow-level events
ExecutionEventEnum.FLOW_STARTED
ExecutionEventEnum.FLOW_COMPLETED
ExecutionEventEnum.FLOW_FAILED
ExecutionEventEnum.FLOW_CANCELLED

// Node-level events
ExecutionEventEnum.NODE_STARTED
ExecutionEventEnum.NODE_COMPLETED
ExecutionEventEnum.NODE_FAILED
ExecutionEventEnum.NODE_SKIPPED
ExecutionEventEnum.NODE_STATUS_CHANGED
ExecutionEventEnum.NODE_BACKGROUNDED

// Edge transfer events
ExecutionEventEnum.EDGE_TRANSFER_COMPLETED
ExecutionEventEnum.EDGE_TRANSFER_FAILED

// Debug events
ExecutionEventEnum.NODE_DEBUG_LOG_STRING
ExecutionEventEnum.DEBUG_BREAKPOINT_HIT

Event Structure

typescript
interface ExecutionEvent {
  index: number        // Sequential event index
  type: string         // Event type
  timestamp: Date      // Event timestamp
  data: any            // Event-specific data
}
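
Because delivery is at-least-once, the same event can arrive twice, for example after a reconnect. The sequential `index` makes duplicates easy to drop. The following sketch assumes events for one execution arrive in increasing index order and that `fromIndex` is inclusive; neither is stated in the source. `EventCursor` is a hypothetical helper, and the `type` strings are placeholders rather than real enum values.

```typescript
// Minimal event shape, mirroring the ExecutionEvent interface above.
interface IndexedEvent {
  index: number
  type: string
}

// Hypothetical helper: remembers the highest index seen and drops repeats.
class EventCursor {
  private lastIndex = -1

  // Returns only events not seen before, sorted by index.
  accept<T extends IndexedEvent>(batch: T[]): T[] {
    const fresh: T[] = []
    for (const event of [...batch].sort((a, b) => a.index - b.index)) {
      if (event.index > this.lastIndex) {
        fresh.push(event)
        this.lastIndex = event.index
      }
    }
    return fresh
  }

  // Candidate value for `fromIndex` when re-subscribing (assumes inclusive).
  nextFromIndex(): number {
    return this.lastIndex + 1
  }
}

const cursor = new EventCursor()
const first = cursor.accept([
  { index: 0, type: 'FLOW_STARTED' },
  { index: 1, type: 'NODE_STARTED' },
])
// A redelivered batch overlaps with what was already processed.
const second = cursor.accept([
  { index: 1, type: 'NODE_STARTED' },
  { index: 2, type: 'NODE_COMPLETED' },
])
console.log(first.length, second.length, cursor.nextFromIndex())
```

In a subscription, `cursor.accept(events)` would wrap the batch inside `onData` before the events are handed to the handler.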

Type-Safe Event Handlers

ChainGraph provides createExecutionEventHandler for type-safe event handling. This eliminates the need for manual type switching and provides full TypeScript inference for event data.

Basic Event Handler

typescript
import { createExecutionEventHandler, ExecutionEventEnum } from '@badaitech/chaingraph-types'

const handleEvent = createExecutionEventHandler({
  [ExecutionEventEnum.FLOW_STARTED]: (data) => {
    // data is automatically typed as FlowStartedData
    console.log('Flow started:', data.flowMetadata)
  },
  [ExecutionEventEnum.NODE_COMPLETED]: (data) => {
    // data is automatically typed as NodeCompletedData
    console.log('Node completed:', data.node.id)
    console.log('Execution time:', data.executionTime, 'ms')
  },
  [ExecutionEventEnum.FLOW_FAILED]: (data) => {
    // data is automatically typed as FlowFailedData
    console.error('Flow failed:', data.error.message)
  },
})

// Use the handler in subscription
client.subscribeToExecutionEvents.subscribe(
  { executionId, eventTypes: [] },
  {
    onData: (events) => {
      events.forEach(event => handleEvent(event))
    }
  }
)

Benefits of Event Handlers

  1. Type Safety: Each event handler gets properly typed data parameter
  2. Autocomplete: IDE provides suggestions for event data properties
  3. Compile-Time Checks: TypeScript catches typos and incorrect property access
  4. Cleaner Code: No switch statements or manual type assertions
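
To make the type-safety claim concrete, here is a small standalone sketch of how a handler map can tie each event type to its payload. It is not the implementation of createExecutionEventHandler; `PayloadByType`, `SketchEvent`, and `createHandler` are hypothetical, and the payload shapes are simplified stand-ins for the real ones in @badaitech/chaingraph-types.

```typescript
// Hypothetical, simplified payloads keyed by event type.
interface PayloadByType {
  NODE_COMPLETED: { nodeId: string; executionTime: number }
  FLOW_FAILED: { message: string }
}

type EventType = keyof PayloadByType

// One variant per type, so `type` determines the shape of `data`.
type SketchEvent = {
  [K in EventType]: { type: K; data: PayloadByType[K] }
}[EventType]

// Each handler receives exactly the payload for its key; all keys are optional.
type HandlerMap = {
  [K in EventType]?: (data: PayloadByType[K]) => void
}

function createHandler(handlers: HandlerMap): (event: SketchEvent) => void {
  return (event) => {
    // Cast needed: TypeScript cannot correlate the handler and payload unions.
    const handler = handlers[event.type] as ((data: unknown) => void) | undefined
    handler?.(event.data) // events without a registered handler are ignored
  }
}

const seen: string[] = []
const handle = createHandler({
  NODE_COMPLETED: (data) => seen.push(`${data.nodeId} took ${data.executionTime} ms`),
  FLOW_FAILED: (data) => seen.push(`failed: ${data.message}`),
})

handle({ type: 'NODE_COMPLETED', data: { nodeId: 'n1', executionTime: 12 } })
handle({ type: 'FLOW_FAILED', data: { message: 'boom' } })
console.log(seen)
```

Accessing `data.message` inside the NODE_COMPLETED handler would fail to compile, which is the kind of compile-time check listed in the benefits above.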

Handler with Error Handling

typescript
const handleEvent = createExecutionEventHandler(
  {
    [ExecutionEventEnum.NODE_FAILED]: (data) => {
      console.error('Node failed:', data.node.id)
      console.error('Error:', data.error.message)
      // Notify external error tracking
      errorTracker.captureError(data.error)
    },
    [ExecutionEventEnum.FLOW_FAILED]: (data) => {
      console.error('Flow failed:', data.error.message)
      // Trigger alerting system
      alerting.send(`Execution failed: ${data.error.message}`)
    },
  },
  {
    // Error handler for the event handlers themselves
    onError: (error, event) => {
      console.error('Error in event handler:', error)
      console.error('Event:', event.type)
    }
  }
)

Complete Example

typescript
import { createTRPCClient } from '@badaitech/chaingraph-executor/client'
import { createExecutionEventHandler, ExecutionEventEnum } from '@badaitech/chaingraph-types'
import SuperJSON from 'superjson'

async function executeFlow() {
  // 1. Create client
  const client = createTRPCClient({
    url: 'wss://execution-api.chaingraph.io',
    superjsonCustom: SuperJSON,
    auth: {
      sessionBadAI: process.env.SESSION_TOKEN
    }
  })

  try {
    // 2. Create execution
    const { executionId } = await client.create.mutate({
      flowId: 'V2-my-flow',
      options: {
        execution: {
          maxConcurrency: 10,
          nodeTimeoutMs: 300000,
          flowTimeoutMs: 900000
        },
        debug: false
      },
      integration: {
        archai: {
          agentID: 'agent-123',
          agentSession: 'session-abc',
          chatID: 'chat-456',
          messageID: 789
        }
      }
    })

    console.log('Execution created:', executionId)

    // 3. Subscribe to events with type-safe handlers
    let subscription: ReturnType<typeof client.subscribeToExecutionEvents.subscribe>

    const handleEvent = createExecutionEventHandler({
      [ExecutionEventEnum.FLOW_COMPLETED]: () => {
        console.log('Flow completed successfully!')
      },
      [ExecutionEventEnum.FLOW_FAILED]: (data) => {
        console.error('Flow failed:', data.error.message)
      },
      [ExecutionEventEnum.NODE_FAILED]: (data) => {
        console.error('Node failed:', data.node.id, data.error.message)
      },
    })

    subscription = client.subscribeToExecutionEvents.subscribe(
      {
        executionId,
        fromIndex: 0,
        eventTypes: [
          ExecutionEventEnum.FLOW_COMPLETED,
          ExecutionEventEnum.FLOW_FAILED,
          ExecutionEventEnum.NODE_FAILED
        ]
      },
      {
        onData: (events) => {
          events.forEach(event => handleEvent(event))
        },
        onError: (err) => console.error('Subscription error:', err)
      }
    )

    // 4. Start execution
    await client.start.mutate({ executionId })
    console.log('Execution started')

    // 5. Wait for completion (in real app, handle via subscription)
    await new Promise(resolve => setTimeout(resolve, 60000))

    // 6. Get final status
    const execution = await client.getExecutionDetails.query({ executionId })
    console.log('Final status:', execution.status)

    // 7. Cleanup
    subscription.unsubscribe()

  } catch (error) {
    console.error('Execution error:', error)
  }
}

executeFlow()
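
Step 5 of the example waits a fixed 60 seconds, which either wastes time or cuts off long flows. A sketch of waiting for a terminal event instead is shown below. `createCompletionTracker` is a hypothetical helper, not part of the client, and it assumes event `type` values are strings that can be compared against the chosen terminal types.

```typescript
// Minimal event shape; `type` values here are placeholders for enum members.
interface TypedEvent {
  type: string
}

type Outcome = { done: false } | { done: true; finalType: string }

// Hypothetical helper: feed it every batch from onData; `finished` resolves
// with the first terminal event type that shows up.
function createCompletionTracker(terminalTypes: readonly string[]) {
  let outcome: Outcome = { done: false }
  let resolve!: (finalType: string) => void
  const finished = new Promise<string>((res) => {
    resolve = res
  })

  return {
    finished,
    outcome: (): Outcome => outcome,
    onEvents(events: TypedEvent[]): void {
      if (outcome.done) return // ignore anything after the terminal event
      const terminal = events.find((e) => terminalTypes.includes(e.type))
      if (terminal) {
        outcome = { done: true, finalType: terminal.type }
        resolve(terminal.type)
      }
    },
  }
}

const tracker = createCompletionTracker(['FLOW_COMPLETED', 'FLOW_FAILED'])
tracker.onEvents([{ type: 'NODE_STARTED' }])
tracker.onEvents([{ type: 'NODE_COMPLETED' }, { type: 'FLOW_COMPLETED' }])
tracker.finished.then((finalType) => console.log('finished with', finalType))
```

In the example above, `tracker.onEvents(events)` would be called inside `onData` and `await tracker.finished` would take the place of the `setTimeout` promise. Passing `ExecutionEventEnum.FLOW_COMPLETED`, `FLOW_FAILED`, and `FLOW_CANCELLED` as the terminal types is a reasonable starting point, and a flow-level timeout is still worth keeping as a fallback.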

Execution Status Values

typescript
enum ExecutionStatus {
  Idle = 'idle',
  Creating = 'creating',
  Created = 'created',       // Ready to start
  Running = 'running',       // Currently executing
  Paused = 'paused',         // Paused by user
  Completed = 'completed',   // Finished successfully
  Failed = 'failed',         // Failed with error
  Stopped = 'stopped'        // Stopped by user
}

Integration Contexts

ArchAI Integration

typescript
interface ArchAIContext {
  agentID?: string        // Agent identifier
  agentSession?: string   // Agent session token
  chatID?: string         // Chat room ID
  messageID?: number      // Message ID that triggered execution
}

Wallet Integration

typescript
interface WalletContext {
  isConnected: boolean
  address?: string                    // Wallet address
  chainId?: number                    // Chain ID (1 for Ethereum mainnet)
  providerType?: string               // 'metamask', 'walletconnect', etc.
  ensName?: string                    // ENS name if available
  capabilities?: {
    supportsBatchTransactions?: boolean
    supportsEIP1559?: boolean
    supportsEIP712?: boolean
  }
  lastUpdated?: number                // Timestamp
  rpcUrl?: string                     // Custom RPC endpoint
}

Error Handling

All mutations and queries can throw tRPC errors:

typescript
import { TRPCClientError } from '@trpc/client'

try {
  await client.start.mutate({ executionId })
} catch (error) {
  if (error instanceof TRPCClientError) {
    switch (error.data?.code) {
      case 'UNAUTHORIZED':
        console.error('Invalid session token')
        break
      case 'NOT_FOUND':
        console.error('Execution not found')
        break
      case 'FORBIDDEN':
        console.error('No access to this flow')
        break
      case 'BAD_REQUEST':
        console.error('Invalid request:', error.message)
        break
      default:
        console.error('Unexpected error:', error.message)
    }
  }
}

Type Safety

The client provides full TypeScript type safety:

typescript
import type {
  RouterInputs,
  RouterOutputs
} from '@badaitech/chaingraph-executor/client'

// Input types
type CreateInput = RouterInputs['create']
type StartInput = RouterInputs['start']

// Output types
type CreateOutput = RouterOutputs['create']
type ExecutionDetails = RouterOutputs['getExecutionDetails']

Advanced Usage

Custom SuperJSON Configuration

If you need custom serialization (e.g., for custom classes):

typescript
import { createTRPCClient } from '@badaitech/chaingraph-executor/client'
import { NodeRegistry, registerSuperjsonTransformers } from '@badaitech/chaingraph-types'
import SuperJSON from 'superjson'

// Register ChainGraph transformers
registerSuperjsonTransformers(SuperJSON, NodeRegistry.getInstance())

const client = createTRPCClient({
  url: 'wss://execution-api.chaingraph.io',
  superjsonCustom: SuperJSON,
  auth: { sessionBadAI: token }
})

React Integration

For React applications, use the React Query hooks:

typescript
import { trpcReact, TRPCProvider, getExecutorQueryClient } from '@badaitech/chaingraph-executor/client'
import { QueryClientProvider } from '@tanstack/react-query'

// In your root component
function App() {
  const queryClient = getExecutorQueryClient()
  const trpcClient = trpcReact.createClient({
    url: 'wss://execution-api.chaingraph.io',
    auth: { sessionBadAI: token }
  })

  return (
    <trpcReact.Provider client={trpcClient} queryClient={queryClient}>
      <QueryClientProvider client={queryClient}>
        <YourApp />
      </QueryClientProvider>
    </trpcReact.Provider>
  )
}

// In your components
function ExecutionComponent({ flowId }) {
  const createMutation = trpcReact.create.useMutation()
  const startMutation = trpcReact.start.useMutation()

  const handleExecute = async () => {
    const { executionId } = await createMutation.mutateAsync({ flowId })
    await startMutation.mutateAsync({ executionId })
  }

  return <button onClick={handleExecute}>Execute Flow</button>
}

Best Practices

1. Always Subscribe Before Starting

Set up event subscriptions before calling start to avoid missing events:

typescript
// ✅ Correct order
const subscription = client.subscribeToExecutionEvents.subscribe(...)
await client.start.mutate({ executionId })

// ❌ Wrong - may miss early events
await client.start.mutate({ executionId })
const subscription = client.subscribeToExecutionEvents.subscribe(...)

2. Use Type-Safe Event Handlers

Always use createExecutionEventHandler instead of manual type checking:

typescript
import { createExecutionEventHandler, ExecutionEventEnum } from '@badaitech/chaingraph-types'

// ✅ Correct - type-safe
const handleEvent = createExecutionEventHandler({
  [ExecutionEventEnum.NODE_FAILED]: (data) => {
    console.error(data.node.id, data.error.message)  // Fully typed
  },
})

// ❌ Avoid - manual type checking
events.forEach(event => {
  if (event.type === 'NODE_FAILED') {
    console.error(event.data.node.id)  // No type safety
  }
})

3. Use Appropriate Timeouts

Set realistic timeouts based on your flow complexity:

typescript
options: {
  execution: {
    // Per-node timeout, choose one value:
    //   60000   simple flows (1 minute)
    //   300000  complex flows (5 minutes)
    //   900000  long-running flows (15 minutes)
    nodeTimeoutMs: 300000,
    flowTimeoutMs: 1800000   // Whole-flow timeout for very long flows: 30 minutes
  }
}

4. Clean Up Subscriptions

Always unsubscribe when done to prevent memory leaks:

typescript
let subscription: ReturnType<typeof client.subscribeToExecutionEvents.subscribe>

const handleEvent = createExecutionEventHandler({
  [ExecutionEventEnum.FLOW_COMPLETED]: () => {
    subscription.unsubscribe()  // Clean up
  },
})

subscription = client.subscribeToExecutionEvents.subscribe(...)

5. Filter Events Efficiently

Use eventTypes to receive only the events you need:

typescript
// Only monitor critical events
eventTypes: [
  ExecutionEventEnum.FLOW_COMPLETED,
  ExecutionEventEnum.FLOW_FAILED,
  ExecutionEventEnum.NODE_FAILED
]

// All events (use sparingly)
eventTypes: []

6. Handle Errors Gracefully

Always check execution status and handle errors:

typescript
const handleEvent = createExecutionEventHandler(
  {
    [ExecutionEventEnum.FLOW_FAILED]: (data) => {
      // Log error
      console.error('Flow failed:', data.error.message)

      // Notify user
      notificationService.error('Execution failed')

      // Track for analytics
      analytics.track('execution_failed', { error: data.error.message })
    },
  },
  {
    onError: (error, event) => {
      // Handle errors in event handlers themselves
      console.error('Handler error:', error)
      errorTracker.captureException(error)
    }
  }
)

License

BUSL-1.1 - See LICENSE for details.

Copyright (c) 2025 BadLabs
