ChainGraph API Documentation / @badaitech/chaingraph-trpc
@badaitech/chaingraph-trpc
A type-safe tRPC layer for the ChainGraph flow-based programming framework. This package provides end-to-end type safety between client and server, real-time subscriptions via WebSockets, and a robust API for flow management, folder organization, node registration, and execution control.
Overview
@badaitech/chaingraph-trpc serves as the communication backbone of ChainGraph, enabling:
- Type-Safe APIs: Complete end-to-end type safety using tRPC and SuperJSON
- Real-Time Updates: WebSocket-based subscriptions for flow and execution events
- Flow Management: Create, retrieve, update, and delete flows with folder organization
- Folder Hierarchy: Organize flows into hierarchical folders with move and ordering operations
- Node Operations: Register, discover, and instantiate computational nodes
- Execution Control: Start, stop, pause, and monitor flow executions
- Storage Integration: Support for both in-memory and PostgreSQL persistence
- Debugging Tools: Breakpoints, stepping, and execution monitoring
Installation
# Before installing, make sure you have set up authentication for GitHub Packages
npm install @badaitech/chaingraph-trpc
# or
yarn add @badaitech/chaingraph-trpc
# or
pnpm add @badaitech/chaingraph-trpcAuthentication for GitHub Packages
To use this package, you need to configure npm to authenticate with GitHub Packages:
- Create a personal access token (PAT) with the
read:packagesscope on GitHub. - Add the following to your project's
.npmrcfile or to your global~/.npmrcfile:
@badaitech:registry=https://npm.pkg.github.com
//npm.pkg.github.com/:_authToken=YOUR_GITHUB_PATReplace YOUR_GITHUB_PAT with your actual GitHub personal access token.
Quick Start
Client Setup
import { trpcClient, trpcReact, queryClient } from '@badaitech/chaingraph-trpc/client'
import { QueryClientProvider } from '@tanstack/react-query'
import { useState } from 'react'
function App() {
// Set up WebSocket connection
const [trpc] = useState(() => trpcReact.createClient({
links: [
wsLink({
client: createWSClient({
url: 'ws://localhost:3001',
}),
}),
],
}))
return (
<trpcReact.Provider client={trpc} queryClient={queryClient}>
<QueryClientProvider client={queryClient}>
<YourApplication />
</QueryClientProvider>
</trpcReact.Provider>
)
}Server Setup
import { init, applyWSSHandler, appRouter, createContext } from '@badaitech/chaingraph-trpc/server'
import { WebSocketServer } from 'ws'
// Initialize tRPC context and stores
await init()
// Create WebSocket server
const wss = new WebSocketServer({ port: 3001 })
const handler = applyWSSHandler({
wss,
router: appRouter,
createContext,
})
console.log('WebSocket Server listening on ws://localhost:3001')Database Setup
Run migrations to set up the database schema:
# Generate migration files
pnpm run migrate:generate
# Push migrations to PostgreSQL
pnpm run migrate:push
# Run migrations programmatically
pnpm run migrate:runSet the DATABASE_URL environment variable:
DATABASE_URL=postgres://username:password@localhost:5432/dbnameThe latest migration (0007) adds folder hierarchy support with the chaingraph_flow_folders table.
Usage Patterns
Working with Flows
import { trpcClient } from '@badaitech/chaingraph-trpc/client'
// Create a new flow
const createFlow = async () => {
const flow = await trpcClient.flow.create.mutate({
name: 'My Flow',
description: 'A flow for processing data',
tags: ['demo', 'processing'],
})
return flow
}
// Add a node to a flow
const addNode = async (flowId, nodeType) => {
const node = await trpcClient.flow.addNode.mutate({
flowId,
nodeType,
position: { x: 100, y: 100 },
})
return node
}
// Connect node ports
const connectPorts = async (flowId, sourceNodeId, targetNodeId) => {
const edge = await trpcClient.flow.connectPorts.mutate({
flowId,
sourceNodeId,
sourcePortId: 'output',
targetNodeId,
targetPortId: 'input',
})
return edge
}
// Update port value
const updatePortValue = async (flowId, nodeId, portId, value) => {
await trpcClient.flow.updatePortValue.mutate({
flowId,
nodeId,
portId,
value,
})
}
// Subscribe to flow events
const subscribeToFlowEvents = async (flowId) => {
const subscription = trpcClient.flow.subscribeToEvents.subscribe(
{ flowId },
{
onData: (event) => console.log('Flow event:', event),
onError: (err) => console.error('Subscription error:', err),
onComplete: () => console.log('Subscription complete'),
}
)
return subscription
}Organizing Flows with Folders
import { trpcClient } from '@badaitech/chaingraph-trpc/client'
// Create a folder
const createFolder = async () => {
const folder = await trpcClient.folder.create.mutate({
name: 'Data Processing',
description: 'Flows for data processing tasks',
parentFolderId: null, // Root-level folder
color: '#FF5733',
})
return folder
}
// Create a nested folder
const createNestedFolder = async (parentFolderId) => {
const subfolder = await trpcClient.folder.create.mutate({
name: 'Transformations',
description: 'Data transformation flows',
parentFolderId,
color: '#33FF57',
})
return subfolder
}
// List all folders for authenticated user
const listFolders = async () => {
const folders = await trpcClient.folder.list.query()
return folders
}
// List folders with flows (single request for UI rendering)
const listFoldersWithFlows = async () => {
const result = await trpcClient.folder.listWithFlows.query()
// result.folders: array of folders
// result.flows: array of flows
return result
}
// List folders and flows within a specific root folder
const listWorkspace = async (rootFolderId) => {
const result = await trpcClient.folder.listWithFlows.query({
rootFolderId,
})
return result
}
// Edit a folder
const editFolder = async (folderId) => {
const updated = await trpcClient.folder.edit.mutate({
folderId,
name: 'Renamed Folder',
description: 'Updated description',
color: '#0099FF',
})
return updated
}
// Move a folder to new parent
const moveFolder = async (folderId, newParentId, newOrder) => {
const moved = await trpcClient.folder.move.mutate({
folderId,
newParentFolderId: newParentId, // null for root level
newOrder,
})
return moved
}
// Move a flow to a different folder
const moveFlowToFolder = async (flowId, folderId, order) => {
const flow = await trpcClient.folder.moveFlow.mutate({
flowId,
newFolderId: folderId, // null for root level
newOrder: order,
})
return flow
}
// Delete an empty folder
const deleteFolder = async (folderId) => {
const success = await trpcClient.folder.delete.mutate({
folderId,
})
return success
}Executing Flows
// Create execution instance
const createExecution = async (flowId) => {
const execution = await trpcClient.execution.create.mutate({
flowId,
options: {
debug: true, // Enable debugging features
},
})
return execution
}
// Start execution
const startExecution = async (executionId) => {
await trpcClient.execution.start.mutate({ executionId })
}
// Subscribe to execution events
const subscribeToExecutionEvents = async (executionId) => {
const subscription = trpcClient.execution.subscribeToEvents.subscribe(
{ executionId },
{
onData: (event) => {
console.log('Execution event:', event)
// Handle: ExecutionStarted, StepCompleted, Failed, Completed, etc.
},
onError: (err) => console.error('Subscription error:', err),
onComplete: () => console.log('Execution finished'),
}
)
return subscription
}
// Add breakpoint for debugging
const addBreakpoint = async (executionId, nodeId) => {
await trpcClient.execution.debug.addBreakpoint.mutate({
executionId,
nodeId,
})
}
// Step through execution (when paused at breakpoint)
const stepExecution = async (executionId) => {
await trpcClient.execution.debug.step.mutate({ executionId })
}
// Pause execution
const pauseExecution = async (executionId) => {
await trpcClient.execution.pause.mutate({ executionId })
}
// Resume execution
const resumeExecution = async (executionId) => {
await trpcClient.execution.resume.mutate({ executionId })
}
// Stop execution
const stopExecution = async (executionId) => {
await trpcClient.execution.stop.mutate({ executionId })
}
// Get execution state
const getExecutionState = async (executionId) => {
const state = await trpcClient.execution.getState.query({ executionId })
return state
}Exploring Available Nodes
// Get all nodes categorized
const getCategorizedNodes = async () => {
const categories = await trpcClient.nodeRegistry.getCategorizedNodes.query()
return categories
}
// Search for nodes by name or description
const searchNodes = async (query) => {
const results = await trpcClient.nodeRegistry.searchNodes.query({ query })
return results
}
// Get nodes for a specific category
const getNodesByCategory = async (categoryId) => {
const category = await trpcClient.nodeRegistry.getNodesByCategory.query({
categoryId,
})
return category
}
// Get all available categories
const getCategories = async () => {
const categories = await trpcClient.nodeRegistry.getCategories.query()
return categories
}
// Get detailed information about a specific node type
const getNodeType = async (nodeTypeId) => {
const nodeType = await trpcClient.nodeRegistry.getNodeType.query({
nodeTypeId,
})
return nodeType
}API Reference
Entry Points
The package exports two main entry points:
Client API (@badaitech/chaingraph-trpc/client)
import {
trpcReact, // React hooks (useQuery, useMutation, useSubscription)
trpcClient, // Non-React client
queryClient, // TanStack Query client
} from '@badaitech/chaingraph-trpc/client'
// Type exports
import type {
RouterInputs, // Input types for all procedures
RouterOutputs, // Output types for all procedures
AppRouter, // Full router type
} from '@badaitech/chaingraph-trpc/client'Type Usage Example:
import type { RouterInputs, RouterOutputs } from '@badaitech/chaingraph-trpc/client'
type CreateFlowInput = RouterInputs['flow']['create']
type FlowOutput = RouterOutputs['flow']['create']
const handleCreateFlow = async (input: CreateFlowInput) => {
const result: FlowOutput = await trpcClient.flow.create.mutate(input)
return result
}Server API (@badaitech/chaingraph-trpc/server)
import {
appRouter, // Main tRPC router with all procedures
createContext, // Context factory for tRPC requests
init, // Initialize backend systems
applyWSSHandler, // Setup WebSocket handler
} from '@badaitech/chaingraph-trpc/server'Procedure Categories
Flow Management Procedures
flow.create- Create a new flowflow.get- Get a flow by IDflow.list- List all flows for authenticated userflow.delete- Delete a flowflow.addNode- Add a node to a flowflow.removeNode- Remove a node from a flowflow.connectPorts- Connect ports between nodesflow.removeEdge- Remove an edge between nodesflow.updateNodePosition- Update node canvas positionflow.updatePortValue- Update port valueflow.updatePortUI- Update port UI configurationflow.subscribeToEvents- Real-time subscription to flow changes
Folder Management Procedures (NEW)
folder.create- Create a new folderfolder.list- List folders for authenticated userfolder.listWithFlows- List folders and flows (combined query)folder.edit- Update folder metadata (name, description, color)folder.move- Move folder to new parent with new orderingfolder.delete- Delete an empty folderfolder.moveFlow- Move a flow to a different folder
Folder Organization Features:
- Hierarchical folder structure with parent-child relationships
- Ordering support for custom folder and flow sorting
- Color tagging for visual organization
- Root-level workspace isolation with
rootFolderId - Cascade delete for folder hierarchies
- Efficient combined queries for UI rendering
Execution Control Procedures
execution.create- Create an execution instanceexecution.start- Start executionexecution.stop- Stop executionexecution.pause- Pause execution (at next step)execution.resume- Resume executionexecution.getState- Get current execution stateexecution.subscribeToEvents- Real-time subscription to execution eventsexecution.debug.addBreakpoint- Add breakpoint at nodeexecution.debug.removeBreakpoint- Remove breakpointexecution.debug.step- Step to next node (when paused)execution.debug.getBreakpoints- Get all active breakpoints
Node Registry Procedures
nodeRegistry.getCategorizedNodes- Get all nodes grouped by categoriesnodeRegistry.searchNodes- Search nodes by querynodeRegistry.getNodesByCategory- Get nodes for specific categorynodeRegistry.getCategories- Get all available categoriesnodeRegistry.getNodeType- Get detailed node type information
Secret Management Procedures
secrets.create- Create a secret valuesecrets.get- Retrieve a secretsecrets.delete- Delete a secret
MCP (Model Context Protocol) Procedures
mcp.*- Model Context Protocol tool integration
Advanced Topics
Real-Time Subscriptions
Subscriptions maintain WebSocket connections and emit data as it becomes available:
const subscribeToFlowEvents = async (flowId) => {
let subscription: Unsubscribable | null = null
try {
subscription = trpcClient.flow.subscribeToEvents.subscribe(
{ flowId },
{
onData: (event) => {
// Handle different event types
if (event.type === 'nodeAdded') {
console.log('Node added:', event.node)
} else if (event.type === 'edgeAdded') {
console.log('Edge created:', event.edge)
} else if (event.type === 'portUpdated') {
console.log('Port value changed:', event.port)
}
},
onError: (error) => {
console.error('Subscription error:', error)
// Implement reconnection logic here
},
onComplete: () => {
console.log('Subscription ended')
},
}
)
} catch (error) {
console.error('Failed to subscribe:', error)
}
// Cleanup: unsubscribe when done
return () => {
subscription?.unsubscribe()
}
}Error Handling
tRPC provides structured error handling:
import { TRPCClientError } from '@trpc/client'
import type { AppRouter } from '@badaitech/chaingraph-trpc/client'
const handleFlowCreation = async () => {
try {
const flow = await trpcClient.flow.create.mutate({
name: 'My Flow',
description: 'Test flow',
})
console.log('Flow created:', flow)
} catch (error) {
if (error instanceof TRPCClientError<AppRouter>) {
// Handle tRPC-specific errors
console.error('tRPC error:', error.message)
console.error('Code:', error.data?.code)
console.error('Stack:', error.data?.stack)
if (error.data?.code === 'UNAUTHORIZED') {
// Handle authentication errors
redirectToLogin()
} else if (error.data?.code === 'FORBIDDEN') {
// Handle permission errors
showAccessDenied()
} else if (error.data?.code === 'BAD_REQUEST') {
// Handle validation errors
showValidationErrors(error.message)
}
} else {
// Handle network or other errors
console.error('Unknown error:', error)
}
}
}Testing tRPC Procedures
import { describe, it, expect, beforeAll } from 'vitest'
import { createCaller } from '@badaitech/chaingraph-trpc/server'
import { createTestContext } from '@badaitech/chaingraph-trpc/server/test/utils/createTestContext'
describe('Flow Procedures', () => {
let caller: ReturnType<typeof createCaller>
let testContext: ReturnType<typeof createTestContext>
beforeAll(async () => {
testContext = await createTestContext()
caller = createCaller(testContext)
})
it('should create a flow', async () => {
const flow = await caller.flow.create({
name: 'Test Flow',
description: 'A test flow',
})
expect(flow).toBeDefined()
expect(flow.metadata.name).toBe('Test Flow')
})
it('should add node to flow', async () => {
const flow = await caller.flow.create({
name: 'Node Test Flow',
})
const node = await caller.flow.addNode({
flowId: flow.metadata.id,
nodeType: 'string-node',
position: { x: 0, y: 0 },
})
expect(node).toBeDefined()
expect(node.id).toBeDefined()
})
it('should create folder hierarchy', async () => {
const rootFolder = await caller.folder.create({
name: 'Root',
parentFolderId: null,
})
const childFolder = await caller.folder.create({
name: 'Child',
parentFolderId: rootFolder.id,
})
expect(childFolder.parentId).toBe(rootFolder.id)
})
})Optimistic Updates with React
import { useQueryClient } from '@tanstack/react-query'
import { trpcReact } from '@badaitech/chaingraph-trpc/client'
function FlowComponent() {
const queryClient = useQueryClient()
const utils = trpcReact.useUtils()
const createFlowMutation = trpcReact.flow.create.useMutation({
onMutate: async (newFlow) => {
// Cancel outgoing refetches
await queryClient.cancelQueries({ queryKey: [['flow', 'list']] })
// Get previous data
const previousFlows = queryClient.getQueryData([['flow', 'list']])
// Optimistically update cache
queryClient.setQueryData([['flow', 'list']], (old: any) => [
...old,
{ metadata: { id: 'temp', ...newFlow }, nodes: [], edges: [] },
])
return { previousFlows }
},
onError: (err, newFlow, context) => {
// Rollback on error
if (context?.previousFlows) {
queryClient.setQueryData([['flow', 'list']], context.previousFlows)
}
},
onSuccess: () => {
// Refetch and invalidate
utils.flow.list.invalidate()
},
})
return (
<button
onClick={() =>
createFlowMutation.mutate({
name: 'New Flow',
description: 'Created optimistically',
})
}
>
Create Flow
</button>
)
}Database Support
ChainGraph tRPC supports two storage mechanisms:
- In-Memory Store: Default storage option, suitable for development
- PostgreSQL Store: Persistent storage for production environments
Setting Up PostgreSQL
Set the DATABASE_URL environment variable:
DATABASE_URL=postgres://username:password@localhost:5432/dbnameRunning Migrations
# Generate migration files (when schema.ts changes)
pnpm run migrate:generate
# Push migrations to database
pnpm run migrate:push
# Run migrations programmatically
pnpm run migrate:runRecent Migration: Folder Support (0007)
The 0007 migration adds:
chaingraph_flow_folderstable with:- Hierarchical parent-child relationships
- Owner-based isolation
- Ordering support for custom sorting
- Color tagging
- Cascade delete on parent removal
Updates to
chaingraph_flowstable:folder_idcolumn for folder associationordercolumn for custom flow sorting- Foreign key constraints with
ON DELETE SET NULL
Indexes for performance:
- Owner-based queries
- Parent-based queries
- Order-based queries
Configuration
Environment Variables
# Database connection
DATABASE_URL=postgres://username:password@localhost:5432/dbname
# WebSocket server
WS_PORT=3001
WS_HOST=0.0.0.0
# Authentication
AUTH_SECRET=your-secret-key
# Node registry
NODE_REGISTRY_PATH=/path/to/nodesDrizzle ORM Configuration
The package uses Drizzle ORM for database migrations. Configuration is in server/drizzle.config.ts.
Folder Organization Best Practices
- Use Root-Level Organization: Create top-level folders for different project areas or teams
- Namespace Isolation: Use
rootFolderIdparameter to isolate content for different workspaces - Consistent Ordering: Maintain
ordervalues for predictable UI sorting - Color Coding: Use folder colors to visually distinguish project types or stages
- Clean Hierarchy: Keep folder depth reasonable (avoid deeply nested structures)
License
BUSL-1.1 - Business Source License
Related Packages
- @badaitech/chaingraph-types: Core type definitions and decorators
- @badaitech/chaingraph-nodes: Collection of pre-built nodes
- @badaitech/chaingraph-frontend: React UI components for visual flow programming
- @badaitech/chaingraph-executor: Backend execution engine for distributed flow execution
Support
For issues, questions, or contributions, please visit the main ChainGraph repository.