Async Generator Utilities

The newAsyncGenerator function creates async generators from callback-based initialization functions, providing a bridge between imperative event handling and declarative async iteration patterns.

Type Definition

async function* newAsyncGenerator<T, E = Error>(
  init: (
    next: (value: T) => Promise<boolean>,
    done: (err?: E) => Promise<boolean>,
  ) => void | (() => void | Promise<void>),
  skipValues : boolean = false
): AsyncGenerator<T>;

Parameters:

The initialization function receives two callback functions that control the generator’s behavior. The next function yields values to consumers, while the done function signals completion or error conditions. The initialization function can return a cleanup function that will be called when the generator is closed or an error occurs.

Returns AsyncGenerator<T> that yields emitted values with proper backpressure management and cleanup.

Practical Usage Patterns

StateWalker Trigger Implementation

Async generators integrate seamlessly with StateWalker’s trigger handlers, enabling reactive event generation based on data model changes or external events.

async function* UserActivityTrigger(context: ProcessContext): AsyncGenerator<string> {
  const userModel = getUserModel(context);
  const sessionModel = getSessionModel(context);
  
  yield* newAsyncGenerator<string>((next, done) => {
    const cleanup = userModel.onChange(() => {
      const user = userModel.currentUser;
      
      if (!user) {
        next("userLoggedOut");
      } else if (user.isActive) {
        next("userBecameActive");
      } else {
        next("userBecameInactive");
      }
    });
    
    const sessionCleanup = sessionModel.onChange(() => {
      if (sessionModel.isExpired) {
        next("sessionExpired");
      }
    });
    
    return () => {
      cleanup();
      sessionCleanup();
    };
  });
}

External API Integration

Async generators can wrap external APIs or WebSocket connections to provide reactive data streams that integrate naturally with StateWalker’s event-driven architecture.

async function* WebSocketEventStream(context: ProcessContext): AsyncGenerator<string> {
  const config = getConfiguration(context);
  
  yield* newAsyncGenerator<string>((next, done) => {
    const socket = new WebSocket(config.websocketUrl);
    
    socket.onopen = () => next("connected");
    socket.onclose = () => next("disconnected");
    socket.onerror = (error) => done(error);
    
    socket.onmessage = (event) => {
      const data = JSON.parse(event.data);
      
      switch (data.type) {
        case "notification":
          next("notificationReceived");
          break;
        case "update":
          next("dataUpdated");
          break;
        case "error":
          next("errorOccurred");
          break;
      }
    };
    
    return () => {
      socket.close();
    };
  });
}

DOM Event Integration

Convert DOM events to async generators for reactive UI programming.

// Convert DOM events to async generator
const clickGenerator = newAsyncGenerator<MouseEvent>((next, done) => {
  const handler = (event: MouseEvent) => next(event);
  document.addEventListener('click', handler);
  return () => document.removeEventListener('click', handler);
});

// Consume the generator
for await (const clickEvent of clickGenerator()) {
  console.log('Click at:', clickEvent.clientX, clickEvent.clientY);
}

Timer and Interval Operations

The generator pattern excels at time-based event generation, providing clean abstractions for timeouts, intervals, and scheduled operations.

async function* TimerEvents(context: ProcessContext): AsyncGenerator<string> {
  const config = getConfiguration(context);
  
  yield* newAsyncGenerator<string>((next, done) => {
    // Initial delay before starting periodic events
    const initialTimeout = setTimeout(() => {
      next("started");
    }, config.initialDelay);
    
    // Periodic heartbeat events
    const interval = setInterval(() => {
      next("heartbeat");
    }, config.heartbeatInterval);
    
    // Auto-shutdown after maximum duration
    const shutdownTimeout = setTimeout(() => {
      next("timeout");
      done();
    }, config.maxDuration);
    
    return () => {
      clearTimeout(initialTimeout);
      clearInterval(interval);
      clearTimeout(shutdownTimeout);
    };
  });
}

Basic Implementation

For scenarios where dropping values during backpressure is acceptable, the newAsyncGenerator provides a lightweight alternative that maintains only the most recent value. This implementation is ideal for UI updates, status notifications, or any use case where intermediate values can be safely discarded in favor of the latest state.

The simple generator uses a single-value buffer that gets overwritten with each new emission, ensuring constant memory usage regardless of production rate. When the consumer is slower than the producer, intermediate values are automatically dropped, preventing memory buildup while maintaining responsiveness to the most current data.

async function* newAsyncGenerator<T, E = Error>(
  listen: (
    next: (value: T) => Promise<boolean>,
    done: (error?: E) => Promise<boolean>,
  ) =>
    | void
    | (() => void | Promise<void>),
): AsyncGenerator<T> {
  let done = false;
  let value: T = undefined as T;
  let error: E | undefined;
  let resolve: () => void = () => {};
  let promise = new Promise<void>((y) => {
    resolve = y;
  });
  const unsubscribe = listen(
    async (val: T) => {
      value = val;
      resolve();
      return true;
    },
    async (err?: E) => {
      done = true;
      error = err;
      resolve();
      return true;
    },
  );
  try {
    while (!done) {
      await promise;
      if (done) {
        if (error) throw error;
        break;
      }
      yield value as T;
      promise = new Promise<void>((y) => {
        resolve = y;
      });
    }
  } finally {
    await unsubscribe?.();
  }
}

This implementation offers several advantages for specific use cases:

Memory Efficiency: Uses only a single variable to store the current value, making it suitable for high-frequency data streams where memory consumption is a concern.

Automatic Backpressure Handling: By design, it drops intermediate values when the consumer cannot keep up, preventing queue buildup without requiring explicit backpressure management.

Simplified API: The listen function only requires a simple callback without needing to handle Promise-based return values or completion signals, making integration with existing callback-based APIs straightforward.

Immediate Response: Always yields the most recent value, ensuring that consumers receive the latest state even when processing is delayed.

The simple generator is particularly well-suited for reactive UI components, real-time status displays, or any scenario where only the current state matters and historical values can be safely discarded.

Full Implementation

The full implementation preserves all notifications and provides backpressure management:

/**
 * The newAsyncGenerator function creates async generators from callback-based initialization
 * functions, providing a bridge between imperative event handling and declarative async
 * iteration patterns.
 *
 * The initialization function receives two callback functions that control the generator's
 * behavior. The `next` function yields values to consumers, while the `done` function
 * signals completion or error conditions. The initialization function can return a cleanup
 * function that will be called when the generator is closed or an error occurs.
 *
 * The generator manages an internal queue of values and completion signals, ensuring that
 * producers can yield values without overwhelming consumers. Proper backpressure is implemented
 * by having the `next` and `done` functions return promises that resolve only after the
 * consumer has processed the values. This prevents memory leaks and ensures that producers
 * are aware of whether their values were successfully handled.
 * The generator also handles cleanup by draining any remaining items in the queue and notifying
 * producers that their values were not processed if the generator is closed early. This ensures
 * that resources are properly managed and that no memory leaks occur.
 *
 * Example usage:
 * ```typescript
 * const asyncGen = newAsyncGenerator<number>((next, done) => {
 *   let count = 0;
 *   const interval = setInterval(() => {
 *     if (count < 5) {
 *       next(count++);
 *     } else {
 *       done();
 *       clearInterval(interval);
 *     }
 *   }, 1000);
 *   return () => clearInterval(interval); // Cleanup function
 * });
 * (async () => {
 *   for await (const num of asyncGen) {
 *     console.log(num); // Logs numbers 0 to 4 at 1 second intervals
 *   }
 *   console.log("Completed");
 * })();
 * ```
 * @template T The type of values yielded by the generator
 * @template E The type of errors that can be thrown; defaults to Error
 * @param init Initialization function that sets up the generator behavior with next/done callbacks;
 *  The first parameter - `next` function - is used to yield values to consumers;
 *  It returns a Promise<boolean> indicating whether the value was successfully handled;
 *  The second parameter - `done` function - is used to signal completion or error;
 *  It returns a Promise<boolean> indicating whether the completion was successfully handled;
 *  The initialization function can optionally return a cleanup function that will be called
 *  when the generator is closed or an error occurs;
 * @param skipValues If true, only the most recent value is kept in the queue,
 * skipping intermediate values (not consumed values are considered skipped);
 * This is useful for scenarios where only the latest value matters, such as UI updates.
 * Defaults to false, meaning all values are queued and processed in order.
 * @returns AsyncGenerator that properly manages backpressure and resource cleanup
 */
export async function* newAsyncGenerator<T, E = Error>(
  /**
   * Initialization function that sets up the async generator behavior.
   * @param next - Function to yield values to consumers. Returns a Promise<boolean>
   *               indicating whether the value was successfully handled.
   * @param done - Function to signal completion or error. Optional error parameter
   *               will cause the generator to throw that error.
   * @returns Optional cleanup function that will be called when the generator terminates.
   */
  init: (
    next: (value: T) => Promise<boolean>,
    done: (err?: E) => Promise<boolean>,
  ) => void | (() => void | Promise<void>),

  /**
   * Skipping queue implementation that maintains only the most recent value.
   */
  skipValues = false,
): AsyncGenerator<T> {
  /**
   * Internal queue slot type that wraps values and completion signals with resolution callbacks.
   * This enables the async generator to communicate back to producers whether their values
   * were successfully processed, enabling backpressure management.
   */
  type IterationSlot<T, E> =
    | { done: false; value: T } // Regular value slot
    | { done: true; error?: E }; // Completion/error slot
  type QueueSlot<T, E> = IterationSlot<T, E> & {
    next: QueueSlot<T, E> | undefined; // Pointer to the next slot in the queue
    resolve: (handled: boolean) => void; // Callback to signal if the value was handled
  };

  let head: QueueSlot<T, E> | undefined; // Head of the queue
  let tail: QueueSlot<T, E> | undefined; // Tail of the queue

  /** Flag to prevent new values from being queued after generator closes */
  let closed = false;

  /** Wake-up function to notify the generator loop of new items */
  let wakeUp: undefined | (() => void);

  /**
   * Drains the internal queue, notifying all pending producers that their values
   * were not processed due to generator closure. This prevents memory leaks and
   * ensures proper backpressure signaling.
   */
  const drainQueue = () => {
    for (; head; head = head.next) {
      closed = closed || head.done;
      head.resolve(false); // Notify producer that value was not handled
    }
    tail = undefined;
  };

  /**
   * Enqueues a value or completion signal with a promise that resolves when the item
   * is processed. This enables backpressure by allowing producers to know when their
   * values have been consumed.
   */
  const enqueue = (params: IterationSlot<T, E>): Promise<boolean> => {
    if (skipValues) {
      // Remove any previous value slots
      drainQueue();
    }
    return !closed
      ? new Promise<boolean>((resolve) => {
          // Add the new slot to the queue
          const next = { ...params, next: undefined, resolve };
          if (tail) {
            tail.next = next;
          }
          tail = next;
          if (!head) {
            head = tail;
          }
          // Notify the generator loop that a new item is available
          wakeUp?.();
        })
      : // If the generator is closed, immediately resolve as not handled
        Promise.resolve(false);
  };

  /**
   * Producer function to yield a value to consumers. Returns a promise that resolves
   * to true if the value was successfully processed, false if the generator is closed
   * or the value was skipped due to backpressure.
   */
  const next = (value: T): Promise<boolean> => enqueue({ done: false, value });

  /**
   * Producer function to signal completion or error. Optional error parameter will
   * cause the generator to throw that error to consumers.
   */
  const done = (error?: E): Promise<boolean> => enqueue({ done: true, error });

  // Initialize the producer by calling the init function with our control functions
  const unsubscribe = init(next, done);

  try {
    // Main async generator loop - processes queued items and yields values
    while (!closed) {
      // Try to get the next item from the queue
      if (!head) {
        // If no items available, wait for producers to add something
        await new Promise<void>((resolve) => {
          wakeUp = resolve;
        }).then(() => {
          wakeUp = undefined;
        });
        continue;
      }

      // Process the next item in the queue
      const slot = head;
      // Move head to the next item
      head = head.next;
      // If we removed the tail, clear it as well
      if (tail === slot) {
        tail = head;
      }
      try {
        // Handle completion/error signals
        if (slot.done) {
          closed = true;
          if (slot.error !== undefined) {
            throw slot.error;
          }
          break;
        }
        // Yield the value to the consumer
        yield slot.value;
      } finally {
        // Signal successful processing
        slot.resolve(true);
      }
    }
  } finally {
    /**
     * Cleanup phase - ensures proper resource management and notification of any
     * remaining producers. This runs whether the generator completes normally,
     * encounters an error, or is closed early by the consumer.
     */
    closed = true;

    // Wake up any pending operations to allow them to exit
    wakeUp?.();

    // Call the cleanup function returned by the initialization function
    if (typeof unsubscribe === "function") {
      await unsubscribe();
    }

    /**
     * Drain any remaining items in the queue and notify their producers that
     * the values were not processed. This prevents memory leaks and ensures
     * proper backpressure signaling.
     */
    drainQueue();
  }
}

Main features of this implementation: