Streaming with events()
Problem
You need to consume a continuous stream of events as an async iterable — processing each one in sequence with for await instead of registering a callback that runs in parallel.
Solution
Process an ongoing sequence of events as an async generator:
ts
async function watchCart() {
const controller = new AbortController();
// Stop streaming after 30 seconds of inactivity (extend pattern)
let timeout = setTimeout(() => controller.abort(), 30_000);
for await (const { items, total } of appBus.events('cart:updated', { signal: controller.signal, maxBuffer: 20 })) {
clearTimeout(timeout);
timeout = setTimeout(() => controller.abort(), 30_000);
renderCart(items, total);
}
}Pitfalls
events()starts listening when iteration begins (the firstnext()call), not when the generator object is created. Events emitted between construction and first iteration are lost.- Breaking out of the
for awaitloop withbreakorreturndoes not dispose the bus. Callbus.dispose()explicitly or wrap the loop intry/finally. - If the bus is disposed while the generator is awaiting the next event, the generator throws
BusDisposedError. Wrap thefor awaitin a try/catch to handle graceful teardown.