Streaming with events()
Problem
You need to consume a continuous stream of events as an async iterable, processing each one in sequence with `for await` instead of registering a callback whose invocations can overlap.
Solution
Process an ongoing sequence of events as an async generator:
```ts
async function watchCart() {
  const controller = new AbortController();
  // Stop streaming after 30 seconds of inactivity; every event restarts the timer.
  let timeout = setTimeout(() => controller.abort(), 30_000);
  try {
    for await (const { items, total } of appBus.events('cart:updated', {
      signal: controller.signal,
      maxBuffer: 20,
    })) {
      clearTimeout(timeout);
      timeout = setTimeout(() => controller.abort(), 30_000);
      renderCart(items, total);
    }
  } finally {
    // Do not leave a timer pending if the loop ends for another reason.
    clearTimeout(timeout);
  }
}
```
Pitfalls
- `events()` subscribes eagerly: when `events()` is called, not when iteration begins. Events emitted before the first `await` are buffered and will be yielded on the first iteration. There is no data loss between `events()` and the first iteration step.
- Breaking out of the `for await` loop with `break` or `return` does not dispose the bus. Use `await using` around the stream or an `AbortSignal` for guaranteed cleanup.
- If the bus is disposed while the generator is awaiting the next event, the generator returns cleanly; no exception is thrown. The `for await` loop simply exits. There is no need to wrap it in `try/catch` for disposal.
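
The eager-subscription and clean-disposal points above can be illustrated with a small stand-in. `TinyBus`, `listenerCount`, and `demo` below are hypothetical names written only for this sketch; they are not the real bus implementation. The sketch assumes only that `events()` registers its listener at call time and that disposing the bus ends any pending iteration without throwing.

```ts
// Hypothetical stand-in for the bus, written only for this illustration.
type Listener<T> = (payload: T) => void;

class TinyBus<T> {
  private listeners = new Set<Listener<T>>();
  private closers = new Set<() => void>();

  get listenerCount(): number {
    return this.listeners.size;
  }

  emit(payload: T): void {
    for (const listener of Array.from(this.listeners)) listener(payload);
  }

  // Not an async function: the listener is registered before this returns.
  events(): AsyncGenerator<T, void, undefined> {
    const buffer: T[] = [];
    let closed = false;
    let wake: (() => void) | undefined;

    const listener: Listener<T> = (payload) => {
      buffer.push(payload);
      wake?.();
    };
    const close = (): void => {
      closed = true;
      this.listeners.delete(listener);
      this.closers.delete(close);
      wake?.();
    };
    this.listeners.add(listener); // eager subscription
    this.closers.add(close);

    async function* drain(): AsyncGenerator<T, void, undefined> {
      try {
        while (true) {
          // Yield everything buffered so far, oldest first.
          while (buffer.length > 0) yield buffer.shift() as T;
          if (closed) return; // disposed: finish without throwing
          await new Promise<void>((resolve) => { wake = resolve; });
          wake = undefined;
        }
      } finally {
        close(); // also runs when the consumer breaks out of the loop
      }
    }
    return drain();
  }

  // Ends every open stream; waiting consumers see a normal completion.
  dispose(): void {
    for (const close of Array.from(this.closers)) close();
  }
}

async function demo(): Promise<{ seen: string[]; listenersAfter: number }> {
  const bus = new TinyBus<string>();
  const stream = bus.events(); // subscription starts here
  bus.emit('a'); // emitted before iteration begins, so it is buffered
  bus.emit('b');
  setTimeout(() => bus.dispose(), 10); // dispose while the loop is waiting

  const seen: string[] = [];
  for await (const event of stream) seen.push(event);
  return { seen, listenersAfter: bus.listenerCount };
}

demo().then((result) => console.log(result));
// logs { seen: [ 'a', 'b' ], listenersAfter: 0 }
```

Both events emitted before the loop starts are delivered, and the loop exits normally once the stand-in bus is disposed, with no `try/catch` around it. How the real bus treats `maxBuffer` overflow or an aborted signal is not covered by this sketch.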