Skip to content

ChainGraph API Documentation / @badaitech/chaingraph-channel / memory / InMemoryChannel

Class: InMemoryChannel<T>

Defined in: memory/InMemoryChannel.d.ts:86

In-memory implementation of IChannel with bounded buffer and automatic cleanup.

Features:

  • ✅ Browser and Node.js compatible
  • ✅ Zero external dependencies (just zod, nanoid)
  • ✅ Bounded buffer prevents memory leaks
  • ✅ Multiple independent subscribers
  • ✅ Async logger support for database persistence
  • ✅ Serializable for node cloning
  • ✅ Deep copy on clone (no shared state)
  • ✅ Fire-and-forget or await patterns

Fixes Applied (from analysis):

  1. ✅ Bounded buffer with automatic cleanup (max 1000 items by default)
  2. ✅ getBuffer() returns readonly reference
  3. ✅ clone() deep copies buffer contents
  4. ✅ setError() closes channel and notifies subscribers
  5. ✅ Subscriber cleanup on exception during iteration
  6. ✅ sendBatch([]) early return (no spurious notifications)
  7. ✅ throw() method on Subscriber (complete AsyncIterator protocol)
  8. ✅ send() always returns Promise (consistent async API)

Best For:

  • Single-server deployments
  • Browser-based execution
  • Testing and development
  • High-performance local streaming

Examples

typescript
const channel = new InMemoryChannel<string>({ maxBufferSize: 100 })

await channel.send('hello')
await channel.send('world')
await channel.close()

for await (const value of channel) {
  console.log(value)
}
typescript
const channel = new InMemoryChannel<Message>({ maxBufferSize: 1000 })

channel.setLogger({
  async onSend(value, position) {
    await db.insert(logs).values({ value, position })
  },
  async onPrune(items, reason) {
    console.log(`Pruned ${items.length} items (${reason})`)
  }
})

// All sends logged to database, memory stays bounded
for (const item of largeDataset) {
  await channel.send(item)
}

Type Parameters

T

T

The type of values transmitted through the channel

Implements

Constructors

Constructor

new InMemoryChannel<T>(config?): InMemoryChannel<T>

Defined in: memory/InMemoryChannel.d.ts:101

Parameters

config?

InMemoryChannelConfig

Returns

InMemoryChannel<T>

Methods

[asyncIterator]()

[asyncIterator](): AsyncIterableIterator<T[]>

Defined in: memory/InMemoryChannel.d.ts:134

Returns async iterator for pull-based consumption.

UPDATED: Now yields BATCHES (T[]) instead of individual items.

See IChannel interface for detailed documentation.

Returns

AsyncIterableIterator<T[]>

Implementation of

IChannel.[asyncIterator]


clone()

clone(): IChannel<T>

Defined in: memory/InMemoryChannel.d.ts:164

Clone channel with deep copy (fix #3).

Creates a completely independent copy with no shared state.

Returns

IChannel<T>

Implementation of

IChannel.clone


close()

close(): Promise<void>

Defined in: memory/InMemoryChannel.d.ts:120

Closes the channel gracefully.

See IChannel interface for detailed documentation.

Returns

Promise<void>

Implementation of

IChannel.close


getBuffer()

getBuffer(): readonly T[]

Defined in: memory/InMemoryChannel.d.ts:179

Returns readonly reference to buffer (fix #2).

Returns readonly array to prevent external mutation. For defensive copy, use spread: [...channel.getBuffer()]

Returns

readonly T[]

Implementation of

IChannel.getBuffer


getError()

getError(): Error | null

Defined in: memory/InMemoryChannel.d.ts:148

Get any error that was set.

Returns

Error | null

Implementation of

IChannel.getError


getStats()

getStats(): ChannelStats

Defined in: memory/InMemoryChannel.d.ts:144

Get channel statistics.

See IChannel interface for detailed documentation.

Returns

ChannelStats

Implementation of

IChannel.getStats


isChannelClosed()

isChannelClosed(): boolean

Defined in: memory/InMemoryChannel.d.ts:138

Check if channel is closed.

Returns

boolean

Implementation of

IChannel.isChannelClosed


removeSubscriber()

removeSubscriber(id): void

Defined in: memory/InMemoryChannel.d.ts:184

Internal: Remove subscriber by ID. Called by Subscriber on completion or cleanup.

Parameters

id

string

Returns

void


send()

send(value): Promise<void>

Defined in: memory/InMemoryChannel.d.ts:108

Sends a single value to the channel.

Always returns Promise to enable flexible await strategies. See IChannel interface for detailed documentation and examples.

Parameters

value

T

Returns

Promise<void>

Implementation of

IChannel.send


sendBatch()

sendBatch(values): Promise<void>

Defined in: memory/InMemoryChannel.d.ts:114

Sends multiple values at once (more efficient than individual sends).

See IChannel interface for detailed documentation.

Parameters

values

T[]

Returns

Promise<void>

Implementation of

IChannel.sendBatch


serialize()

serialize(): unknown

Defined in: memory/InMemoryChannel.d.ts:158

Serialize channel state to JSON.

Returns

unknown

Implementation of

IChannel.serialize


setError()

setError(error): void

Defined in: memory/InMemoryChannel.d.ts:154

Set error state and close channel (fix #4).

This automatically closes the channel and notifies all subscribers.

Parameters

error

Error

Returns

void

Implementation of

IChannel.setError


setLogger()

setLogger(logger): void

Defined in: memory/InMemoryChannel.d.ts:172

Attach external logger for persistence and observability.

Parameters

logger

ChannelLogger<T>

Returns

void

Implementation of

IChannel.setLogger


subscribe()

subscribe(handler, onError?, onComplete?): () => void

Defined in: memory/InMemoryChannel.d.ts:126

Subscribe with callback-based consumption.

See IChannel interface for detailed documentation.

Parameters

handler

(value) => void | Promise<void>

onError?

(error) => void

onComplete?

() => void | Promise<void>

Returns

(): void

Returns

void

Implementation of

IChannel.subscribe


deserialize()

static deserialize<T>(data): InMemoryChannel<T>

Defined in: memory/InMemoryChannel.d.ts:168

Deserialize from JSON state.

Type Parameters

T

T

Parameters

data

unknown

Returns

InMemoryChannel<T>

Licensed under BUSL-1.1