Class: InMemoryChannel<T>
Defined in: memory/InMemoryChannel.d.ts:86
In-memory implementation of IChannel with bounded buffer and automatic cleanup.
Features:
- ✅ Browser and Node.js compatible
- ✅ Minimal external dependencies (only zod and nanoid)
- ✅ Bounded buffer prevents memory leaks
- ✅ Multiple independent subscribers
- ✅ Async logger support for database persistence
- ✅ Serializable for node cloning
- ✅ Deep copy on clone (no shared state)
- ✅ Fire-and-forget or await patterns
Fixes Applied (from analysis):
- ✅ Bounded buffer with automatic cleanup (max 1000 items by default)
- ✅ getBuffer() returns readonly reference
- ✅ clone() deep copies buffer contents
- ✅ setError() closes channel and notifies subscribers
- ✅ Subscriber cleanup on exception during iteration
- ✅ sendBatch([]) early return (no spurious notifications)
- ✅ throw() method on Subscriber (complete AsyncIterator protocol)
- ✅ send() always returns Promise (consistent async API)
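The bounded-buffer behaviour listed above can be pictured with a small stand-in. This is only a sketch, not the library's implementation: the class `BoundedBuffer`, its `onPrune` callback shape, and the oldest-first eviction policy are assumptions made for illustration.

```typescript
// Hypothetical stand-in; InMemoryChannel's real internals may differ.
class BoundedBuffer<T> {
  private items: T[] = []

  constructor(
    private readonly maxSize: number = 1000,
    // Receives the items that were dropped, e.g. so they can be persisted elsewhere.
    private readonly onPrune?: (pruned: T[]) => void,
  ) {}

  push(value: T): void {
    this.items.push(value)
    const overflow = this.items.length - this.maxSize
    if (overflow > 0) {
      // Drop the oldest entries so the buffer never exceeds maxSize (assumed policy).
      const pruned = this.items.splice(0, overflow)
      this.onPrune?.(pruned)
    }
  }

  snapshot(): readonly T[] {
    return this.items
  }
}

const prunedLog: number[] = []
const buffer = new BoundedBuffer<number>(3, (pruned) => prunedLog.push(...pruned))
for (let i = 1; i <= 5; i++) buffer.push(i)
// buffer now holds the three newest values; prunedLog holds the two oldest.
```

Memory use stays proportional to `maxSize` no matter how many values are sent, which is the property the feature list refers to.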
Best For:
- Single-server deployments
- Browser-based execution
- Testing and development
- High-performance local streaming
Examples
// Example 1: basic usage
const channel = new InMemoryChannel<string>({ maxBufferSize: 100 })
await channel.send('hello')
await channel.send('world')
await channel.close()
// The iterator yields batches (T[]), so unpack each batch
for await (const batch of channel) {
  for (const value of batch) {
    console.log(value)
  }
}

// Example 2: logging sends to a database
// `Message`, `db`, `logs`, and `largeDataset` are defined by the application
const channel = new InMemoryChannel<Message>({ maxBufferSize: 1000 })
channel.setLogger({
  async onSend(value, position) {
    await db.insert(logs).values({ value, position })
  },
  async onPrune(items, reason) {
    console.log(`Pruned ${items.length} items (${reason})`)
  }
})
// All sends logged to database, memory stays bounded
for (const item of largeDataset) {
  await channel.send(item)
}

Type Parameters
T
The type of values transmitted through the channel
Implements
IChannel<T>
Constructors
Constructor
new InMemoryChannel<T>(config?): InMemoryChannel<T>
Defined in: memory/InMemoryChannel.d.ts:101
Parameters
config?
Returns
InMemoryChannel<T>
Methods
[asyncIterator]()
[asyncIterator](): AsyncIterableIterator<T[]>
Defined in: memory/InMemoryChannel.d.ts:134
Returns async iterator for pull-based consumption.
Note: the iterator yields batches (T[]), not individual items.
See IChannel interface for detailed documentation.
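Because each iteration step delivers an array, consumers that want one value at a time need to unpack the batches. A minimal sketch, using a hypothetical `batchSource` generator in place of a real channel (any `AsyncIterable<T[]>` behaves the same way here); `flattenBatches` and `collect` are illustrative helpers, not part of the library:

```typescript
// Hypothetical stand-in for a channel that yields batches.
async function* batchSource(): AsyncIterableIterator<string[]> {
  yield ['hello', 'world']
  yield ['again']
}

// Re-exposes a batch stream as a stream of individual items.
async function* flattenBatches<T>(source: AsyncIterable<T[]>): AsyncIterableIterator<T> {
  for await (const batch of source) {
    yield* batch
  }
}

// Drains an async iterable into an array (handy in tests).
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = []
  for await (const item of source) out.push(item)
  return out
}
```

With a real channel, `flattenBatches(channel)` would presumably be used in place of `flattenBatches(batchSource())`.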
Returns
AsyncIterableIterator<T[]>
Implementation of
clone()
clone(): IChannel<T>
Defined in: memory/InMemoryChannel.d.ts:164
Clone channel with deep copy (fix #3).
Creates a completely independent copy with no shared state.
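The difference between a shallow and a deep copy of buffered values is easy to miss. The sketch below contrasts the two; `deepCopyBuffer` is a hypothetical helper that assumes JSON-serializable values, and the library may well copy its buffer differently.

```typescript
interface Message {
  id: number
  tags: string[]
}

// Hypothetical helper: deep-copies JSON-serializable values only.
function deepCopyBuffer<T>(buffer: readonly T[]): T[] {
  return JSON.parse(JSON.stringify(buffer)) as T[]
}

const first: Message = { id: 1, tags: ['a'] }
const original: Message[] = [first]

const shallow = [...original]          // new array, same Message objects
const deep = deepCopyBuffer(original)  // new array, new Message objects

first.tags.push('b')
// The shallow copy observes the mutation; the deep copy does not.
```

A clone with "no shared state" corresponds to the `deep` case: later mutations of the original's items are invisible to the copy.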
Returns
IChannel<T>
Implementation of
close()
close(): Promise<void>
Defined in: memory/InMemoryChannel.d.ts:120
Closes the channel gracefully.
See IChannel interface for detailed documentation.
Returns
Promise<void>
Implementation of
getBuffer()
getBuffer(): readonly T[]
Defined in: memory/InMemoryChannel.d.ts:179
Returns a readonly reference to the buffer (fix #2).
The readonly array type prevents external mutation. For a defensive copy, use the spread operator: [...channel.getBuffer()]
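In TypeScript, `readonly T[]` is a compile-time restriction on the reference, not a copy of the data. The following sketch uses plain arrays (no channel involved) to show why the spread copy matters when a stable snapshot is needed:

```typescript
const internal: number[] = [1, 2, 3]

// Same underlying array, viewed through a read-only type.
const view: readonly number[] = internal
// view.push(4) // compile-time error: 'push' does not exist on 'readonly number[]'

// Independent, mutable copy.
const copy = [...view]

internal.push(4)
// view reflects the change because it shares the array; copy does not.
```

If the channel keeps appending to or pruning its buffer, a reference obtained earlier may change under the caller, whereas a spread copy stays fixed.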
Returns
readonly T[]
Implementation of
getError()
getError(): Error | null
Defined in: memory/InMemoryChannel.d.ts:148
Get any error that was set.
Returns
Error | null
Implementation of
getStats()
getStats(): ChannelStats
Defined in: memory/InMemoryChannel.d.ts:144
Get channel statistics.
See IChannel interface for detailed documentation.
Returns
ChannelStats
Implementation of
isChannelClosed()
isChannelClosed(): boolean
Defined in: memory/InMemoryChannel.d.ts:138
Check if channel is closed.
Returns
boolean
Implementation of
removeSubscriber()
removeSubscriber(id): void
Defined in: memory/InMemoryChannel.d.ts:184
Internal: Remove subscriber by ID. Called by Subscriber on completion or cleanup.
Parameters
id
string
Returns
void
send()
send(value): Promise<void>
Defined in: memory/InMemoryChannel.d.ts:108
Sends a single value to the channel.
Always returns a Promise, so callers can either await the send or fire and forget. See IChannel interface for detailed documentation and examples.
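The await strategies mentioned above could look roughly like this. The `send` function here is a hypothetical stand-in that records values in an array; it is not the channel's method, only something with the same `Promise<void>` return type.

```typescript
const received: string[] = []

// Hypothetical stand-in for channel.send().
async function send(value: string): Promise<void> {
  received.push(value)
}

async function producer(): Promise<void> {
  // Await: continue only after the send has completed.
  await send('first')

  // Fire-and-forget: do not wait, but still handle a possible rejection.
  void send('second').catch((err) => console.error('send failed', err))

  // Await several sends together.
  await Promise.all([send('a'), send('b')])
}
```

When using the fire-and-forget form, attaching a `.catch` avoids unhandled promise rejections if a send fails.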
Parameters
value
T
Returns
Promise<void>
Implementation of
sendBatch()
sendBatch(values): Promise<void>
Defined in: memory/InMemoryChannel.d.ts:114
Sends multiple values at once (more efficient than individual sends).
See IChannel interface for detailed documentation.
Parameters
values
T[]
Returns
Promise<void>
Implementation of
serialize()
serialize(): unknown
Defined in: memory/InMemoryChannel.d.ts:158
Serialize channel state to JSON.
Returns
unknown
Implementation of
setError()
setError(error): void
Defined in: memory/InMemoryChannel.d.ts:154
Set error state and close channel (fix #4).
This automatically closes the channel and notifies all subscribers.
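The described contract (record the error, close, notify) can be sketched with a small stand-in class. `ErrorAwareChannel` and its `onError` method are hypothetical names for illustration, and ignoring errors after close is an assumption of this sketch, not documented behaviour.

```typescript
// Hypothetical stand-in; not the library's implementation.
class ErrorAwareChannel {
  private closed = false
  private error: Error | null = null
  private readonly errorHandlers = new Set<(error: Error) => void>()

  onError(handler: (error: Error) => void): void {
    this.errorHandlers.add(handler)
  }

  setError(error: Error): void {
    if (this.closed) return // assumption: errors after close are ignored
    this.error = error
    this.closed = true
    // Notify every subscriber once, then release the handlers.
    for (const handler of this.errorHandlers) handler(error)
    this.errorHandlers.clear()
  }

  isChannelClosed(): boolean {
    return this.closed
  }

  getError(): Error | null {
    return this.error
  }
}
```

After `setError(new Error('boom'))`, `isChannelClosed()` returns `true` and `getError()` returns the stored error in this sketch.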
Parameters
error
Error
Returns
void
Implementation of
setLogger()
setLogger(logger): void
Defined in: memory/InMemoryChannel.d.ts:172
Attach external logger for persistence and observability.
Parameters
logger
Returns
void
Implementation of
subscribe()
subscribe(handler, onError?, onComplete?): () => void
Defined in: memory/InMemoryChannel.d.ts:126
Subscribe with callback-based consumption.
See IChannel interface for detailed documentation.
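A callback subscription that returns a `() => void` function is commonly used so that the caller can unsubscribe later; that reading is an assumption here, since this page does not describe the returned function. A minimal sketch with a hypothetical `CallbackHub` class:

```typescript
type Handler<T> = (value: T) => void | Promise<void>

// Hypothetical stand-in; illustrates the subscribe/unsubscribe shape only.
class CallbackHub<T> {
  private readonly handlers = new Map<number, Handler<T>>()
  private nextId = 0

  subscribe(handler: Handler<T>): () => void {
    const id = this.nextId++
    this.handlers.set(id, handler)
    // The returned function removes this handler by its ID.
    return () => {
      this.handlers.delete(id)
    }
  }

  async publish(value: T): Promise<void> {
    // Copy the handlers first so unsubscribing during delivery is safe.
    for (const handler of [...this.handlers.values()]) await handler(value)
  }
}

async function demo(): Promise<number[]> {
  const hub = new CallbackHub<number>()
  const got: number[] = []
  const unsubscribe = hub.subscribe((v) => {
    got.push(v)
  })
  await hub.publish(1)
  unsubscribe()
  await hub.publish(2) // no longer delivered
  return got
}
```

Keeping the returned function and calling it when the consumer is done avoids leaving stale handlers registered.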
Parameters
handler
(value) => void | Promise<void>
onError?
(error) => void
onComplete?
() => void | Promise<void>
Returns
() => void
Implementation of
deserialize()
static deserialize<T>(data): InMemoryChannel<T>
Defined in: memory/InMemoryChannel.d.ts:168
Deserialize from JSON state.
Type Parameters
T
Parameters
data
unknown
Returns
InMemoryChannel<T>