Skip to content

ChainGraph API Documentation / @badaitech/chaingraph-channel / types / DBOSChannelConfig

Interface: DBOSChannelConfig

Defined in: types/ChannelConfig.d.ts:67

Configuration for DBOSChannel.

Extends

Properties

batchTimeoutMs?

optional batchTimeoutMs: number

Defined in: types/ChannelConfig.d.ts:195

Timeout for batch accumulation in milliseconds.

If batch doesn't reach consumerBatchSize within this time, it's flushed anyway.

Default

ts
25

cleanupStrategy?

optional cleanupStrategy: BufferCleanupStrategy

Defined in: types/ChannelConfig.d.ts:28

Buffer cleanup strategy when maxBufferSize is reached.

  • min-position: Keep items from slowest subscriber forward (fair to all)
  • sliding-window: Always keep last N items (simple, may drop slow consumers)
  • time-based: Remove items older than maxBufferAge
  • none: No cleanup (unbounded growth)

Default

ts
'min-position'

Inherited from

BaseChannelConfig.cleanupStrategy


consumerBatchSize?

optional consumerBatchSize: number

Defined in: types/ChannelConfig.d.ts:186

Batch size for yielding to consumers.

DBOSChannel yields arrays of items for efficiency.

Default

ts
100

deduplication?

optional deduplication: boolean

Defined in: types/ChannelConfig.d.ts:135

Enable offset-based deduplication between history and real-time.

Prevents race condition where same event appears in both history query and real-time buffer. Uses max offset from history to filter real-time buffer.

Default

ts
true

Example

ts
// History: offsets [98, 99, 100]
// Buffer:  offsets [100, 101, 102]
// Result:  offsets [98, 99, 100, 101, 102] (no duplicate 100)

fromOffset?

optional fromOffset: number

Defined in: types/ChannelConfig.d.ts:146

Starting offset for reading from the stream (manual override).

Overrides lastN calculation. Useful for resuming from specific position.

  • 0: Read from beginning
  • N: Skip first N items

Default

ts
undefined (use lastN or read all)

lastN?

optional lastN: number

Defined in: types/ChannelConfig.d.ts:120

For 'history' mode: How many historical events to read.

  • undefined or 0: Read ALL history from offset 0
  • number > 0: Read last N events only

This enables "warm start" subscriptions where you want recent context without replaying the entire history.

Default

ts
undefined (read all history)

Examples

ts
Read all history
readMode: 'history'
lastN: undefined
ts
Read last 100 events
readMode: 'history'
lastN: 100

maxBufferAge?

optional maxBufferAge: number

Defined in: types/ChannelConfig.d.ts:36

Maximum age of buffered items in milliseconds (for time-based cleanup).

Only used when cleanupStrategy is 'time-based'.

Default

ts
60000 (1 minute)

Inherited from

BaseChannelConfig.maxBufferAge


maxBufferSize?

optional maxBufferSize: number

Defined in: types/ChannelConfig.d.ts:17

Maximum buffer size (0 = unbounded)

When buffer exceeds this size, cleanup is triggered based on cleanupStrategy.

Default

ts
1000 for InMemoryChannel

Default

ts
0 for DBOSChannel (database-backed, no in-memory buffer limit)

Inherited from

BaseChannelConfig.maxBufferSize


pgConnectionString?

optional pgConnectionString: string

Defined in: types/ChannelConfig.d.ts:199

PostgreSQL connection string for LISTEN/NOTIFY.


pgPool?

optional pgPool: unknown

Defined in: types/ChannelConfig.d.ts:203

PostgreSQL query pool for SELECT queries.


queryBatchSize?

optional queryBatchSize: number

Defined in: types/ChannelConfig.d.ts:178

Batch size for database reads.

Higher values reduce database queries but increase memory usage.

Default

ts
1000

readMode?

optional readMode: ReadMode

Defined in: types/ChannelConfig.d.ts:100

Read mode for stream consumption.

  • 'history': Read history (all or last N), then continue real-time
  • 'realtime': Skip history, start from current position

Default

ts
'history'

Examples

ts
Read all history, then real-time
readMode: 'history'
lastN: undefined
ts
Read last 50 events, then real-time
readMode: 'history'
lastN: 50
ts
Real-time only (skip history)
readMode: 'realtime'

retry?

optional retry: object

Defined in: types/ChannelConfig.d.ts:207

Retry configuration for transient failures.

backoffMultiplier?

optional backoffMultiplier: number

Backoff multiplier for exponential backoff.

Default
ts
2

initialDelay?

optional initialDelay: number

Initial retry delay in milliseconds.

Default
ts
100

maxAttempts?

optional maxAttempts: number

Maximum retry attempts for failed operations.

Default
ts
3

maxDelay?

optional maxDelay: number

Maximum retry delay in milliseconds.

Default
ts
5000

streamKey

streamKey: string

Defined in: types/ChannelConfig.d.ts:80

Stream key within the workflow.

A workflow can have multiple streams (e.g., 'events', 'results', 'metrics').


toOffset?

optional toOffset: number

Defined in: types/ChannelConfig.d.ts:161

Ending offset for reading from the stream.

  • undefined: Read until channel closed (infinite)
  • N: Stop after reading offset N-1

Useful for reading specific ranges.

Default

ts
undefined (infinite)

Example

ts
Read offsets 1000-1999
fromOffset: 1000
toOffset: 2000

where()?

optional where: (value) => boolean

Defined in: types/ChannelConfig.d.ts:170

Client-side filter predicate.

Applied AFTER database query (not in SQL). Simple and flexible.

Parameters

value

any

Returns

boolean

Example

ts
Filter by event type
where: (event) => event.type === 'PORT_UPDATED'

workflowId

workflowId: string

Defined in: types/ChannelConfig.d.ts:74

DBOS workflow UUID identifying the stream.

All servers reading/writing to the same workflowId + streamKey will be connected to the same distributed stream.

Licensed under BUSL-1.1