Why Flux?
Callback chains and Promise-only pipelines break down when you need multi-value, cancellable, composable data flows. Flux gives you a small, type-safe stream primitive with a complete operator library, without a heavyweight runtime or hidden magic.
```ts
// Before: nested callbacks, no cancellation
function search(query: string, cb: (results: string[]) => void) {
  const id = setTimeout(() => fetchResults(query).then(cb), 300);
  return () => clearTimeout(id); // manual cleanup
}

// After: composable, self-cleaning pipeline
import { flux, fromEvent, debounce, switchMap, from } from '@vielzeug/flux';

const results$ = fromEvent<InputEvent>(input, 'input').pipe(
  debounce(300),
  switchMap((e) => from(fetchResults((e.target as HTMLInputElement).value))),
);

const unsub = results$.subscribe(renderResults);
// unsub() cancels everything, including in-flight fetches
```

| Feature | Flux | RxJS | Observable (TC39) |
|---|---|---|---|
| Bundle size | 3.6 KB | ~50 KB | Native (no bundle) |
| Zero dependencies | | | |
| Cold by default | | | |
| Disposable (not just teardown) | | | |
| Ripple signal adapters | | | |
| Operator library | | | |

Use Flux when you need multi-value composable pipelines with cancellation, especially in a Vielzeug project that already uses Ripple, Herald, or Pulse.
Consider RxJS when you need the full RxJS operator catalogue, rely on RxJS-aware third-party libraries, or are migrating an existing codebase.
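
To make "multi-value, cancellable, composable" concrete, the following is a minimal, self-contained sketch of a cold stream primitive with two operators. It is not the Flux implementation and does not use the Flux API: `makeStream`, `pipe`, `emit`, and the simplified `map` and `take` are hypothetical stand-ins written only for illustration, and error handling is omitted.

```ts
// Illustrative sketch only. All names here are hypothetical, not @vielzeug/flux exports.
type Observer<T> = { next: (value: T) => void; complete: () => void };
type Teardown = () => void;
type Stream<T> = { subscribe: (observer: Observer<T>) => Teardown };
type Operator<A, B> = (source: Stream<A>) => Stream<B>;

// Cold stream: the producer runs again for every subscriber.
function makeStream<T>(producer: (observer: Observer<T>) => Teardown): Stream<T> {
  return {
    subscribe(observer) {
      let closed = false;
      let teardown: Teardown | undefined;
      const close = () => {
        if (closed) return;
        closed = true;
        teardown?.(); // run the producer's cleanup exactly once
      };
      teardown = producer({
        next: (value) => { if (!closed) observer.next(value); },
        complete: () => { if (!closed) { close(); observer.complete(); } },
      });
      if (closed) teardown(); // the producer completed synchronously
      return close;
    },
  };
}

// Operators are plain functions from one stream to another.
const map = <A, B>(fn: (value: A) => B): Operator<A, B> => (source) =>
  makeStream((observer) =>
    source.subscribe({ next: (v) => observer.next(fn(v)), complete: observer.complete }),
  );

const take = <T>(count: number): Operator<T, T> => (source) =>
  makeStream((observer) => {
    let seen = 0;
    return source.subscribe({
      next: (value) => {
        seen += 1;
        observer.next(value);
        if (seen >= count) observer.complete(); // completing tears down upstream
      },
      complete: observer.complete,
    });
  });

function pipe<A, B, C>(source: Stream<A>, op1: Operator<A, B>, op2: Operator<B, C>): Stream<C> {
  return op2(op1(source));
}

// A manually driven source, so the demo is deterministic.
const listeners = new Set<(n: number) => void>();
const emit = (n: number) => listeners.forEach((listener) => listener(n));
const ticks = makeStream<number>((observer) => {
  listeners.add(observer.next);
  return () => { listeners.delete(observer.next); };
});

const events: Array<number | "complete"> = [];
pipe(ticks, map((n: number) => n * 2), take(3)).subscribe({
  next: (v) => events.push(v),
  complete: () => events.push("complete"),
});

[1, 2, 3, 4, 5].forEach(emit);
console.log(events);         // [2, 4, 6, "complete"]
console.log(listeners.size); // 0: take(3) completed and the cleanup ran
```

Because every operator returns another stream whose teardown calls the upstream teardown, a single unsubscribe (or a completion from `take`) releases the whole chain. The callback version has to wire that cleanup up by hand.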
Installation
```sh
pnpm add @vielzeug/flux
```

```sh
npm install @vielzeug/flux
```

```sh
yarn add @vielzeug/flux
```

Quick Start
```ts
import { flux, map, take, toArray } from '@vielzeug/flux';

// Create a cold stream: the producer runs once per subscriber
const integers$ = flux<number>((observer) => {
  let i = 0;
  const id = setInterval(() => observer.next(i++), 100);
  return () => clearInterval(id); // cleanup on unsubscribe
});

// Compose operators via .pipe()
const first5$ = integers$.pipe(
  map((n) => n * 2),
  take(5),
);

// Collect to an array (returns a Promise)
const result = await toArray(first5$);
console.log(result); // [0, 2, 4, 6, 8]

// Or subscribe directly
const unsub = integers$.pipe(take(3)).subscribe({
  next(v) { console.log(v); },
  complete() { console.log('done'); },
  error(err) { console.error(err); },
});
// unsub() to cancel early
```

Features
- `flux()`: Cold stream factory; producer runs once per subscriber
- `createSubject()`: Hot multicasting subject with `emit()` / `complete()` / `error()`
- `createBehaviorSubject()`: Subject that replays the latest value to new subscribers
- 40+ operators: Creation, transformation, filtering, combination, utility
- `pipe()`: Chainable, type-safe operator composition
- `dispose()`: First-class lifecycle; shuts down the stream and all subscriptions
- `AbortSignal` support: `takeUntil(signal)` integrates with standard cancellation
- Ripple adapters: `fromSignal()` / `toSignal()` bridge signals and streams
- Herald adapters: `fromBus()` / `toBus()` bridge typed event buses
- Pulse adapters: `fromPulse()` / `fromPresence()` for real-time channels
- Courier adapters: `fromSse()` / `fromReadable()` / `fromQuery()` for HTTP sources
- `toPromise()` / `toArray()`: Collect stream output into standard async primitives
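
The difference between a hot subject and one that replays its latest value, as described in the list above, can be sketched in plain TypeScript. This is an assumption-laden illustration, not Flux's implementation: `makeSubject` and `makeBehaviorSubject` are hypothetical names, the initial-value argument is an assumption, and completion and error handling are left out.

```ts
// Illustrative sketch only. These helpers are hypothetical, not @vielzeug/flux exports.
type Listener<T> = (value: T) => void;

// Hot subject: one shared source, values are multicast to whoever is subscribed right now.
function makeSubject<T>() {
  const listeners = new Set<Listener<T>>();
  return {
    subscribe(listener: Listener<T>) {
      listeners.add(listener);
      return () => { listeners.delete(listener); };
    },
    emit(value: T) {
      listeners.forEach((listener) => listener(value));
    },
  };
}

// Behavior-style subject: remembers the latest value and replays it to new subscribers.
// Taking an initial value is an assumption made for this sketch.
function makeBehaviorSubject<T>(initial: T) {
  let latest = initial;
  const inner = makeSubject<T>();
  return {
    subscribe(listener: Listener<T>) {
      listener(latest); // replay the latest value immediately
      return inner.subscribe(listener);
    },
    emit(value: T) {
      latest = value;
      inner.emit(value);
    },
  };
}

const hot = makeSubject<string>();
const early: string[] = [];
const late: string[] = [];
hot.subscribe((v) => early.push(v));
hot.emit("a");
hot.subscribe((v) => late.push(v)); // joins late and misses "a"
hot.emit("b");
console.log(early, late); // ["a", "b"] ["b"]

const state = makeBehaviorSubject(0);
state.emit(1);
const seen: number[] = [];
state.subscribe((v) => seen.push(v)); // receives 1 right away
state.emit(2);
console.log(seen); // [1, 2]
```

A late subscriber to the plain subject only sees values emitted after it joined, while the behavior-style subject always hands over the current value first. That is why the second form suits state-like data.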
Documentation
See Also
- Ripple: Reactive signals and effects; `fromSignal()` / `toSignal()` connect Flux streams to Ripple's signal graph
- Herald: Typed event bus; `fromBus()` / `toBus()` wrap bus events as Flux streams
- Pulse: Real-time WebSocket channels; `fromPulse()` / `fromPresence()` expose channel data as streams
- Courier: HTTP client; `fromSse()` / `fromReadable()` / `fromQuery()` wrap Courier sources as streams