Async Generator Utilities
The newAsyncGenerator function creates async generators from callback-based initialization functions, providing a bridge between imperative event handling and declarative async iteration patterns.
Type Definition
declare function newAsyncGenerator<T, E = Error>(
  init: (
    next: (value: T) => Promise<boolean>,
    done: (err?: E) => Promise<boolean>,
  ) => void | (() => void | Promise<void>),
  skipValues?: boolean, // defaults to false
): AsyncGenerator<T>;
Parameters:
- init: Initialization function that sets up the async generator behavior. It receives two callbacks and may return a cleanup function:
  - next: Function to yield values to consumers, (value: T) => Promise<boolean>
  - done: Function to signal completion or error, (err?: E) => Promise<boolean>
  - Returns: Optional cleanup function that will be called when the generator terminates
- skipValues: If true, only the most recent value is kept in the queue; values not yet consumed when a newer one arrives are dropped. Defaults to false.
The initialization function receives two callback functions that control the generator’s behavior. The next function yields values to consumers, while the done function signals completion or error conditions. The initialization function can return a cleanup function that will be called when the generator is closed or an error occurs.
Returns AsyncGenerator<T> that yields emitted values with proper backpressure management and cleanup.
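The cleanup guarantee rests on standard async generator behavior: the generator body starts only when the first value is requested, and a finally block inside it runs when the consumer leaves a for await loop early. The following is a minimal sketch of that mechanism only. countWithCleanup and consumeTwo are hypothetical names used for illustration and are not part of the library.

```typescript
// Hypothetical stand-in generator (not the library function). It records when
// its body starts and when its `finally` block runs.
async function* countWithCleanup(log: string[]): AsyncGenerator<number> {
  log.push("init"); // runs on the first pull, not when the generator is created
  try {
    for (let i = 0; ; i++) {
      yield i;
    }
  } finally {
    log.push("cleanup"); // runs on normal completion, on error, and on early exit
  }
}

// Consumes two values, then breaks out of the loop. Leaving the loop calls the
// generator's return() method, which triggers the `finally` block above.
async function consumeTwo(): Promise<string[]> {
  const log: string[] = [];
  const gen = countWithCleanup(log);
  log.push("created");
  for await (const n of gen) {
    log.push(`value ${n}`);
    if (n === 1) break;
  }
  return log;
}
```

Calling consumeTwo() resolves to ["created", "init", "value 0", "value 1", "cleanup"]. Since newAsyncGenerator is itself an async generator function, its init callback can be expected to run at the first pull in the same way.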
Practical Usage Patterns
StateWalker Trigger Implementation
Async generators integrate seamlessly with StateWalker’s trigger handlers, enabling reactive event generation based on data model changes or external events.
async function* UserActivityTrigger(context: ProcessContext): AsyncGenerator<string> {
  const userModel = getUserModel(context);
  const sessionModel = getSessionModel(context);
  yield* newAsyncGenerator<string>((next, done) => {
    const cleanup = userModel.onChange(() => {
      const user = userModel.currentUser;
      if (!user) {
        next("userLoggedOut");
      } else if (user.isActive) {
        next("userBecameActive");
      } else {
        next("userBecameInactive");
      }
    });
    const sessionCleanup = sessionModel.onChange(() => {
      if (sessionModel.isExpired) {
        next("sessionExpired");
      }
    });
    return () => {
      cleanup();
      sessionCleanup();
    };
  });
}
External API Integration
Async generators can wrap external APIs or WebSocket connections to provide reactive data streams that integrate naturally with StateWalker’s event-driven architecture.
async function* WebSocketEventStream(context: ProcessContext): AsyncGenerator<string> {
  const config = getConfiguration(context);
  yield* newAsyncGenerator<string>((next, done) => {
    const socket = new WebSocket(config.websocketUrl);
    socket.onopen = () => next("connected");
    socket.onclose = () => next("disconnected");
    // The handler receives an Event, not an Error, so wrap it before signaling failure
    socket.onerror = () => done(new Error("WebSocket error"));
    socket.onmessage = (event) => {
      const data = JSON.parse(event.data);
      switch (data.type) {
        case "notification":
          next("notificationReceived");
          break;
        case "update":
          next("dataUpdated");
          break;
        case "error":
          next("errorOccurred");
          break;
      }
    };
    return () => {
      socket.close();
    };
  });
}
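One detail worth guarding in a message handler like the one above is that JSON.parse throws on malformed input. A possible approach, sketched here under the assumption that unknown or unparsable messages should simply be ignored, is to move the mapping into a small pure function. EVENT_NAMES and toEventName are hypothetical helpers introduced for this sketch.

```typescript
// Hypothetical lookup table mirroring the switch statement in the example above.
const EVENT_NAMES = new Map<string, string>([
  ["notification", "notificationReceived"],
  ["update", "dataUpdated"],
  ["error", "errorOccurred"],
]);

// Maps a raw message payload to an event name. Returns undefined when the
// payload is not valid JSON, is not an object, or carries an unknown type.
function toEventName(raw: string): string | undefined {
  try {
    const data: unknown = JSON.parse(raw);
    if (typeof data !== "object" || data === null) {
      return undefined;
    }
    const type = (data as { type?: unknown }).type;
    return typeof type === "string" ? EVENT_NAMES.get(type) : undefined;
  } catch {
    // Malformed JSON: treat as "no event" instead of throwing inside the handler
    return undefined;
  }
}
```

Inside the handler, the result would be forwarded only when it is defined, for example by calling next(name) after checking that name is not undefined.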
DOM Event Integration
Convert DOM events to async generators for reactive UI programming.
// Convert DOM events to async generator
const clickGenerator = newAsyncGenerator<MouseEvent>((next, done) => {
  const handler = (event: MouseEvent) => next(event);
  document.addEventListener('click', handler);
  return () => document.removeEventListener('click', handler);
});
// Consume the generator (clickGenerator is already a generator object, so it is not called)
for await (const clickEvent of clickGenerator) {
  console.log('Click at:', clickEvent.clientX, clickEvent.clientY);
}
Timer and Interval Operations
The generator pattern excels at time-based event generation, providing clean abstractions for timeouts, intervals, and scheduled operations.
async function* TimerEvents(context: ProcessContext): AsyncGenerator<string> {
  const config = getConfiguration(context);
  yield* newAsyncGenerator<string>((next, done) => {
    // Initial delay before starting periodic events
    const initialTimeout = setTimeout(() => {
      next("started");
    }, config.initialDelay);
    // Periodic heartbeat events
    const interval = setInterval(() => {
      next("heartbeat");
    }, config.heartbeatInterval);
    // Auto-shutdown after maximum duration
    const shutdownTimeout = setTimeout(() => {
      next("timeout");
      done();
    }, config.maxDuration);
    return () => {
      clearTimeout(initialTimeout);
      clearInterval(interval);
      clearTimeout(shutdownTimeout);
    };
  });
}
Basic Implementation
For scenarios where dropping values during backpressure is acceptable, a basic version of newAsyncGenerator provides a lightweight alternative that keeps only the most recent value. This implementation is ideal for UI updates, status notifications, or any use case where intermediate values can be safely discarded in favor of the latest state.
The simple generator uses a single-value buffer that gets overwritten with each new emission, ensuring constant memory usage regardless of production rate. When the consumer is slower than the producer, intermediate values are automatically dropped, preventing memory buildup while maintaining responsiveness to the most current data.
async function* newAsyncGenerator<T, E = Error>(
  listen: (
    next: (value: T) => Promise<boolean>,
    done: (error?: E) => Promise<boolean>,
  ) => void | (() => void | Promise<void>),
): AsyncGenerator<T> {
  let done = false;
  let value: T = undefined as T;
  let error: E | undefined;
  let resolve: () => void = () => {};
  let promise = new Promise<void>((y) => {
    resolve = y;
  });
  // Subscribe: each emission overwrites the single-value buffer and wakes the loop
  const unsubscribe = listen(
    async (val: T) => {
      value = val;
      resolve();
      return true;
    },
    async (err?: E) => {
      done = true;
      error = err;
      resolve();
      return true;
    },
  );
  try {
    while (true) {
      await promise;
      if (done) {
        if (error !== undefined) throw error;
        break;
      }
      // Re-arm the signal before yielding, so that a value or completion arriving
      // while the consumer is busy wakes the next iteration instead of being lost
      promise = new Promise<void>((y) => {
        resolve = y;
      });
      yield value;
    }
  } finally {
    // The listener may return nothing, so check before calling the cleanup
    if (typeof unsubscribe === "function") {
      await unsubscribe();
    }
  }
}
This implementation offers several advantages for specific use cases:
Memory Efficiency: Uses only a single variable to store the current value, making it suitable for high-frequency data streams where memory consumption is a concern.
Automatic Backpressure Handling: By design, it drops intermediate values when the consumer cannot keep up, preventing queue buildup without requiring explicit backpressure management.
Simplified API: The next and done callbacks resolve to true immediately, so producers can ignore their return values and call them like plain callbacks, which makes integration with existing callback-based APIs straightforward.
Immediate Response: Always yields the most recent value, ensuring that consumers receive the latest state even when processing is delayed.
The simple generator is particularly well-suited for reactive UI components, real-time status displays, or any scenario where only the current state matters and historical values can be safely discarded.
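The dropping behavior described above can be seen with a burst of emissions. The sketch below uses latestOnly, a condensed and hypothetical variant of the basic generator written for this example (plain callbacks, no error channel). It is an illustration of the single-value buffer, not the implementation itself.

```typescript
// Hypothetical condensed variant of the latest-value generator.
async function* latestOnly<T>(
  listen: (next: (value: T) => void, done: () => void) => void | (() => void),
): AsyncGenerator<T> {
  let finished = false;
  let latest: T | undefined;
  let wake: () => void = () => {};
  let signal = new Promise<void>((r) => {
    wake = r;
  });
  const unsubscribe = listen(
    (v) => {
      latest = v; // overwrite the single-value buffer
      wake();
    },
    () => {
      finished = true;
      wake();
    },
  );
  try {
    while (true) {
      await signal;
      if (finished) break;
      // Re-arm before yielding so later emissions wake the next iteration
      signal = new Promise<void>((r) => {
        wake = r;
      });
      yield latest as T;
    }
  } finally {
    if (typeof unsubscribe === "function") unsubscribe();
  }
}

// Emits 1, 2, 3 in one synchronous burst, then completes shortly afterwards.
async function demoLatest(): Promise<number[]> {
  const seen: number[] = [];
  const gen = latestOnly<number>((next, done) => {
    next(1);
    next(2);
    next(3); // only this value is still in the buffer when the consumer reads
    setTimeout(() => done(), 10);
  });
  for await (const v of gen) {
    seen.push(v);
  }
  return seen;
}
```

Here demoLatest() resolves to [3]: the values 1 and 2 were overwritten before the consumer had a chance to read them.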
Full Implementation
The full implementation preserves all notifications and provides backpressure management:
/**
 * The newAsyncGenerator function creates async generators from callback-based initialization
 * functions, providing a bridge between imperative event handling and declarative async
 * iteration patterns.
 *
 * The initialization function receives two callback functions that control the generator's
 * behavior. The `next` function yields values to consumers, while the `done` function
 * signals completion or error conditions. The initialization function can return a cleanup
 * function that will be called when the generator is closed or an error occurs.
 *
 * The generator manages an internal queue of values and completion signals, ensuring that
 * producers can yield values without overwhelming consumers. Proper backpressure is implemented
 * by having the `next` and `done` functions return promises that resolve only after the
 * consumer has processed the values. This prevents memory leaks and ensures that producers
 * are aware of whether their values were successfully handled.
 * The generator also handles cleanup by draining any remaining items in the queue and notifying
 * producers that their values were not processed if the generator is closed early. This ensures
 * that resources are properly managed and that no memory leaks occur.
 *
 * Example usage:
 * ```typescript
 * const asyncGen = newAsyncGenerator<number>((next, done) => {
 *   let count = 0;
 *   const interval = setInterval(() => {
 *     if (count < 5) {
 *       next(count++);
 *     } else {
 *       done();
 *       clearInterval(interval);
 *     }
 *   }, 1000);
 *   return () => clearInterval(interval); // Cleanup function
 * });
 * (async () => {
 *   for await (const num of asyncGen) {
 *     console.log(num); // Logs numbers 0 to 4 at 1 second intervals
 *   }
 *   console.log("Completed");
 * })();
 * ```
 * @template T The type of values yielded by the generator
 * @template E The type of errors that can be thrown; defaults to Error
 * @param init Initialization function that sets up the generator behavior with next/done callbacks;
 *   The first parameter - `next` function - is used to yield values to consumers;
 *   It returns a Promise<boolean> indicating whether the value was successfully handled;
 *   The second parameter - `done` function - is used to signal completion or error;
 *   It returns a Promise<boolean> indicating whether the completion was successfully handled;
 *   The initialization function can optionally return a cleanup function that will be called
 *   when the generator is closed or an error occurs;
 * @param skipValues If true, only the most recent value is kept in the queue,
 *   skipping intermediate values (not consumed values are considered skipped);
 *   This is useful for scenarios where only the latest value matters, such as UI updates.
 *   Defaults to false, meaning all values are queued and processed in order.
 * @returns AsyncGenerator that properly manages backpressure and resource cleanup
 */
export async function* newAsyncGenerator<T, E = Error>(
  /**
   * Initialization function that sets up the async generator behavior.
   * @param next - Function to yield values to consumers. Returns a Promise<boolean>
   * indicating whether the value was successfully handled.
   * @param done - Function to signal completion or error. Optional error parameter
   * will cause the generator to throw that error.
   * @returns Optional cleanup function that will be called when the generator terminates.
   */
  init: (
    next: (value: T) => Promise<boolean>,
    done: (err?: E) => Promise<boolean>,
  ) => void | (() => void | Promise<void>),
  /**
   * If true, the queue keeps only the most recent item; older unconsumed items are dropped.
   */
  skipValues = false,
): AsyncGenerator<T> {
  /**
   * Internal queue slot type that wraps values and completion signals with resolution callbacks.
   * This enables the async generator to communicate back to producers whether their values
   * were successfully processed, enabling backpressure management.
   */
  type IterationSlot<T, E> =
    | { done: false; value: T } // Regular value slot
    | { done: true; error?: E }; // Completion/error slot
  type QueueSlot<T, E> = IterationSlot<T, E> & {
    next: QueueSlot<T, E> | undefined; // Pointer to the next slot in the queue
    resolve: (handled: boolean) => void; // Callback to signal if the value was handled
  };
  let head: QueueSlot<T, E> | undefined; // Head of the queue
  let tail: QueueSlot<T, E> | undefined; // Tail of the queue
  /** Flag to prevent new values from being queued after generator closes */
  let closed = false;
  /** Wake-up function to notify the generator loop of new items */
  let wakeUp: undefined | (() => void);
  /**
   * Drains the internal queue, notifying all pending producers that their values
   * were not processed due to generator closure. This prevents memory leaks and
   * ensures proper backpressure signaling.
   */
  const drainQueue = () => {
    for (; head; head = head.next) {
      closed = closed || head.done;
      head.resolve(false); // Notify producer that value was not handled
    }
    tail = undefined;
  };
  /**
   * Enqueues a value or completion signal with a promise that resolves when the item
   * is processed. This enables backpressure by allowing producers to know when their
   * values have been consumed.
   */
  const enqueue = (params: IterationSlot<T, E>): Promise<boolean> => {
    if (skipValues) {
      // Remove any previous value slots
      drainQueue();
    }
    return !closed
      ? new Promise<boolean>((resolve) => {
          // Add the new slot to the queue
          const next = { ...params, next: undefined, resolve };
          if (tail) {
            tail.next = next;
          }
          tail = next;
          if (!head) {
            head = tail;
          }
          // Notify the generator loop that a new item is available
          wakeUp?.();
        })
      : // If the generator is closed, immediately resolve as not handled
        Promise.resolve(false);
  };
  /**
   * Producer function to yield a value to consumers. Returns a promise that resolves
   * to true if the value was successfully processed, false if the generator is closed
   * or the value was skipped due to backpressure.
   */
  const next = (value: T): Promise<boolean> => enqueue({ done: false, value });
  /**
   * Producer function to signal completion or error. Optional error parameter will
   * cause the generator to throw that error to consumers.
   */
  const done = (error?: E): Promise<boolean> => enqueue({ done: true, error });
  // Initialize the producer by calling the init function with our control functions
  const unsubscribe = init(next, done);
  try {
    // Main async generator loop - processes queued items and yields values
    while (!closed) {
      // Try to get the next item from the queue
      if (!head) {
        // If no items available, wait for producers to add something
        await new Promise<void>((resolve) => {
          wakeUp = resolve;
        }).then(() => {
          wakeUp = undefined;
        });
        continue;
      }
      // Process the next item in the queue
      const slot = head;
      // Move head to the next item
      head = head.next;
      // If we removed the tail, clear it as well
      if (tail === slot) {
        tail = head;
      }
      try {
        // Handle completion/error signals
        if (slot.done) {
          closed = true;
          if (slot.error !== undefined) {
            throw slot.error;
          }
          break;
        }
        // Yield the value to the consumer
        yield slot.value;
      } finally {
        // Signal successful processing
        slot.resolve(true);
      }
    }
  } finally {
    /**
     * Cleanup phase - ensures proper resource management and notification of any
     * remaining producers. This runs whether the generator completes normally,
     * encounters an error, or is closed early by the consumer.
     */
    closed = true;
    // Wake up any pending operations to allow them to exit
    wakeUp?.();
    // Call the cleanup function returned by the initialization function
    if (typeof unsubscribe === "function") {
      await unsubscribe();
    }
    /**
     * Drain any remaining items in the queue and notify their producers that
     * the values were not processed. This prevents memory leaks and ensures
     * proper backpressure signaling.
     */
    drainQueue();
  }
}
Main features of this implementation:
- Backpressure support: Producers receive feedback about whether their values were successfully processed
- Error handling: Proper error propagation through the async generator
- Automatic cleanup: Resource cleanup when generator terminates normally or due to errors
- Memory efficiency: the skipValues parameter prevents memory buildup during backpressure scenarios
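To illustrate the backpressure feedback listed above, the following sketch shows a producer that awaits each next call and stops once it receives false. It uses queued, a compact hypothetical stand-in written for this example (array-based queue, no done callback, no skipValues), so it approximates the behavior of the full implementation without reproducing it.

```typescript
// Hypothetical compact stand-in for the queue-based generator.
type Slot<T> = { value: T; resolve: (handled: boolean) => void };

async function* queued<T>(
  init: (next: (value: T) => Promise<boolean>) => void | (() => void),
): AsyncGenerator<T> {
  const queue: Slot<T>[] = [];
  let closed = false;
  let wakeUp: (() => void) | undefined;
  const next = (value: T): Promise<boolean> =>
    closed
      ? Promise.resolve(false) // closed: report "not handled" immediately
      : new Promise<boolean>((resolve) => {
          queue.push({ value, resolve });
          wakeUp?.();
        });
  const cleanup = init(next);
  try {
    while (true) {
      const slot = queue.shift();
      if (!slot) {
        // Nothing queued: sleep until a producer enqueues something
        await new Promise<void>((r) => {
          wakeUp = r;
        });
        wakeUp = undefined;
        continue;
      }
      try {
        yield slot.value;
      } finally {
        slot.resolve(true); // the value reached the consumer
      }
    }
  } finally {
    closed = true;
    if (typeof cleanup === "function") cleanup();
    for (const s of queue) s.resolve(false); // leftovers were never consumed
  }
}

// The producer waits for each value to be consumed before sending the next one.
async function demoBackpressure(): Promise<{ received: number[]; results: boolean[] }> {
  const received: number[] = [];
  const results: boolean[] = [];
  let producerDone: Promise<void> = Promise.resolve();
  const gen = queued<number>((next) => {
    producerDone = (async () => {
      for (let i = 0; i < 5; i++) {
        const handled = await next(i);
        results.push(handled);
        if (!handled) break; // the consumer is gone, stop producing
      }
    })();
  });
  for await (const v of gen) {
    received.push(v);
    if (v === 2) break; // the consumer stops early
  }
  await producerDone;
  return { received, results };
}
```

In this sketch demoBackpressure() resolves with received equal to [0, 1, 2] and results equal to [true, true, true, false]: the fourth next call happens after the consumer has left the loop, so the producer is told its value was not handled and stops.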