Class: PostgresExecutionStore
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:16
Implements
IExecutionStore
Constructors
Constructor
new PostgresExecutionStore(db): PostgresExecutionStore
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:17
Parameters
db
NodePgDatabase<Record<string, unknown>> & object
Returns
PostgresExecutionStore
Methods
claimExecution()
claimExecution(executionId, workerId, timeoutMs): Promise<boolean>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:241
Atomically claim an execution for a worker. Uses PostgreSQL row-level locking to ensure that only one worker can claim an execution. Also updates the execution's processing fields in the same transaction.
Parameters
executionId
string
workerId
string
timeoutMs
number
Returns
Promise<boolean>
Implementation of
IExecutionStore.claimExecution
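A minimal sketch of how a worker might wrap its work in a claim, assuming only the `claimExecution` and `releaseExecution` signatures documented on this page. `ClaimStore`, `runWithClaim`, and the `work` callback are hypothetical names introduced for illustration and are not part of the package. The sketch also assumes that a `false` result means another worker already holds the claim.

```typescript
// Hypothetical subset of the store, typed from the signatures on this page.
interface ClaimStore {
  claimExecution(executionId: string, workerId: string, timeoutMs: number): Promise<boolean>
  releaseExecution(executionId: string, workerId: string): Promise<void>
}

// Runs `work` only if this worker wins the claim, and always releases afterwards.
// Returns false when the claim could not be obtained (assumed: another worker owns it).
async function runWithClaim(
  store: ClaimStore,
  executionId: string,
  workerId: string,
  timeoutMs: number,
  work: () => Promise<void>,
): Promise<boolean> {
  const claimed = await store.claimExecution(executionId, workerId, timeoutMs)
  if (!claimed) {
    return false // skip: the execution is not ours to process
  }
  try {
    await work()
    return true
  } finally {
    // Release even when `work` throws, so the execution does not stay
    // claimed until the timeout elapses.
    await store.releaseExecution(executionId, workerId)
  }
}
```

Releasing in `finally` keeps the claim window as short as the work itself; without it, a failed worker would block others until `timeoutMs` runs out.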
create()
create(row): Promise<void>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:21
Parameters
row
completedAt
Date | null
createdAt
Date
errorMessage
string | null
errorNodeId
string | null
executionDepth
number
externalEvents
object[] | null
failureCount
number
flowId
string
id
string
integration
object & object | null
lastFailureAt
Date | null
lastFailureReason
string | null
options
{ breakpoints?: string[]; debug?: boolean; execution?: { flowTimeoutMs?: number; maxConcurrency?: number; nodeTimeoutMs?: number; }; } | null
ownerId
string
parentExecutionId
string | null
processingStartedAt
Date | null
processingWorkerId
string | null
rootExecutionId
string | null
startedAt
Date | null
status
ExecutionStatus
updatedAt
Date
Returns
Promise<void>
Implementation of
IExecutionStore.create
delete()
delete(id): Promise<boolean>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:51
Parameters
id
string
Returns
Promise<boolean>
Implementation of
IExecutionStore.delete
expireOldClaims()
expireOldClaims(): Promise<number>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:444
Expire old claims that have passed their expiration time
Returns
Promise<number>
Implementation of
IExecutionStore.expireOldClaims
extendClaim()
extendClaim(executionId, workerId, timeoutMs): Promise<boolean>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:376
Extend an execution claim (heartbeat)
Parameters
executionId
string
workerId
string
timeoutMs
number
Returns
Promise<boolean>
Implementation of
IExecutionStore.extendClaim
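A minimal heartbeat sketch built on the `extendClaim` signature above. `HeartbeatStore`, `startHeartbeat`, and `onLost` are hypothetical names, and two things are assumptions rather than documented behavior: that a `false` result means the claim is no longer held by this worker, and that beating at one third of the timeout is a reasonable interval.

```typescript
// Hypothetical subset of the store, typed from the signature on this page.
interface HeartbeatStore {
  extendClaim(executionId: string, workerId: string, timeoutMs: number): Promise<boolean>
}

// Starts a periodic heartbeat and returns a function that stops it.
// `onLost` fires once if the store reports that the claim was not extended.
function startHeartbeat(
  store: HeartbeatStore,
  executionId: string,
  workerId: string,
  timeoutMs: number,
  onLost: () => void,
): () => void {
  // Assumption: one third of the timeout leaves room for two missed beats.
  const intervalMs = Math.max(1, Math.floor(timeoutMs / 3))
  let stopped = false

  const timer = setInterval(() => {
    store.extendClaim(executionId, workerId, timeoutMs).then(
      (stillOwned) => {
        if (!stillOwned && !stopped) {
          stop()
          onLost()
        }
      },
      () => {
        // Errors are ignored in this sketch; the next beat retries.
      },
    )
  }, intervalMs)

  function stop(): void {
    stopped = true
    clearInterval(timer)
  }

  return stop
}
```

A worker would typically call the returned function once its work finishes, just before releasing the claim.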
get()
get(id): Promise<{ completedAt: Date | null; createdAt: Date; errorMessage: string | null; errorNodeId: string | null; executionDepth: number; externalEvents: object[] | null; failureCount: number; flowId: string; id: string; integration: object & object | null; lastFailureAt: Date | null; lastFailureReason: string | null; options: { breakpoints?: string[]; debug?: boolean; execution?: { flowTimeoutMs?: number; maxConcurrency?: number; nodeTimeoutMs?: number; }; } | null; ownerId: string; parentExecutionId: string | null; processingStartedAt: Date | null; processingWorkerId: string | null; rootExecutionId: string | null; startedAt: Date | null; status: ExecutionStatus; updatedAt: Date; } | null>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:37
Parameters
id
string
Returns
Promise<{ completedAt: Date | null; createdAt: Date; errorMessage: string | null; errorNodeId: string | null; executionDepth: number; externalEvents: object[] | null; failureCount: number; flowId: string; id: string; integration: object & object | null; lastFailureAt: Date | null; lastFailureReason: string | null; options: { breakpoints?: string[]; debug?: boolean; execution?: { flowTimeoutMs?: number; maxConcurrency?: number; nodeTimeoutMs?: number; }; } | null; ownerId: string; parentExecutionId: string | null; processingStartedAt: Date | null; processingWorkerId: string | null; rootExecutionId: string | null; startedAt: Date | null; status: ExecutionStatus; updatedAt: Date; } | null>
Implementation of
IExecutionStore.get
getActiveClaims()
getActiveClaims(): Promise<ExecutionClaim[]>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:400
Get all active claims
Returns
Promise<ExecutionClaim[]>
Implementation of
IExecutionStore.getActiveClaims
getChildExecutions()
getChildExecutions(parentId): Promise<object[]>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:167
Get all child executions of a parent
Parameters
parentId
string
Returns
Promise<object[]>
Implementation of
IExecutionStore.getChildExecutions
getClaimForExecution()
getClaimForExecution(executionId): Promise<ExecutionClaim | null>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:419
Get claim for a specific execution
Parameters
executionId
string
Returns
Promise<ExecutionClaim | null>
Implementation of
IExecutionStore.getClaimForExecution
getExecutionsNeedingRecovery()
getExecutionsNeedingRecovery(limit): Promise<object[]>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:490
Get executions that need recovery. Finds executions that match any of the following:
- Status = 'created' but no active claim
- Status = 'running' but claim expired
- Status = 'created' with failureCount < 5 and lastFailureAt > 1 minute ago
Parameters
limit
number = 100
Returns
Promise<object[]>
Implementation of
IExecutionStore.getExecutionsNeedingRecovery
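The three criteria listed for `getExecutionsNeedingRecovery` can be read as a predicate over one execution row and its claim. The sketch below takes the bullets literally and combines them with OR; the actual SQL query may combine them differently. `RowLike`, `ClaimLike`, its `expiresAt` field, and `needsRecovery` are hypothetical, and "lastFailureAt > 1 minute ago" is assumed to mean that the last failure is more than one minute old.

```typescript
// Hypothetical, trimmed-down shapes; the real row and claim types are not shown here.
interface RowLike {
  status: string
  failureCount: number
  lastFailureAt: Date | null
}
interface ClaimLike {
  expiresAt: Date // assumed field name
}

const MAX_FAILURES = 5 // from "failureCount < 5"
const RETRY_DELAY_MS = 60_000 // from "1 minute"

function needsRecovery(row: RowLike, claim: ClaimLike | null, now: Date): boolean {
  const claimActive = claim !== null && claim.expiresAt.getTime() > now.getTime()
  const claimExpired = claim !== null && !claimActive

  // Criterion 1: created, and no live claim exists.
  if (row.status === 'created' && !claimActive) return true

  // Criterion 2: running, but the claim has lapsed.
  if (row.status === 'running' && claimExpired) return true

  // Criterion 3: created, fewer than 5 failures, last failure over a minute old.
  if (
    row.status === 'created'
    && row.failureCount < MAX_FAILURES
    && row.lastFailureAt !== null
    && now.getTime() - row.lastFailureAt.getTime() > RETRY_DELAY_MS
  ) {
    return true
  }

  return false
}
```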
getExecutionTree()
getExecutionTree(rootId): Promise<ExecutionTreeNode[]>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:177
Get the entire execution tree
Parameters
rootId
string
Returns
Promise<ExecutionTreeNode[]>
Implementation of
IExecutionStore.getExecutionTree
getRecoveryHistory()
getRecoveryHistory(executionId): Promise<object[]>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:558
Get recovery history for an execution
Parameters
executionId
string
Returns
Promise<object[]>
Implementation of
IExecutionStore.getRecoveryHistory
getRootExecutions()
getRootExecutions(flowId, limit, after): Promise<RootExecution[]>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:121
Parameters
flowId
string
limit
number = 50
after
Date | null
Returns
Promise<RootExecution[]>
Implementation of
IExecutionStore.getRootExecutions
recordRecovery()
recordRecovery(executionId, workerId, reason, previousStatus?, previousWorkerId?): Promise<void>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:538
Record a recovery action for an execution
Parameters
executionId
string
workerId
string
reason
string
previousStatus?
string
previousWorkerId?
string
Returns
Promise<void>
Implementation of
IExecutionStore.recordRecovery
releaseExecution()
releaseExecution(executionId, workerId): Promise<void>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:342
Release an execution claim. Also clears the processing fields in the execution row.
Parameters
executionId
string
workerId
string
Returns
Promise<void>
Implementation of
IExecutionStore.releaseExecution
releaseRecoveryLock()
releaseRecoveryLock(lockId): Promise<void>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:586
Release a PostgreSQL advisory lock
Parameters
lockId
number
Returns
Promise<void>
Implementation of
IExecutionStore.releaseRecoveryLock
tryAcquireRecoveryLock()
tryAcquireRecoveryLock(lockId): Promise<boolean>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:570
Try to acquire a PostgreSQL advisory lock. Returns true if the lock was acquired, false if another session holds it.
Parameters
lockId
number
Returns
Promise<boolean>
Implementation of
IExecutionStore.tryAcquireRecoveryLock
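A minimal sketch of pairing `tryAcquireRecoveryLock` with `releaseRecoveryLock` so that only one process runs a recovery pass at a time. `RecoveryLockStore`, `withRecoveryLock`, and the `pass` callback are hypothetical names; only the two method signatures come from this page.

```typescript
// Hypothetical subset of the store, typed from the signatures on this page.
interface RecoveryLockStore {
  tryAcquireRecoveryLock(lockId: number): Promise<boolean>
  releaseRecoveryLock(lockId: number): Promise<void>
}

// Runs `pass` only while holding the lock identified by `lockId`.
// Resolves to undefined when another session already holds the lock.
async function withRecoveryLock<T>(
  store: RecoveryLockStore,
  lockId: number,
  pass: () => Promise<T>,
): Promise<T | undefined> {
  const acquired = await store.tryAcquireRecoveryLock(lockId)
  if (!acquired) {
    return undefined // another process is already recovering; do nothing
  }
  try {
    return await pass()
  } finally {
    // Always release, including when `pass` throws.
    await store.releaseRecoveryLock(lockId)
  }
}
```

Because the acquire call does not wait, a caller that loses the race simply skips this round and can try again on its next scheduled pass.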
updateExecutionStatus()
updateExecutionStatus(params): Promise<boolean>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:64
Atomically update the execution status and related fields. This method avoids the need to fetch the execution first.
Parameters
params
UpdateExecutionStatusParams
Returns
Promise<boolean>
Implementation of
IExecutionStore.updateExecutionStatus
updateFailureInfo()
updateFailureInfo(executionId, failureCount, reason): Promise<boolean>
Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:464
Update failure count and reason for an execution
Parameters
executionId
string
failureCount
number
reason
string
Returns
Promise<boolean>