Skip to content

ChainGraph API Documentation / @badaitech/chaingraph-trpc

@badaitech/chaingraph-trpc

License

A type-safe tRPC layer for the ChainGraph flow-based programming framework. This package provides end-to-end type safety between client and server, real-time subscriptions via WebSockets, and a robust API for flow management, folder organization, node registration, and execution control.

Overview

@badaitech/chaingraph-trpc serves as the communication backbone of ChainGraph, enabling:

  • Type-Safe APIs: Complete end-to-end type safety using tRPC and SuperJSON
  • Real-Time Updates: WebSocket-based subscriptions for flow and execution events
  • Flow Management: Create, retrieve, update, and delete flows with folder organization
  • Folder Hierarchy: Organize flows into hierarchical folders with move and ordering operations
  • Node Operations: Register, discover, and instantiate computational nodes
  • Execution Control: Start, stop, pause, and monitor flow executions
  • Storage Integration: Support for both in-memory and PostgreSQL persistence
  • Debugging Tools: Breakpoints, stepping, and execution monitoring

Installation

bash
# Before installing, make sure you have set up authentication for GitHub Packages
npm install @badaitech/chaingraph-trpc
# or
yarn add @badaitech/chaingraph-trpc
# or
pnpm add @badaitech/chaingraph-trpc

Authentication for GitHub Packages

To use this package, you need to configure npm to authenticate with GitHub Packages:

  1. Create a personal access token (PAT) with the read:packages scope on GitHub.
  2. Add the following to your project's .npmrc file or to your global ~/.npmrc file:
@badaitech:registry=https://npm.pkg.github.com
//npm.pkg.github.com/:_authToken=YOUR_GITHUB_PAT

Replace YOUR_GITHUB_PAT with your actual GitHub personal access token.

Quick Start

Client Setup

typescript
import { trpcClient, trpcReact, queryClient } from '@badaitech/chaingraph-trpc/client'
import { QueryClientProvider } from '@tanstack/react-query'
import { useState } from 'react'

function App() {
  // Set up WebSocket connection
  const [trpc] = useState(() => trpcReact.createClient({
    links: [
      wsLink({
        client: createWSClient({
          url: 'ws://localhost:3001',
        }),
      }),
    ],
  }))

  return (
    <trpcReact.Provider client={trpc} queryClient={queryClient}>
      <QueryClientProvider client={queryClient}>
        <YourApplication />
      </QueryClientProvider>
    </trpcReact.Provider>
  )
}

Server Setup

typescript
import { init, applyWSSHandler, appRouter, createContext } from '@badaitech/chaingraph-trpc/server'
import { WebSocketServer } from 'ws'

// Initialize tRPC context and stores
await init()

// Create WebSocket server
const wss = new WebSocketServer({ port: 3001 })
const handler = applyWSSHandler({
  wss,
  router: appRouter,
  createContext,
})

console.log('WebSocket Server listening on ws://localhost:3001')

Database Setup

Run migrations to set up the database schema:

bash
# Generate migration files
pnpm run migrate:generate

# Push migrations to PostgreSQL
pnpm run migrate:push

# Run migrations programmatically
pnpm run migrate:run

Set the DATABASE_URL environment variable:

bash
DATABASE_URL=postgres://username:password@localhost:5432/dbname

The latest migration (0007) adds folder hierarchy support with the chaingraph_flow_folders table.

Usage Patterns

Working with Flows

typescript
import { trpcClient } from '@badaitech/chaingraph-trpc/client'

// Create a new flow
const createFlow = async () => {
  const flow = await trpcClient.flow.create.mutate({
    name: 'My Flow',
    description: 'A flow for processing data',
    tags: ['demo', 'processing'],
  })
  return flow
}

// Add a node to a flow
const addNode = async (flowId, nodeType) => {
  const node = await trpcClient.flow.addNode.mutate({
    flowId,
    nodeType,
    position: { x: 100, y: 100 },
  })
  return node
}

// Connect node ports
const connectPorts = async (flowId, sourceNodeId, targetNodeId) => {
  const edge = await trpcClient.flow.connectPorts.mutate({
    flowId,
    sourceNodeId,
    sourcePortId: 'output',
    targetNodeId,
    targetPortId: 'input',
  })
  return edge
}

// Update port value
const updatePortValue = async (flowId, nodeId, portId, value) => {
  await trpcClient.flow.updatePortValue.mutate({
    flowId,
    nodeId,
    portId,
    value,
  })
}

// Subscribe to flow events
const subscribeToFlowEvents = async (flowId) => {
  const subscription = trpcClient.flow.subscribeToEvents.subscribe(
    { flowId },
    {
      onData: (event) => console.log('Flow event:', event),
      onError: (err) => console.error('Subscription error:', err),
      onComplete: () => console.log('Subscription complete'),
    }
  )
  return subscription
}

Organizing Flows with Folders

typescript
import { trpcClient } from '@badaitech/chaingraph-trpc/client'

// Create a folder
const createFolder = async () => {
  const folder = await trpcClient.folder.create.mutate({
    name: 'Data Processing',
    description: 'Flows for data processing tasks',
    parentFolderId: null, // Root-level folder
    color: '#FF5733',
  })
  return folder
}

// Create a nested folder
const createNestedFolder = async (parentFolderId) => {
  const subfolder = await trpcClient.folder.create.mutate({
    name: 'Transformations',
    description: 'Data transformation flows',
    parentFolderId,
    color: '#33FF57',
  })
  return subfolder
}

// List all folders for authenticated user
const listFolders = async () => {
  const folders = await trpcClient.folder.list.query()
  return folders
}

// List folders with flows (single request for UI rendering)
const listFoldersWithFlows = async () => {
  const result = await trpcClient.folder.listWithFlows.query()
  // result.folders: array of folders
  // result.flows: array of flows
  return result
}

// List folders and flows within a specific root folder
const listWorkspace = async (rootFolderId) => {
  const result = await trpcClient.folder.listWithFlows.query({
    rootFolderId,
  })
  return result
}

// Edit a folder
const editFolder = async (folderId) => {
  const updated = await trpcClient.folder.edit.mutate({
    folderId,
    name: 'Renamed Folder',
    description: 'Updated description',
    color: '#0099FF',
  })
  return updated
}

// Move a folder to new parent
const moveFolder = async (folderId, newParentId, newOrder) => {
  const moved = await trpcClient.folder.move.mutate({
    folderId,
    newParentFolderId: newParentId, // null for root level
    newOrder,
  })
  return moved
}

// Move a flow to a different folder
const moveFlowToFolder = async (flowId, folderId, order) => {
  const flow = await trpcClient.folder.moveFlow.mutate({
    flowId,
    newFolderId: folderId, // null for root level
    newOrder: order,
  })
  return flow
}

// Delete an empty folder
const deleteFolder = async (folderId) => {
  const success = await trpcClient.folder.delete.mutate({
    folderId,
  })
  return success
}

Executing Flows

typescript
// Create execution instance
const createExecution = async (flowId) => {
  const execution = await trpcClient.execution.create.mutate({
    flowId,
    options: {
      debug: true, // Enable debugging features
    },
  })
  return execution
}

// Start execution
const startExecution = async (executionId) => {
  await trpcClient.execution.start.mutate({ executionId })
}

// Subscribe to execution events
const subscribeToExecutionEvents = async (executionId) => {
  const subscription = trpcClient.execution.subscribeToEvents.subscribe(
    { executionId },
    {
      onData: (event) => {
        console.log('Execution event:', event)
        // Handle: ExecutionStarted, StepCompleted, Failed, Completed, etc.
      },
      onError: (err) => console.error('Subscription error:', err),
      onComplete: () => console.log('Execution finished'),
    }
  )
  return subscription
}

// Add breakpoint for debugging
const addBreakpoint = async (executionId, nodeId) => {
  await trpcClient.execution.debug.addBreakpoint.mutate({
    executionId,
    nodeId,
  })
}

// Step through execution (when paused at breakpoint)
const stepExecution = async (executionId) => {
  await trpcClient.execution.debug.step.mutate({ executionId })
}

// Pause execution
const pauseExecution = async (executionId) => {
  await trpcClient.execution.pause.mutate({ executionId })
}

// Resume execution
const resumeExecution = async (executionId) => {
  await trpcClient.execution.resume.mutate({ executionId })
}

// Stop execution
const stopExecution = async (executionId) => {
  await trpcClient.execution.stop.mutate({ executionId })
}

// Get execution state
const getExecutionState = async (executionId) => {
  const state = await trpcClient.execution.getState.query({ executionId })
  return state
}

Exploring Available Nodes

typescript
// Get all nodes categorized
const getCategorizedNodes = async () => {
  const categories = await trpcClient.nodeRegistry.getCategorizedNodes.query()
  return categories
}

// Search for nodes by name or description
const searchNodes = async (query) => {
  const results = await trpcClient.nodeRegistry.searchNodes.query({ query })
  return results
}

// Get nodes for a specific category
const getNodesByCategory = async (categoryId) => {
  const category = await trpcClient.nodeRegistry.getNodesByCategory.query({
    categoryId,
  })
  return category
}

// Get all available categories
const getCategories = async () => {
  const categories = await trpcClient.nodeRegistry.getCategories.query()
  return categories
}

// Get detailed information about a specific node type
const getNodeType = async (nodeTypeId) => {
  const nodeType = await trpcClient.nodeRegistry.getNodeType.query({
    nodeTypeId,
  })
  return nodeType
}

API Reference

Entry Points

The package exports two main entry points:

Client API (@badaitech/chaingraph-trpc/client)

typescript
import {
  trpcReact,        // React hooks (useQuery, useMutation, useSubscription)
  trpcClient,       // Non-React client
  queryClient,      // TanStack Query client
} from '@badaitech/chaingraph-trpc/client'

// Type exports
import type {
  RouterInputs,    // Input types for all procedures
  RouterOutputs,   // Output types for all procedures
  AppRouter,       // Full router type
} from '@badaitech/chaingraph-trpc/client'

Type Usage Example:

typescript
import type { RouterInputs, RouterOutputs } from '@badaitech/chaingraph-trpc/client'

type CreateFlowInput = RouterInputs['flow']['create']
type FlowOutput = RouterOutputs['flow']['create']

const handleCreateFlow = async (input: CreateFlowInput) => {
  const result: FlowOutput = await trpcClient.flow.create.mutate(input)
  return result
}

Server API (@badaitech/chaingraph-trpc/server)

typescript
import {
  appRouter,        // Main tRPC router with all procedures
  createContext,    // Context factory for tRPC requests
  init,             // Initialize backend systems
  applyWSSHandler,  // Setup WebSocket handler
} from '@badaitech/chaingraph-trpc/server'

Procedure Categories

Flow Management Procedures

  • flow.create - Create a new flow
  • flow.get - Get a flow by ID
  • flow.list - List all flows for authenticated user
  • flow.delete - Delete a flow
  • flow.addNode - Add a node to a flow
  • flow.removeNode - Remove a node from a flow
  • flow.connectPorts - Connect ports between nodes
  • flow.removeEdge - Remove an edge between nodes
  • flow.updateNodePosition - Update node canvas position
  • flow.updatePortValue - Update port value
  • flow.updatePortUI - Update port UI configuration
  • flow.subscribeToEvents - Real-time subscription to flow changes

Folder Management Procedures (NEW)

  • folder.create - Create a new folder
  • folder.list - List folders for authenticated user
  • folder.listWithFlows - List folders and flows (combined query)
  • folder.edit - Update folder metadata (name, description, color)
  • folder.move - Move folder to new parent with new ordering
  • folder.delete - Delete an empty folder
  • folder.moveFlow - Move a flow to a different folder

Folder Organization Features:

  • Hierarchical folder structure with parent-child relationships
  • Ordering support for custom folder and flow sorting
  • Color tagging for visual organization
  • Root-level workspace isolation with rootFolderId
  • Cascade delete for folder hierarchies
  • Efficient combined queries for UI rendering

Execution Control Procedures

  • execution.create - Create an execution instance
  • execution.start - Start execution
  • execution.stop - Stop execution
  • execution.pause - Pause execution (at next step)
  • execution.resume - Resume execution
  • execution.getState - Get current execution state
  • execution.subscribeToEvents - Real-time subscription to execution events
  • execution.debug.addBreakpoint - Add breakpoint at node
  • execution.debug.removeBreakpoint - Remove breakpoint
  • execution.debug.step - Step to next node (when paused)
  • execution.debug.getBreakpoints - Get all active breakpoints

Node Registry Procedures

  • nodeRegistry.getCategorizedNodes - Get all nodes grouped by categories
  • nodeRegistry.searchNodes - Search nodes by query
  • nodeRegistry.getNodesByCategory - Get nodes for specific category
  • nodeRegistry.getCategories - Get all available categories
  • nodeRegistry.getNodeType - Get detailed node type information

Secret Management Procedures

  • secrets.create - Create a secret value
  • secrets.get - Retrieve a secret
  • secrets.delete - Delete a secret

MCP (Model Context Protocol) Procedures

  • mcp.* - Model Context Protocol tool integration

Advanced Topics

Real-Time Subscriptions

Subscriptions maintain WebSocket connections and emit data as it becomes available:

typescript
const subscribeToFlowEvents = async (flowId) => {
  let subscription: Unsubscribable | null = null

  try {
    subscription = trpcClient.flow.subscribeToEvents.subscribe(
      { flowId },
      {
        onData: (event) => {
          // Handle different event types
          if (event.type === 'nodeAdded') {
            console.log('Node added:', event.node)
          } else if (event.type === 'edgeAdded') {
            console.log('Edge created:', event.edge)
          } else if (event.type === 'portUpdated') {
            console.log('Port value changed:', event.port)
          }
        },
        onError: (error) => {
          console.error('Subscription error:', error)
          // Implement reconnection logic here
        },
        onComplete: () => {
          console.log('Subscription ended')
        },
      }
    )
  } catch (error) {
    console.error('Failed to subscribe:', error)
  }

  // Cleanup: unsubscribe when done
  return () => {
    subscription?.unsubscribe()
  }
}

Error Handling

tRPC provides structured error handling:

typescript
import { TRPCClientError } from '@trpc/client'
import type { AppRouter } from '@badaitech/chaingraph-trpc/client'

const handleFlowCreation = async () => {
  try {
    const flow = await trpcClient.flow.create.mutate({
      name: 'My Flow',
      description: 'Test flow',
    })
    console.log('Flow created:', flow)
  } catch (error) {
    if (error instanceof TRPCClientError<AppRouter>) {
      // Handle tRPC-specific errors
      console.error('tRPC error:', error.message)
      console.error('Code:', error.data?.code)
      console.error('Stack:', error.data?.stack)

      if (error.data?.code === 'UNAUTHORIZED') {
        // Handle authentication errors
        redirectToLogin()
      } else if (error.data?.code === 'FORBIDDEN') {
        // Handle permission errors
        showAccessDenied()
      } else if (error.data?.code === 'BAD_REQUEST') {
        // Handle validation errors
        showValidationErrors(error.message)
      }
    } else {
      // Handle network or other errors
      console.error('Unknown error:', error)
    }
  }
}

Testing tRPC Procedures

typescript
import { describe, it, expect, beforeAll } from 'vitest'
import { createCaller } from '@badaitech/chaingraph-trpc/server'
import { createTestContext } from '@badaitech/chaingraph-trpc/server/test/utils/createTestContext'

describe('Flow Procedures', () => {
  let caller: ReturnType<typeof createCaller>
  let testContext: ReturnType<typeof createTestContext>

  beforeAll(async () => {
    testContext = await createTestContext()
    caller = createCaller(testContext)
  })

  it('should create a flow', async () => {
    const flow = await caller.flow.create({
      name: 'Test Flow',
      description: 'A test flow',
    })

    expect(flow).toBeDefined()
    expect(flow.metadata.name).toBe('Test Flow')
  })

  it('should add node to flow', async () => {
    const flow = await caller.flow.create({
      name: 'Node Test Flow',
    })

    const node = await caller.flow.addNode({
      flowId: flow.metadata.id,
      nodeType: 'string-node',
      position: { x: 0, y: 0 },
    })

    expect(node).toBeDefined()
    expect(node.id).toBeDefined()
  })

  it('should create folder hierarchy', async () => {
    const rootFolder = await caller.folder.create({
      name: 'Root',
      parentFolderId: null,
    })

    const childFolder = await caller.folder.create({
      name: 'Child',
      parentFolderId: rootFolder.id,
    })

    expect(childFolder.parentId).toBe(rootFolder.id)
  })
})

Optimistic Updates with React

typescript
import { useQueryClient } from '@tanstack/react-query'
import { trpcReact } from '@badaitech/chaingraph-trpc/client'

function FlowComponent() {
  const queryClient = useQueryClient()
  const utils = trpcReact.useUtils()

  const createFlowMutation = trpcReact.flow.create.useMutation({
    onMutate: async (newFlow) => {
      // Cancel outgoing refetches
      await queryClient.cancelQueries({ queryKey: [['flow', 'list']] })

      // Get previous data
      const previousFlows = queryClient.getQueryData([['flow', 'list']])

      // Optimistically update cache
      queryClient.setQueryData([['flow', 'list']], (old: any) => [
        ...old,
        { metadata: { id: 'temp', ...newFlow }, nodes: [], edges: [] },
      ])

      return { previousFlows }
    },
    onError: (err, newFlow, context) => {
      // Rollback on error
      if (context?.previousFlows) {
        queryClient.setQueryData([['flow', 'list']], context.previousFlows)
      }
    },
    onSuccess: () => {
      // Refetch and invalidate
      utils.flow.list.invalidate()
    },
  })

  return (
    <button
      onClick={() =>
        createFlowMutation.mutate({
          name: 'New Flow',
          description: 'Created optimistically',
        })
      }
    >
      Create Flow
    </button>
  )
}

Database Support

ChainGraph tRPC supports two storage mechanisms:

  1. In-Memory Store: Default storage option, suitable for development
  2. PostgreSQL Store: Persistent storage for production environments

Setting Up PostgreSQL

Set the DATABASE_URL environment variable:

bash
DATABASE_URL=postgres://username:password@localhost:5432/dbname

Running Migrations

bash
# Generate migration files (when schema.ts changes)
pnpm run migrate:generate

# Push migrations to database
pnpm run migrate:push

# Run migrations programmatically
pnpm run migrate:run

Recent Migration: Folder Support (0007)

The 0007 migration adds:

  • chaingraph_flow_folders table with:

    • Hierarchical parent-child relationships
    • Owner-based isolation
    • Ordering support for custom sorting
    • Color tagging
    • Cascade delete on parent removal
  • Updates to chaingraph_flows table:

    • folder_id column for folder association
    • order column for custom flow sorting
    • Foreign key constraints with ON DELETE SET NULL
  • Indexes for performance:

    • Owner-based queries
    • Parent-based queries
    • Order-based queries

Configuration

Environment Variables

bash
# Database connection
DATABASE_URL=postgres://username:password@localhost:5432/dbname

# WebSocket server
WS_PORT=3001
WS_HOST=0.0.0.0

# Authentication
AUTH_SECRET=your-secret-key

# Node registry
NODE_REGISTRY_PATH=/path/to/nodes

Drizzle ORM Configuration

The package uses Drizzle ORM for database migrations. Configuration is in server/drizzle.config.ts.

Folder Organization Best Practices

  1. Use Root-Level Organization: Create top-level folders for different project areas or teams
  2. Namespace Isolation: Use rootFolderId parameter to isolate content for different workspaces
  3. Consistent Ordering: Maintain order values for predictable UI sorting
  4. Color Coding: Use folder colors to visually distinguish project types or stages
  5. Clean Hierarchy: Keep folder depth reasonable (avoid deeply nested structures)

License

BUSL-1.1 - Business Source License

  • @badaitech/chaingraph-types: Core type definitions and decorators
  • @badaitech/chaingraph-nodes: Collection of pre-built nodes
  • @badaitech/chaingraph-frontend: React UI components for visual flow programming
  • @badaitech/chaingraph-executor: Backend execution engine for distributed flow execution

Support

For issues, questions, or contributions, please visit the main ChainGraph repository.

Modules

Licensed under BUSL-1.1