ChainGraph API Documentation / @badaitech/chaingraph-channel / types / ChannelLogger
Interface: ChannelLogger<T>
Defined in: types/ChannelLogger.d.ts:77
External logger interface for channel persistence and observability.
Enables:
- Database Persistence: Log all values for replay capability
- Metrics Collection: Track throughput, buffer size, etc.
- Audit Trails: Complete history of all channel operations
- Debugging: Trace data flow through channels
Design Philosophy:
- All callbacks are optional (pay only for what you use)
- All callbacks can be async (support database writes, network calls)
- Logger is attached via
setLogger(), not constructor (flexibility) - Errors in logger callbacks don't break channel operations
Examples
const logger: ChannelLogger<Message> = {
async onSend(value, position) {
// Persist every value to database
await db.insert(execution_stream_logs).values({
executionId: context.executionId,
nodeId: node.id,
portId: port.id,
sequenceNumber: position,
value: JSON.stringify(value),
timestamp: new Date()
})
},
async onPrune(items, reason) {
// Items already in database, safe to prune from memory
console.log(`Pruned ${items.length} items from memory (${reason})`)
}
}
channel.setLogger(logger)const logger: ChannelLogger<any> = {
onSend(value, position) {
metrics.increment('channel.sends', 1, {
nodeId: node.id,
portId: port.id
})
metrics.gauge('channel.position', position)
metrics.histogram('channel.value_size', JSON.stringify(value).length)
},
onPrune(items, reason) {
metrics.increment('channel.prunes', items.length, { reason })
}
}const logger: ChannelLogger<Event> = {
async onSend(value, position) {
// Log to both database and metrics in parallel
await Promise.all([
db.insert(logs).values({ value, position }),
metrics.track('event', value)
])
}
}Type Parameters
T
T
The type of values being logged
Properties
onClose()?
optionalonClose: () =>void|Promise<void>
Defined in: types/ChannelLogger.d.ts:187
Called when the channel is closed.
Use Cases:
- Flush any buffered logs
- Close database connections
- Send final metrics
- Cleanup resources
Timing: Called during close() after all subscribers notified.
Returns
void | Promise<void>
Example
onClose() {
await logBatcher.flush()
await db.close()
}onError()?
optionalonError: (error) =>void|Promise<void>
Defined in: types/ChannelLogger.d.ts:212
Called when an error is set on the channel.
Use Cases:
- Log errors to monitoring system
- Trigger alerts
- Record error state in database
Parameters
error
Error
The error that occurred
Returns
void | Promise<void>
Example
async onError(error) {
await db.insert(stream_errors).values({
execution_id: executionId,
error_message: error.message,
error_stack: error.stack,
timestamp: new Date()
})
alerts.error('Stream error', { error: error.message })
}onPrune()?
optionalonPrune: (items,reason) =>void|Promise<void>
Defined in: types/ChannelLogger.d.ts:167
Called when buffer items are pruned from memory.
Use Cases:
- Verify items already persisted before removal
- Track memory management efficiency
- Alert on excessive pruning (slow consumers)
- Debug buffer behavior
Timing: Called during buffer cleanup (triggered by send() or close()).
Error Handling: If this callback throws, the error is logged but buffer cleanup continues (pruning can't be prevented).
Parameters
items
T[]
Array of items being removed from buffer
reason
Why items were pruned
Returns
void | Promise<void>
Examples
async onPrune(items, reason) {
// Verify all items were logged
const positions = items.map((_, i) => startPosition + i)
const logged = await db.count(stream_logs)
.where(in(stream_logs.sequence_number, positions))
if (logged !== items.length) {
console.error(`WARNING: Pruning ${items.length} items, but only ${logged} in database!`)
} else {
console.log(`✅ Safely pruned ${items.length} items (${reason})`)
}
}onPrune(items, reason) {
metrics.increment('channel.buffer_prunes', items.length, {
reason,
node_id: nodeId
})
if (reason === 'max-size') {
// Alert: buffer hit size limit (possible slow consumer)
alerts.warn(`Channel buffer full, pruned ${items.length} items`)
}
}onSend()?
optionalonSend: (value,position) =>void|Promise<void>
Defined in: types/ChannelLogger.d.ts:118
Called when a value is sent to the channel (before subscribers notified).
Use Cases:
- Persist all values to database for replay
- Track metrics (throughput, value sizes, etc.)
- Audit logging for compliance
- Real-time monitoring dashboards
Timing: Called synchronously during send(), awaited before returning.
Error Handling: If this callback throws, the error propagates to the caller of send(). The value is NOT added to the buffer if logging fails.
Parameters
value
T
The value being sent
position
number
Sequence number in the stream (0-based, monotonically increasing)
Returns
void | Promise<void>
Examples
async onSend(value, position) {
await db.insert(stream_logs).values({
execution_id: executionId,
node_id: nodeId,
port_id: portId,
sequence_number: position,
value: value,
timestamp: new Date()
})
}onSend(value, position) {
// Synchronous metrics (no await)
metrics.increment('sends')
metrics.gauge('position', position)
}