Executor Client
The ChainGraph Executor Client (@badaitech/chaingraph-executor/client) provides a type-safe tRPC client for executing flows on the ChainGraph execution engine. It enables real-time communication with the execution API via WebSockets, allowing you to trigger flow executions, monitor their progress, and control their lifecycle from your application.
Related Documentation
- API Reference: Executor Client - Type definitions and function signatures
- API Reference: Executor Package - Server-side configuration and workers
- Examples - Code examples on GitHub
Overview
The executor client acts as a bridge between your application and the distributed ChainGraph execution engine. It handles:
- Flow Execution Orchestration: Create, start, pause, resume, and stop flow executions
- Real-Time Event Streaming: Subscribe to execution events via WebSocket for live updates
- Execution Monitoring: Query execution status, history, and hierarchical execution trees
- Integration Context: Pass runtime context (chat sessions, wallet connections, custom data) to flows
- Type Safety: Full TypeScript support with end-to-end type inference
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│ Your Application │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ ChainGraph Executor Client │ │
│ │ (createTRPCClient) │ │
│ └──────────┬───────────────────────────────────────────────┘ │
└─────────────┼───────────────────────────────────────────────────┘
│ WebSocket (wss://)
│ ├─ Mutations (create, start, stop, pause, resume)
│ ├─ Queries (getExecutionDetails, getExecutionsTree)
│ └─ Subscriptions (real-time events)
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ ChainGraph Execution Engine │
│ wss://execution-api.chaingraph.io │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ tRPC Router │ │
│ │ ├─ Authentication (sessionBadAI) │ │
│ │ ├─ Execution Store (PostgreSQL) │ │
│ │ └─ Event Bus (DBOS) │ │
│ └──────────┬───────────────────────────────────────────────┘ │
└─────────────┼───────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ DBOS Workflow Engine │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Execution Workers (Horizontally Scalable) │ │
│ │ ├─ Flow Execution (nodes, edges, transfers) │ │
│ │ ├─ Event Emission (real-time streaming) │ │
│ │ └─ Child Execution Spawning │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘Key Concepts
Two-Phase Execution Model: Executions are created first (initializing resources and event streams), then explicitly started. This ensures your event subscription is ready before execution begins.
Event-Driven Architecture: All execution progress is communicated via events streamed over WebSocket. Your client subscribes to events and reacts to flow progress in real-time.
Hierarchical Executions: Flows can spawn child executions via event emitters, creating execution trees. The client can query these trees to understand the full execution hierarchy.
At-Least-Once Delivery: The execution engine guarantees all events are delivered at least once. Failed executions are automatically retried by the recovery service.
Installation
npm install @badaitech/chaingraph-executor superjsonWhy SuperJSON? ChainGraph uses SuperJSON for complex type serialization (Dates, Maps, Sets, custom classes). This ensures data fidelity between client and server.
Execution Lifecycle
Understanding the execution lifecycle is crucial for working with the executor client. Every execution goes through well-defined phases, from creation to completion.
Lifecycle State Machine
┌──────────┐
│ idle │ (Not yet created)
└─────┬────┘
│
create.mutate()
│
▼
┌──────────┐
┌────▶│ created │◀────┐ (Initialized, waiting for START_SIGNAL)
│ └─────┬────┘ │
│ │ │
│ start.mutate() │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ running │─────┤ (Actively executing nodes)
│ └─────┬────┘ │
│ │ │
│ pause.mutate() │
│ │ │
│ ▼ │
resume. │ ┌──────────┐ │
mutate() │ │ paused │─────┘
│ └──────────┘
│
│
stop.mutate()
│
┌─────────┴──────────┬───────────────┬──────────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ stopped │ │completed │ │ failed │ │ timeout │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
(User cancel) (Success) (Error) (Exceeded limit)Execution Flow Sequence
Here's what happens when you execute a flow from the client perspective:
Client Execution API DBOS Workflow
│ │ │
│─────create.mutate()─────────────▶│ │
│ │ │
│ ├──[1] Create execution row │
│ │ (status: created) │
│ │ │
│ ├──[2] Initialize event stream │
│ │ │
│ ├─────start workflow─────────────▶│
│ │ │
│◀─────{executionId}───────────────┤ │
│ │ ├─[3] Write EXECUTION_CREATED event
│ │ │
│ │ ├─[4] Wait for START_SIGNAL...
│ │ │
│──subscribe.subscribeToEvents────▶│ │
│ │ │
│ ├──connect to event stream───────▶│
│ │ │
│◀─────[event stream active]───────┤◀────[ready to emit]─────────────┤
│ │ │
│─────start.mutate()──────────────▶│ │
│ │ │
│ ├─────DBOS.send('START_SIGNAL')──▶│
│ │ │
│◀─────{success: true}─────────────┤ │
│ │ ├─[5] Resume from signal
│ │ │
│ │ ├─[6] Execute flow nodes
│ │ │
│◀─────FLOW_STARTED event──────────┤◀────[emit]──────────────────────┤
│◀─────NODE_STARTED events─────────┤◀────[emit]──────────────────────┤
│◀─────NODE_COMPLETED events───────┤◀────[emit]──────────────────────┤
│◀─────EDGE_TRANSFER events────────┤◀────[emit]──────────────────────┤
│◀─────FLOW_COMPLETED event────────┤◀────[emit]──────────────────────┤
│ │ │
│──getExecutionDetails.query()────▶│ │
│◀─────{status: completed}─────────┤ │
│ │ │
│──unsubscribe()──────────────────▶│ │
│ ├──disconnect event stream───────▶│
│ │ │Key Insights:
Phase 1 (Create): The execution is created and a DBOS workflow starts immediately. However, it waits for a START_SIGNAL before actually executing the flow. This ensures the event stream is initialized.
Phase 2 (Subscribe): You connect to the event stream. The workflow has already written the EXECUTION_CREATED event, so the stream is ready.
Phase 3 (Start): You send the START_SIGNAL. The workflow resumes and begins executing nodes, emitting events in real-time.
Phase 4 (Execute): The flow executes, nodes run, edges transfer data, and all progress is streamed to your client as events.
Phase 5 (Complete): The flow completes (or fails), emits a final event, and the execution transitions to a terminal state.
This design ensures you never miss events because the subscription is established before execution begins.
Quick Start
Initialize the Client
The client requires a WebSocket connection to the execution API. In production, this is wss://execution-api.chaingraph.io.
import { createTRPCClient } from '@badaitech/chaingraph-executor/client'
import SuperJSON from 'superjson'
const client = createTRPCClient({
url: 'wss://execution-api.chaingraph.io',
superjsonCustom: SuperJSON,
auth: {
sessionBadAI: 'your-session-token' // Your authenticated session
}
})About the Connection:
- Protocol: WebSocket (wss:// for production, ws:// for local dev)
- Authentication: Uses
sessionBadAItoken passed in connection params - Persistence: Connection stays open for subscriptions, auto-reconnects on failure
- Keep-Alive: Automatic ping/pong every 5 seconds to maintain connection
Configuration Options
Production Setup
const client = createTRPCClient({
url: 'wss://execution-api.chaingraph.io',
superjsonCustom: SuperJSON,
auth: {
sessionBadAI: 'your-session-token' // Required for authentication
},
wsClientCallbacks: {
onOpen: () => console.log('Connected to execution engine'),
onError: (err) => console.error('Connection error:', err),
onClose: (cause) => console.log('Connection closed:', cause)
}
})Local Development
const client = createTRPCClient({
url: 'ws://localhost:4021',
superjsonCustom: SuperJSON,
auth: {
sessionBadAI: 'your-session-token'
}
})API Reference
1. Create Execution
Creates a new execution instance for a flow. The execution is created in a "created" state and must be explicitly started.
const result = await client.create.mutate({
flowId: 'V2-your-flow-id',
options: {
execution: {
maxConcurrency: 10, // Max parallel node executions (default: 100)
nodeTimeoutMs: 300000, // Node timeout in ms (default: 60000)
flowTimeoutMs: 900000 // Flow timeout in ms (default: 300000)
},
debug: false, // Enable debug mode (default: false)
breakpoints: [] // Array of node IDs for breakpoints
},
integration: {
archai: {
agentID: 'agent-123',
agentSession: 'session-token',
chatID: 'chat-456',
messageID: 789
},
wallet: {
isConnected: true,
address: '0x...',
chainId: 1,
providerType: 'metamask'
}
},
events: [
{
eventName: 'user-action',
payload: { data: 'custom-data' }
}
]
})
console.log(result.executionId) // 'exec_abc123'Input Schema
- flowId (required): Flow ID to execute (must start with 'V2')
- options (optional): Execution configuration
- execution: Runtime limits
maxConcurrency: Maximum parallel node executionsnodeTimeoutMs: Timeout per node in millisecondsflowTimeoutMs: Timeout for entire flow in milliseconds
- debug: Enable debugging features
- breakpoints: Array of node IDs to pause at
- execution: Runtime limits
- integration (optional): Integration context
- archai: ArchAI chat integration context
- wallet: Web3 wallet integration context
- external: Custom integration data
- events (optional): External events to trigger event listeners
2. Start Execution
Starts an execution that was previously created.
const result = await client.start.mutate({
executionId: 'exec_abc123'
})
console.log(result.success) // trueNote: Execution must be in "created" status. Once started, it transitions to "running" status.
3. Stop Execution
Stops a running execution. This cancels the workflow and all its children.
const result = await client.stop.mutate({
executionId: 'exec_abc123',
reason: 'User requested stop' // Optional
})
console.log(result.success) // trueAllowed States: Can stop executions in "created", "running", or "paused" status.
4. Pause Execution
Pauses a running execution. The execution can be resumed later.
const result = await client.pause.mutate({
executionId: 'exec_abc123',
reason: 'User requested pause' // Optional
})
console.log(result.success) // trueAllowed States: Can only pause executions in "running" status.
5. Resume Execution
Resumes a paused execution.
const result = await client.resume.mutate({
executionId: 'exec_abc123'
})
console.log(result.success) // trueAllowed States: Can only resume executions in "paused" status.
6. Get Execution Details
Retrieves detailed information about an execution.
const execution = await client.getExecutionDetails.query({
executionId: 'exec_abc123'
})
console.log(execution.status) // 'running', 'completed', 'failed', etc.
console.log(execution.createdAt) // Date
console.log(execution.startedAt) // Date | null
console.log(execution.completedAt) // Date | null
console.log(execution.errorMessage) // string | null
console.log(execution.errorNodeId) // string | nullExecution Row Properties
interface ExecutionRow {
id: string // Execution ID
flowId: string // Flow ID
ownerId: string // Owner user ID
rootExecutionId: string // Root execution (for nested executions)
parentExecutionId: string | null // Parent execution (for child executions)
status: ExecutionStatus // Current status
createdAt: Date
updatedAt: Date
startedAt: Date | null
completedAt: Date | null
errorMessage: string | null
errorNodeId: string | null // Node that caused failure
executionDepth: number // Nesting level (0 for root)
options: ExecutionOptions
integration: IntegrationContext
externalEvents: ExecutionExternalEvent[]
failureCount: number
lastFailureReason: string | null
lastFailureAt: Date | null
processingStartedAt: Date | null
processingWorkerId: string | null
}7. Get Execution Tree
Retrieves the entire execution tree (including child executions).
const tree = await client.getExecutionsTree.query({
executionId: 'exec_abc123'
})
tree.forEach(node => {
console.log(`Level ${node.level}: ${node.id}`)
console.log(` Parent: ${node.parentId}`)
console.log(` Status: ${node.execution.status}`)
})Execution Tree Node
interface ExecutionTreeNode {
id: string // Execution ID
parentId: string | null // Parent execution ID
level: number // Nesting level (0 for root)
execution: ExecutionRow // Full execution details
}8. Get Root Executions
Lists all root executions for a specific flow.
const executions = await client.getRootExecutions.query({
flowId: 'V2-your-flow-id',
limit: 50, // Max 100, default 50
after: new Date('2024-01-01') // Optional: only executions after this date
})
executions.forEach(rootExec => {
console.log(`Execution: ${rootExec.execution.id}`)
console.log(` Nested levels: ${rootExec.levels}`)
console.log(` Total nested: ${rootExec.totalNested}`)
})Root Execution
interface RootExecution {
execution: ExecutionRow // Root execution details
levels: number // Max nesting depth
totalNested: number // Total child executions
}9. Subscribe to Execution Events
Subscribe to real-time execution events via WebSocket.
import { createExecutionEventHandler, ExecutionEventEnum } from '@badaitech/chaingraph-types'
// Create type-safe event handler
const handleEvent = createExecutionEventHandler({
[ExecutionEventEnum.NODE_COMPLETED]: (data) => {
console.log(`Node completed: ${data.node.id}`)
},
[ExecutionEventEnum.FLOW_COMPLETED]: () => {
console.log('Flow completed!')
},
})
const subscription = client.subscribeToExecutionEvents.subscribe(
{
executionId: 'exec_abc123',
fromIndex: 0, // Start from event index (default: 0)
eventTypes: [], // Filter by event types (empty = all events)
batchSize: 100, // Max events per batch (1-1000, default: 100)
batchTimeoutMs: 25 // Batch timeout in ms (0-1000, default: 25)
},
{
onData: (events) => {
events.forEach(event => {
console.log(`Event ${event.index}: ${event.type}`)
handleEvent(event) // Type-safe event handling
})
},
onError: (err) => {
console.error('Subscription error:', err)
},
onComplete: () => {
console.log('Subscription completed')
}
}
)
// Later: unsubscribe
subscription.unsubscribe()Event Types
Common event types you can filter by (from ExecutionEventEnum):
import { ExecutionEventEnum } from '@badaitech/chaingraph-types'
// Flow-level events
ExecutionEventEnum.FLOW_STARTED
ExecutionEventEnum.FLOW_COMPLETED
ExecutionEventEnum.FLOW_FAILED
ExecutionEventEnum.FLOW_CANCELLED
// Node-level events
ExecutionEventEnum.NODE_STARTED
ExecutionEventEnum.NODE_COMPLETED
ExecutionEventEnum.NODE_FAILED
ExecutionEventEnum.NODE_SKIPPED
ExecutionEventEnum.NODE_STATUS_CHANGED
ExecutionEventEnum.NODE_BACKGROUNDED
// Edge transfer events
ExecutionEventEnum.EDGE_TRANSFER_COMPLETED
ExecutionEventEnum.EDGE_TRANSFER_FAILED
// Debug events
ExecutionEventEnum.NODE_DEBUG_LOG_STRING
ExecutionEventEnum.DEBUG_BREAKPOINT_HITEvent Structure
interface ExecutionEvent {
index: number // Sequential event index
type: string // Event type
timestamp: Date // Event timestamp
data: any // Event-specific data
}Type-Safe Event Handlers
ChainGraph provides createExecutionEventHandler for type-safe event handling. This eliminates the need for manual type switching and provides full TypeScript inference for event data.
Basic Event Handler
import { createExecutionEventHandler, ExecutionEventEnum } from '@badaitech/chaingraph-types'
const handleEvent = createExecutionEventHandler({
[ExecutionEventEnum.FLOW_STARTED]: (data) => {
// data is automatically typed as FlowStartedData
console.log('Flow started:', data.flowMetadata)
},
[ExecutionEventEnum.NODE_COMPLETED]: (data) => {
// data is automatically typed as NodeCompletedData
console.log('Node completed:', data.node.id)
console.log('Execution time:', data.executionTime, 'ms')
},
[ExecutionEventEnum.FLOW_FAILED]: (data) => {
// data is automatically typed as FlowFailedData
console.error('Flow failed:', data.error.message)
},
})
// Use the handler in subscription
client.subscribeToExecutionEvents.subscribe(
{ executionId, eventTypes: [] },
{
onData: (events) => {
events.forEach(event => handleEvent(event))
}
}
)Benefits of Event Handlers
- Type Safety: Each event handler gets properly typed
dataparameter - Autocomplete: IDE provides suggestions for event data properties
- Compile-Time Checks: TypeScript catches typos and incorrect property access
- Cleaner Code: No switch statements or manual type assertions
Handler with Error Handling
const handleEvent = createExecutionEventHandler(
{
[ExecutionEventEnum.NODE_FAILED]: (data) => {
console.error('Node failed:', data.node.id)
console.error('Error:', data.error.message)
// Notify external error tracking
errorTracker.captureError(data.error)
},
[ExecutionEventEnum.FLOW_FAILED]: (data) => {
console.error('Flow failed:', data.error.message)
// Trigger alerting system
alerting.send(`Execution failed: ${data.error.message}`)
},
},
{
// Error handler for the event handlers themselves
onError: (error, event) => {
console.error('Error in event handler:', error)
console.error('Event:', event.type)
}
}
)Complete Example
import { createTRPCClient } from '@badaitech/chaingraph-executor/client'
import { createExecutionEventHandler, ExecutionEventEnum } from '@badaitech/chaingraph-types'
import SuperJSON from 'superjson'
async function executeFlow() {
// 1. Create client
const client = createTRPCClient({
url: 'wss://execution-api.chaingraph.io',
superjsonCustom: SuperJSON,
auth: {
sessionBadAI: process.env.SESSION_TOKEN
}
})
try {
// 2. Create execution
const { executionId } = await client.create.mutate({
flowId: 'V2-my-flow',
options: {
execution: {
maxConcurrency: 10,
nodeTimeoutMs: 300000,
flowTimeoutMs: 900000
},
debug: false
},
integration: {
archai: {
agentID: 'agent-123',
agentSession: 'session-abc',
chatID: 'chat-456',
messageID: 789
}
}
})
console.log('Execution created:', executionId)
// 3. Subscribe to events with type-safe handlers
let subscription: ReturnType<typeof client.subscribeToExecutionEvents.subscribe>
const handleEvent = createExecutionEventHandler({
[ExecutionEventEnum.FLOW_COMPLETED]: () => {
console.log('Flow completed successfully!')
},
[ExecutionEventEnum.FLOW_FAILED]: (data) => {
console.error('Flow failed:', data.error.message)
},
[ExecutionEventEnum.NODE_FAILED]: (data) => {
console.error('Node failed:', data.node.id, data.error.message)
},
})
subscription = client.subscribeToExecutionEvents.subscribe(
{
executionId,
fromIndex: 0,
eventTypes: [
ExecutionEventEnum.FLOW_COMPLETED,
ExecutionEventEnum.FLOW_FAILED,
ExecutionEventEnum.NODE_FAILED
]
},
{
onData: (events) => {
events.forEach(event => handleEvent(event))
},
onError: (err) => console.error('Subscription error:', err)
}
)
// 4. Start execution
await client.start.mutate({ executionId })
console.log('Execution started')
// 5. Wait for completion (in real app, handle via subscription)
await new Promise(resolve => setTimeout(resolve, 60000))
// 6. Get final status
const execution = await client.getExecutionDetails.query({ executionId })
console.log('Final status:', execution.status)
// 7. Cleanup
subscription.unsubscribe()
} catch (error) {
console.error('Execution error:', error)
}
}
executeFlow()Execution Status Values
enum ExecutionStatus {
Idle = 'idle',
Creating = 'creating',
Created = 'created', // Ready to start
Running = 'running', // Currently executing
Paused = 'paused', // Paused by user
Completed = 'completed', // Finished successfully
Failed = 'failed', // Failed with error
Stopped = 'stopped' // Stopped by user
}Integration Contexts
ArchAI Integration
interface ArchAIContext {
agentID?: string // Agent identifier
agentSession?: string // Agent session token
chatID?: string // Chat room ID
messageID?: number // Message ID that triggered execution
}Wallet Integration
interface WalletContext {
isConnected: boolean
address?: string // Wallet address
chainId?: number // Chain ID (1 for Ethereum mainnet)
providerType?: string // 'metamask', 'walletconnect', etc.
ensName?: string // ENS name if available
capabilities?: {
supportsBatchTransactions?: boolean
supportsEIP1559?: boolean
supportsEIP712?: boolean
}
lastUpdated?: number // Timestamp
rpcUrl?: string // Custom RPC endpoint
}Error Handling
All mutations and queries can throw tRPC errors:
import { TRPCClientError } from '@trpc/client'
try {
await client.start.mutate({ executionId })
} catch (error) {
if (error instanceof TRPCClientError) {
switch (error.data?.code) {
case 'UNAUTHORIZED':
console.error('Invalid session token')
break
case 'NOT_FOUND':
console.error('Execution not found')
break
case 'FORBIDDEN':
console.error('No access to this flow')
break
case 'BAD_REQUEST':
console.error('Invalid request:', error.message)
break
default:
console.error('Unexpected error:', error.message)
}
}
}Type Safety
The client provides full TypeScript type safety:
import type {
RouterInputs,
RouterOutputs
} from '@badaitech/chaingraph-executor/client'
// Input types
type CreateInput = RouterInputs['create']
type StartInput = RouterInputs['start']
// Output types
type CreateOutput = RouterOutputs['create']
type ExecutionDetails = RouterOutputs['getExecutionDetails']Advanced Usage
Custom SuperJSON Configuration
If you need custom serialization (e.g., for custom classes):
import SuperJSON from 'superjson'
import { registerSuperjsonTransformers } from '@badaitech/chaingraph-types'
import { NodeRegistry } from '@badaitech/chaingraph-types'
// Register ChainGraph transformers
registerSuperjsonTransformers(SuperJSON, NodeRegistry.getInstance())
const client = createTRPCClient({
url: 'wss://execution-api.chaingraph.io',
superjsonCustom: SuperJSON,
auth: { sessionBadAI: token }
})React Integration
For React applications, use the React Query hooks:
import { trpcReact, TRPCProvider, getExecutorQueryClient } from '@badaitech/chaingraph-executor/client'
import { QueryClientProvider } from '@tanstack/react-query'
// In your root component
function App() {
const queryClient = getExecutorQueryClient()
const trpcClient = trpcReact.createClient({
url: 'wss://execution-api.chaingraph.io',
auth: { sessionBadAI: token }
})
return (
<trpcReact.Provider client={trpcClient} queryClient={queryClient}>
<QueryClientProvider client={queryClient}>
<YourApp />
</QueryClientProvider>
</trpcReact.Provider>
)
}
// In your components
function ExecutionComponent({ flowId }) {
const createMutation = trpcReact.create.useMutation()
const startMutation = trpcReact.start.useMutation()
const handleExecute = async () => {
const { executionId } = await createMutation.mutateAsync({ flowId })
await startMutation.mutateAsync({ executionId })
}
return <button onClick={handleExecute}>Execute Flow</button>
}Best Practices
1. Always Subscribe Before Starting
Set up event subscriptions before calling start to avoid missing events:
// ✅ Correct order
const subscription = client.subscribeToExecutionEvents.subscribe(...)
await client.start.mutate({ executionId })
// ❌ Wrong - may miss early events
await client.start.mutate({ executionId })
const subscription = client.subscribeToExecutionEvents.subscribe(...)2. Use Type-Safe Event Handlers
Always use createExecutionEventHandler instead of manual type checking:
import { createExecutionEventHandler, ExecutionEventEnum } from '@badaitech/chaingraph-types'
// ✅ Correct - type-safe
const handleEvent = createExecutionEventHandler({
[ExecutionEventEnum.NODE_FAILED]: (data) => {
console.error(data.node.id, data.error.message) // Fully typed
},
})
// ❌ Avoid - manual type checking
events.forEach(event => {
if (event.type === 'NODE_FAILED') {
console.error(event.data.node.id) // No type safety
}
})3. Use Appropriate Timeouts
Set realistic timeouts based on your flow complexity:
options: {
execution: {
nodeTimeoutMs: 60000, // Simple flows: 1 minute
nodeTimeoutMs: 300000, // Complex flows: 5 minutes
nodeTimeoutMs: 900000, // Long-running flows: 15 minutes
flowTimeoutMs: 1800000, // Very long flows: 30 minutes
}
}4. Clean Up Subscriptions
Always unsubscribe when done to prevent memory leaks:
let subscription: ReturnType<typeof client.subscribeToExecutionEvents.subscribe>
const handleEvent = createExecutionEventHandler({
[ExecutionEventEnum.FLOW_COMPLETED]: () => {
subscription.unsubscribe() // Clean up
},
})
subscription = client.subscribeToExecutionEvents.subscribe(...)5. Filter Events Efficiently
Use eventTypes to receive only the events you need:
// Only monitor critical events
eventTypes: [
ExecutionEventEnum.FLOW_COMPLETED,
ExecutionEventEnum.FLOW_FAILED,
ExecutionEventEnum.NODE_FAILED
]
// All events (use sparingly)
eventTypes: []6. Handle Errors Gracefully
Always check execution status and handle errors:
const handleEvent = createExecutionEventHandler(
{
[ExecutionEventEnum.FLOW_FAILED]: (data) => {
// Log error
console.error('Flow failed:', data.error.message)
// Notify user
notificationService.error('Execution failed')
// Track for analytics
analytics.track('execution_failed', { error: data.error.message })
},
},
{
onError: (error, event) => {
// Handle errors in event handlers themselves
console.error('Handler error:', error)
errorTracker.captureException(error)
}
}
)License
BUSL-1.1 - See LICENSE for details.
Copyright (c) 2025 BadLabs