Class: PostgresExecutionStore

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:16

Implements

IExecutionStore

Constructors

Constructor

new PostgresExecutionStore(db): PostgresExecutionStore

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:17

Parameters

db

NodePgDatabase<Record<string, unknown>> & object

Returns

PostgresExecutionStore

Methods

claimExecution()

claimExecution(executionId, workerId, timeoutMs): Promise<boolean>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:241

Claim an execution for a worker atomically. Uses PostgreSQL row-level locking to ensure only one worker can claim an execution. Also updates the execution's processing fields in the same transaction.

Parameters

executionId

string

workerId

string

timeoutMs

number

Returns

Promise<boolean>

Implementation of

IExecutionStore.claimExecution
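
A minimal sketch of the contract described above: two workers race for the same execution and exactly one wins. It assumes a `false` result means another worker already holds a live claim. `ClaimStore` and `InMemoryClaimStore` are hypothetical stand-ins so the sample runs without a database; only the `claimExecution` signature is taken from this page.

```typescript
// Hypothetical subset of IExecutionStore; only the method signature comes from the docs.
interface ClaimStore {
  claimExecution(executionId: string, workerId: string, timeoutMs: number): Promise<boolean>;
}

// In-memory stand-in (an assumption, not the real store) that mimics
// "only one worker can claim an execution".
class InMemoryClaimStore implements ClaimStore {
  private claims = new Map<string, { workerId: string; expiresAt: number }>();

  async claimExecution(executionId: string, workerId: string, timeoutMs: number): Promise<boolean> {
    const now = Date.now();
    const current = this.claims.get(executionId);
    // A claim that has not expired yet blocks any new claim.
    if (current !== undefined && current.expiresAt > now) {
      return false;
    }
    this.claims.set(executionId, { workerId, expiresAt: now + timeoutMs });
    return true;
  }
}

// Two workers attempt the same claim; the result array holds one outcome per worker.
async function raceForClaim(store: ClaimStore, executionId: string): Promise<boolean[]> {
  return Promise.all([
    store.claimExecution(executionId, 'worker-a', 30000),
    store.claimExecution(executionId, 'worker-b', 30000),
  ]);
}
```

A worker that receives `false` would typically skip the execution rather than retry immediately.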


create()

create(row): Promise<void>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:21

Parameters

row

completedAt

Date | null

createdAt

Date

errorMessage

string | null

errorNodeId

string | null

executionDepth

number

externalEvents

object[] | null

failureCount

number

flowId

string

id

string

integration

object & object | null

lastFailureAt

Date | null

lastFailureReason

string | null

options

{ breakpoints?: string[]; debug?: boolean; execution?: { flowTimeoutMs?: number; maxConcurrency?: number; nodeTimeoutMs?: number; }; } | null

ownerId

string

parentExecutionId

string | null

processingStartedAt

Date | null

processingWorkerId

string | null

rootExecutionId

string | null

startedAt

Date | null

status

ExecutionStatus

updatedAt

Date

Returns

Promise<void>

Implementation of

IExecutionStore.create


delete()

delete(id): Promise<boolean>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:51

Parameters

id

string

Returns

Promise<boolean>

Implementation of

IExecutionStore.delete


expireOldClaims()

expireOldClaims(): Promise<number>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:444

Expire old claims that have passed their expiration time

Returns

Promise<number>

Implementation of

IExecutionStore.expireOldClaims


extendClaim()

extendClaim(executionId, workerId, timeoutMs): Promise<boolean>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:376

Extend an execution claim (heartbeat)

Parameters

executionId

string

workerId

string

timeoutMs

number

Returns

Promise<boolean>

Implementation of

IExecutionStore.extendClaim
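
The heartbeat role of `extendClaim` could be sketched as a timer that renews the claim until it is stopped or the store reports the claim as lost. The helper `startHeartbeat`, the `HeartbeatStore` interface, the `onLost` callback, and the "renew at one third of the timeout" policy are all assumptions for illustration; it also assumes a `false` result means the claim no longer belongs to this worker.

```typescript
// Hypothetical subset of IExecutionStore; only the method signature comes from the docs.
interface HeartbeatStore {
  extendClaim(executionId: string, workerId: string, timeoutMs: number): Promise<boolean>;
}

// Starts a periodic heartbeat and returns a function that stops it.
function startHeartbeat(
  store: HeartbeatStore,
  executionId: string,
  workerId: string,
  timeoutMs: number,
  onLost: () => void,
): () => void {
  // Assumed policy: renew at a third of the timeout so a missed beat is tolerated.
  const intervalMs = Math.max(1, Math.floor(timeoutMs / 3));
  let stopped = false;
  let timer: ReturnType<typeof setInterval> | undefined;

  const stop = (): void => {
    stopped = true;
    if (timer !== undefined) {
      clearInterval(timer);
    }
  };

  timer = setInterval(async () => {
    try {
      const stillOwned = await store.extendClaim(executionId, workerId, timeoutMs);
      if (!stillOwned && !stopped) {
        // The claim could not be extended: stop beating and notify the caller.
        stop();
        onLost();
      }
    } catch {
      // Transient store error: keep the timer running and try again on the next tick.
    }
  }, intervalMs);

  return stop;
}
```

The caller would invoke the returned function once the execution finishes, before releasing the claim.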


get()

get(id): Promise<{ completedAt: Date | null; createdAt: Date; errorMessage: string | null; errorNodeId: string | null; executionDepth: number; externalEvents: object[] | null; failureCount: number; flowId: string; id: string; integration: object & object | null; lastFailureAt: Date | null; lastFailureReason: string | null; options: { breakpoints?: string[]; debug?: boolean; execution?: { flowTimeoutMs?: number; maxConcurrency?: number; nodeTimeoutMs?: number; }; } | null; ownerId: string; parentExecutionId: string | null; processingStartedAt: Date | null; processingWorkerId: string | null; rootExecutionId: string | null; startedAt: Date | null; status: ExecutionStatus; updatedAt: Date; } | null>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:37

Parameters

id

string

Returns

Promise<{ completedAt: Date | null; createdAt: Date; errorMessage: string | null; errorNodeId: string | null; executionDepth: number; externalEvents: object[] | null; failureCount: number; flowId: string; id: string; integration: object & object | null; lastFailureAt: Date | null; lastFailureReason: string | null; options: { breakpoints?: string[]; debug?: boolean; execution?: { flowTimeoutMs?: number; maxConcurrency?: number; nodeTimeoutMs?: number; }; } | null; ownerId: string; parentExecutionId: string | null; processingStartedAt: Date | null; processingWorkerId: string | null; rootExecutionId: string | null; startedAt: Date | null; status: ExecutionStatus; updatedAt: Date; } | null>

Implementation of

IExecutionStore.get


getActiveClaims()

getActiveClaims(): Promise<ExecutionClaim[]>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:400

Get all active claims

Returns

Promise<ExecutionClaim[]>

Implementation of

IExecutionStore.getActiveClaims


getChildExecutions()

getChildExecutions(parentId): Promise<object[]>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:167

Get all child executions of a parent

Parameters

parentId

string

Returns

Promise<object[]>

Implementation of

IExecutionStore.getChildExecutions


getClaimForExecution()

getClaimForExecution(executionId): Promise<ExecutionClaim | null>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:419

Get claim for a specific execution

Parameters

executionId

string

Returns

Promise<ExecutionClaim | null>

Implementation of

IExecutionStore.getClaimForExecution


getExecutionsNeedingRecovery()

getExecutionsNeedingRecovery(limit): Promise<object[]>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:490

Get executions that need recovery. Finds executions that are:

  1. Status = 'created' but no active claim
  2. Status = 'running' but claim expired
  3. Status = 'created' with failureCount < 5 and lastFailureAt > 1 minute ago

Parameters

limit

number = 100

Returns

Promise<object[]>

Implementation of

IExecutionStore.getExecutionsNeedingRecovery
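
The three conditions listed above can be read as a predicate over a single execution. The sketch below is one literal reading, not the store's actual query: it treats the conditions as alternatives, interprets "lastFailureAt > 1 minute ago" as "the last failure is more than one minute old", and uses a hypothetical `RecoverySnapshot` type whose `claimExpiresAt` field stands in for the claim lookup.

```typescript
// Hypothetical flattened view of an execution plus its claim (if any).
type RecoverySnapshot = {
  status: string;
  failureCount: number;
  lastFailureAt: Date | null;
  claimExpiresAt: Date | null; // assumption: null when no claim row exists
};

const MAX_FAILURES = 5;          // from condition 3
const RETRY_DELAY_MS = 60_000;   // "1 minute" from condition 3

function needsRecovery(e: RecoverySnapshot, now: Date): boolean {
  const hasClaim = e.claimExpiresAt !== null;
  const claimActive = e.claimExpiresAt !== null && e.claimExpiresAt.getTime() > now.getTime();

  // 1. Status = 'created' but no active claim
  const unclaimedCreated = e.status === 'created' && !claimActive;

  // 2. Status = 'running' but claim expired
  const abandonedRunning = e.status === 'running' && hasClaim && !claimActive;

  // 3. Status = 'created' with failureCount < 5 and a failure older than one minute
  const retryableFailure =
    e.status === 'created' &&
    e.failureCount < MAX_FAILURES &&
    e.lastFailureAt !== null &&
    now.getTime() - e.lastFailureAt.getTime() > RETRY_DELAY_MS;

  return unclaimedCreated || abandonedRunning || retryableFailure;
}
```

How the real query combines the conditions (for example, whether the failure limit also restricts condition 1) is not stated on this page.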


getExecutionTree()

getExecutionTree(rootId): Promise<ExecutionTreeNode[]>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:177

Get the entire execution tree

Parameters

rootId

string

Returns

Promise<ExecutionTreeNode[]>

Implementation of

IExecutionStore.getExecutionTree


getRecoveryHistory()

getRecoveryHistory(executionId): Promise<object[]>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:558

Get recovery history for an execution

Parameters

executionId

string

Returns

Promise<object[]>

Implementation of

IExecutionStore.getRecoveryHistory


getRootExecutions()

getRootExecutions(flowId, limit, after): Promise<RootExecution[]>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:121

Parameters

flowId

string

limit

number = 50

after

Date | null

Returns

Promise<RootExecution[]>

Implementation of

IExecutionStore.getRootExecutions


recordRecovery()

recordRecovery(executionId, workerId, reason, previousStatus?, previousWorkerId?): Promise<void>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:538

Record a recovery action for an execution

Parameters

executionId

string

workerId

string

reason

string

previousStatus?

string

previousWorkerId?

string

Returns

Promise<void>

Implementation of

IExecutionStore.recordRecovery


releaseExecution()

releaseExecution(executionId, workerId): Promise<void>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:342

Release an execution claim. Also clears the processing fields in the execution row.

Parameters

executionId

string

workerId

string

Returns

Promise<void>

Implementation of

IExecutionStore.releaseExecution
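
Because a claim stays in place until it is released or expires, a worker may want to pair `claimExecution` with `releaseExecution` in a `try`/`finally`. The following is a sketch under that assumption; `LifecycleStore` and `runClaimed` are hypothetical names, and only the two method signatures come from this page.

```typescript
// Hypothetical subset of IExecutionStore; only the method signatures come from the docs.
interface LifecycleStore {
  claimExecution(executionId: string, workerId: string, timeoutMs: number): Promise<boolean>;
  releaseExecution(executionId: string, workerId: string): Promise<void>;
}

// Runs `work` only if the claim succeeds, and always releases the claim afterwards.
// Returns false when the execution was claimed by someone else.
async function runClaimed(
  store: LifecycleStore,
  executionId: string,
  workerId: string,
  timeoutMs: number,
  work: () => Promise<void>,
): Promise<boolean> {
  const claimed = await store.claimExecution(executionId, workerId, timeoutMs);
  if (!claimed) {
    return false;
  }
  try {
    await work();
  } finally {
    // Release even when `work` throws, so the claim does not linger until it times out.
    await store.releaseExecution(executionId, workerId);
  }
  return true;
}
```

An error thrown by `work` still propagates to the caller after the release has run.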


releaseRecoveryLock()

releaseRecoveryLock(lockId): Promise<void>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:586

Release a PostgreSQL advisory lock

Parameters

lockId

number

Returns

Promise<void>

Implementation of

IExecutionStore.releaseRecoveryLock


tryAcquireRecoveryLock()

tryAcquireRecoveryLock(lockId): Promise<boolean>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:570

Try to acquire a PostgreSQL advisory lock. Returns true if the lock was acquired, false if another session holds it.

Parameters

lockId

number

Returns

Promise<boolean>

Implementation of

IExecutionStore.tryAcquireRecoveryLock
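
The acquire and release methods are naturally used as a pair. The sketch below wraps them so that a task runs only while the lock is held and the lock is released afterwards. `RecoveryLockStore`, `InMemoryLockStore`, and `withRecoveryLock` are hypothetical names; the in-memory class only imitates the "false if another holder exists" behaviour and does not model PostgreSQL sessions.

```typescript
// Hypothetical subset of IExecutionStore; only the method signatures come from the docs.
interface RecoveryLockStore {
  tryAcquireRecoveryLock(lockId: number): Promise<boolean>;
  releaseRecoveryLock(lockId: number): Promise<void>;
}

// In-memory stand-in (an assumption) so the sample runs without a database.
class InMemoryLockStore implements RecoveryLockStore {
  private held = new Set<number>();

  async tryAcquireRecoveryLock(lockId: number): Promise<boolean> {
    if (this.held.has(lockId)) {
      return false;
    }
    this.held.add(lockId);
    return true;
  }

  async releaseRecoveryLock(lockId: number): Promise<void> {
    this.held.delete(lockId);
  }
}

// Runs `task` only if the lock is acquired; returns undefined when it is busy.
async function withRecoveryLock<T>(
  store: RecoveryLockStore,
  lockId: number,
  task: () => Promise<T>,
): Promise<T | undefined> {
  const acquired = await store.tryAcquireRecoveryLock(lockId);
  if (!acquired) {
    return undefined;
  }
  try {
    return await task();
  } finally {
    // Always release, even if the task throws.
    await store.releaseRecoveryLock(lockId);
  }
}
```

Returning `undefined` instead of waiting fits the non-blocking "try" semantics: a worker that does not get the lock can simply skip this recovery pass.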


updateExecutionStatus()

updateExecutionStatus(params): Promise<boolean>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:64

Atomically update execution status and related fields. This method avoids the need to fetch the execution first.

Parameters

params

UpdateExecutionStatusParams

Returns

Promise<boolean>

Implementation of

IExecutionStore.updateExecutionStatus


updateFailureInfo()

updateFailureInfo(executionId, failureCount, reason): Promise<boolean>

Defined in: packages/chaingraph-executor/server/stores/postgres/postgres-execution-store.ts:464

Update failure count and reason for an execution

Parameters

executionId

string

failureCount

number

reason

string

Returns

Promise<boolean>

Implementation of

IExecutionStore.updateFailureInfo

Licensed under BUSL-1.1