Interface: ChannelLogger<T>

Defined in: types/ChannelLogger.d.ts:77

External logger interface for channel persistence and observability.

Enables:

  • Database Persistence: Log all values for replay capability
  • Metrics Collection: Track throughput, buffer size, etc.
  • Audit Trails: Complete history of all channel operations
  • Debugging: Trace data flow through channels

Design Philosophy:

  • All callbacks are optional (pay only for what you use)
  • All callbacks can be async (support database writes, network calls)
  • Logger is attached via setLogger(), not constructor (flexibility)
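  • Errors in logger callbacks are not meant to break channel operations; onSend is the documented exception, since an error there propagates to the caller of send()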
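
As a rough illustration of the "all callbacks are optional" point, the interface shape can be sketched from the signatures listed under Properties on this page. This is a reconstruction, not the library's declaration: the name ChannelLoggerSketch is made up for the example, and PruneReason is modeled as a plain string because its real definition is not shown here.

```typescript
// Assumption: PruneReason is modeled as a plain string; the library's own
// type is not shown on this page and may be a narrower union.
type PruneReason = string

// Shape reconstructed from the Properties section (hypothetical name).
interface ChannelLoggerSketch<T> {
  onSend?: (value: T, position: number) => void | Promise<void>
  onPrune?: (items: T[], reason: PruneReason) => void | Promise<void>
  onError?: (error: Error) => void | Promise<void>
  onClose?: () => void | Promise<void>
}

// A logger that implements a single callback; the others are simply absent.
const seen: number[] = []
const positionsOnly: ChannelLoggerSketch<string> = {
  onSend(_value, position) {
    seen.push(position)
  },
}

// Optional chaining is one way a caller can skip callbacks that are not set.
positionsOnly.onSend?.('hello', 0)
positionsOnly.onClose?.() // no-op: onClose is not defined on this logger
```

Because every member is optional, a logger object only needs the callbacks it actually uses.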

Examples

typescript
const logger: ChannelLogger<Message> = {
  async onSend(value, position) {
    // Persist every value to database
    await db.insert(execution_stream_logs).values({
      executionId: context.executionId,
      nodeId: node.id,
      portId: port.id,
      sequenceNumber: position,
      value: JSON.stringify(value),
      timestamp: new Date()
    })
  },

  async onPrune(items, reason) {
    // Items already in database, safe to prune from memory
    console.log(`Pruned ${items.length} items from memory (${reason})`)
  }
}

channel.setLogger(logger)

typescript
const logger: ChannelLogger<any> = {
  onSend(value, position) {
    metrics.increment('channel.sends', 1, {
      nodeId: node.id,
      portId: port.id
    })
    metrics.gauge('channel.position', position)
    // JSON.stringify returns undefined for some inputs (e.g. undefined), so fall back to ''
    metrics.histogram('channel.value_size', (JSON.stringify(value) ?? '').length)
  },

  onPrune(items, reason) {
    metrics.increment('channel.prunes', items.length, { reason })
  }
}

typescript
const logger: ChannelLogger<Event> = {
  async onSend(value, position) {
    // Log to both database and metrics in parallel
    await Promise.all([
      db.insert(logs).values({ value, position }),
      metrics.track('event', value)
    ])
  }
}

Type Parameters

T

The type of values being logged

Properties

onClose()?

optional onClose: () => void | Promise<void>

Defined in: types/ChannelLogger.d.ts:187

Called when the channel is closed.

Use Cases:

  • Flush any buffered logs
  • Close database connections
  • Send final metrics
  • Cleanup resources

Timing: Called during close(), after all subscribers have been notified.

Returns

void | Promise<void>

Example

typescript
async onClose() {
  // Flush buffered log entries before releasing the connection
  await logBatcher.flush()
  await db.close()
}
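
The logBatcher used above is not defined on this page. One possible shape for it, purely as a sketch: the LogBatcher class, its sink callback, and the batch size of 2 are all assumptions made for illustration, and an in-memory array stands in for a real database.

```typescript
// Hypothetical sink: receives one batch of entries and persists it somewhere.
type Sink<T> = (batch: T[]) => Promise<void>

// Hypothetical batcher: queues entries and writes them in groups.
class LogBatcher<T> {
  private pending: T[] = []
  private readonly sink: Sink<T>
  private readonly maxBatch: number

  constructor(sink: Sink<T>, maxBatch: number) {
    this.sink = sink
    this.maxBatch = maxBatch
  }

  // Queue an entry; write a batch once enough entries have accumulated.
  async add(entry: T): Promise<void> {
    this.pending.push(entry)
    if (this.pending.length >= this.maxBatch) await this.flush()
  }

  // Write whatever is still queued. Does nothing when the queue is empty.
  async flush(): Promise<void> {
    if (this.pending.length === 0) return
    const batch = this.pending
    this.pending = []
    await this.sink(batch)
  }
}

// In-memory stand-in for a database table.
const batches: string[][] = []
const logBatcher = new LogBatcher<string>(async (batch) => {
  batches.push(batch)
}, 2)

// Logger callbacks wired to the batcher: onClose flushes the remainder.
const batchingLogger = {
  onSend: (value: string, _position: number) => logBatcher.add(value),
  onClose: () => logBatcher.flush(),
}
```

With a batch size of 2, sending three values writes one full batch immediately and leaves one entry queued until onClose runs.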

onError()?

optional onError: (error) => void | Promise<void>

Defined in: types/ChannelLogger.d.ts:212

Called when an error is set on the channel.

Use Cases:

  • Log errors to monitoring system
  • Trigger alerts
  • Record error state in database

Parameters

error

Error

The error that occurred

Returns

void | Promise<void>

Example

typescript
async onError(error) {
  await db.insert(stream_errors).values({
    execution_id: executionId,
    error_message: error.message,
    error_stack: error.stack,
    timestamp: new Date()
  })

  alerts.error('Stream error', { error: error.message })
}

onPrune()?

optional onPrune: (items, reason) => void | Promise<void>

Defined in: types/ChannelLogger.d.ts:167

Called when buffer items are pruned from memory.

Use Cases:

  • Verify items already persisted before removal
  • Track memory management efficiency
  • Alert on excessive pruning (slow consumers)
  • Debug buffer behavior

Timing: Called during buffer cleanup (triggered by send() or close()).

Error Handling: If this callback throws, the error is logged but buffer cleanup continues (pruning can't be prevented).
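
The "logged but cleanup continues" behavior can be pictured with a small sketch. It does not show the library's internals: invokeSafely and pruneOldest are hypothetical helpers, and whether the real channel calls onPrune before or after removing the items is an assumption here.

```typescript
// Hypothetical helper: run an optional callback, report failures, never rethrow.
async function invokeSafely(
  label: string,
  callback: (() => void | Promise<void>) | undefined,
  report: (message: string) => void = console.error,
): Promise<void> {
  if (!callback) return
  try {
    await callback()
  } catch (err) {
    const detail = err instanceof Error ? err.message : String(err)
    report(`${label} failed: ${detail}`)
  }
}

// Hypothetical prune step: remove the oldest items, then notify the logger.
async function pruneOldest<T>(
  buffer: T[],
  count: number,
  onPrune?: (items: T[], reason: string) => void | Promise<void>,
  report?: (message: string) => void,
): Promise<T[]> {
  // Assumption: removal happens first, so a failing logger cannot block it.
  const removed = buffer.splice(0, count)
  const notify = onPrune
  const callback = notify ? () => notify(removed, 'max-size') : undefined
  await invokeSafely('onPrune', callback, report)
  return removed
}
```

The key design point is that the callback's failure is caught and reported in one place, so the buffer ends up pruned whether or not the logger succeeds.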

Parameters

items

T[]

Array of items being removed from buffer

reason

PruneReason

Why items were pruned

Returns

void | Promise<void>

Examples

typescript
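async onPrune(items, reason) {
  // Verify all items were logged.
  // Assumes startPosition (sequence number of the first pruned item) is tracked elsewhere.
  const positions = items.map((_, i) => startPosition + i)
  // `in` is a reserved word in JavaScript, so the query builder's IN helper needs
  // another name; inArray is used here as a placeholder for whatever yours provides.
  const logged = await db.count(stream_logs)
    .where(inArray(stream_logs.sequence_number, positions))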

  if (logged !== items.length) {
    console.error(`WARNING: Pruning ${items.length} items, but only ${logged} in database!`)
  } else {
    console.log(`✅ Safely pruned ${items.length} items (${reason})`)
  }
}

typescript
onPrune(items, reason) {
  metrics.increment('channel.buffer_prunes', items.length, {
    reason,
    node_id: nodeId
  })

  if (reason === 'max-size') {
    // Alert: buffer hit size limit (possible slow consumer)
    alerts.warn(`Channel buffer full, pruned ${items.length} items`)
  }
}

onSend()?

optional onSend: (value, position) => void | Promise<void>

Defined in: types/ChannelLogger.d.ts:118

Called when a value is sent to the channel, before subscribers are notified.

Use Cases:

  • Persist all values to database for replay
  • Track metrics (throughput, value sizes, etc.)
  • Audit logging for compliance
  • Real-time monitoring dashboards

Timing: Invoked during send(); send() awaits the callback before returning.

Error Handling: If this callback throws, the error propagates to the caller of send(). The value is NOT added to the buffer if logging fails.
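
To make the ordering concrete, here is a heavily simplified sketch of a send path with the behavior described above: the logger runs first, and the value reaches the buffer only if the logger succeeds. SketchChannel is a hypothetical stand-in, not the library's channel class; it ignores subscribers and pruning, and assumes sends are awaited one at a time.

```typescript
type OnSend<T> = (value: T, position: number) => void | Promise<void>

// Hypothetical, simplified channel: only the send path is modeled.
class SketchChannel<T> {
  readonly buffer: T[] = []
  private onSend: OnSend<T> | undefined

  setLogger(logger: { onSend?: OnSend<T> }): void {
    this.onSend = logger.onSend
  }

  // Assumes callers await each send before starting the next one.
  async send(value: T): Promise<void> {
    // 0-based position; buffer length works here only because nothing is pruned.
    const position = this.buffer.length
    // Logger first: if it throws or rejects, the push below never runs
    // and the error reaches the caller of send().
    await this.onSend?.(value, position)
    this.buffer.push(value)
  }
}
```

A caller that wants sending to survive logging failures would need to catch inside its own onSend, at the cost of possibly buffering values that were never persisted.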

Parameters

value

T

The value being sent

position

number

Sequence number in the stream (0-based, monotonically increasing)
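
Since positions are described as increasing, a logger can cheaply check that property as values arrive. The sketch below is one way to do that; createPositionGuard and orderCheckingLogger are hypothetical names, and the check only requires each position to be greater than the previous one, without assuming positions are contiguous.

```typescript
// Hypothetical guard that remembers the last position it accepted.
function createPositionGuard() {
  let last = -1 // nothing seen yet; any position >= 0 is accepted first
  return {
    // Returns true when the position is strictly greater than the previous one.
    check(position: number): boolean {
      const ok = position > last
      if (ok) last = position
      return ok
    },
  }
}

const guard = createPositionGuard()

// Logger callback that warns when a position arrives out of order.
const orderCheckingLogger = {
  onSend(_value: unknown, position: number): void {
    if (!guard.check(position)) {
      console.warn(`Out-of-order position ${position}`)
    }
  },
}
```

The guard keeps its state in a closure, so each channel would get its own guard instance.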

Returns

void | Promise<void>

Examples

typescript
async onSend(value, position) {
  await db.insert(stream_logs).values({
    execution_id: executionId,
    node_id: nodeId,
    port_id: portId,
    sequence_number: position,
    value: value,
    timestamp: new Date()
  })
}

typescript
onSend(value, position) {
  // Synchronous metrics (no await)
  metrics.increment('sends')
  metrics.gauge('position', position)
}

Licensed under BUSL-1.1