Flux
Reactive
Cold-by-default, disposable reactive streams with a full operator library and ecosystem adapters for Ripple, Herald, Pulse, and Courier.
v0.0.1 · 3.6 KB gzip · Browser · Node.js · SSR · Deno
flux · createSubject · createBehaviorSubject · of · from · fromEvent · interval · timer · +52 more

Why Flux?

Callback chains and Promise-only pipelines break down when you need multi-value, cancellable, composable data flows. Flux gives you a small, type-safe stream primitive with a complete operator library, without a heavyweight runtime or hidden magic.

ts
// Before — nested callbacks, no cancellation
function search(query: string, cb: (results: string[]) => void) {
  const id = setTimeout(() => fetchResults(query).then(cb), 300);
  return () => clearTimeout(id); // manual cleanup
}

// After — composable, self-cleaning pipeline
import { fromEvent, debounce, switchMap, from } from '@vielzeug/flux';

const results$ = fromEvent<InputEvent>(input, 'input').pipe(
  debounce(300),
  switchMap((e) => from(fetchResults((e.target as HTMLInputElement).value))),
);
const unsub = results$.subscribe(renderResults);
// unsub() tears down the whole pipeline; results from in-flight fetches are discarded

Feature | Flux | RxJS | Observable (TC39)
Bundle size | 3.6 KB | ~50 KB | Native (no bundle)
Zero dependencies
Cold by default
Disposable (not just teardown) Partial
Ripple signal adapters Native
Operator library | 40+ operators | 100+ | WIP

Use Flux when you need multi-value composable pipelines with cancellation, especially in a Vielzeug project that already uses Ripple, Herald, or Pulse.

Consider RxJS when you need the full RxJS operator catalogue, rely on RxJS-aware third-party libraries, or are migrating an existing codebase.

Installation

sh
pnpm add @vielzeug/flux
sh
npm install @vielzeug/flux
sh
yarn add @vielzeug/flux

Quick Start

ts
import { flux, map, take, toArray } from '@vielzeug/flux';

// Create a cold stream — producer runs per subscriber
const integers$ = flux<number>((observer) => {
  let i = 0;
  const id = setInterval(() => observer.next(i++), 100);
  return () => clearInterval(id); // cleanup on unsubscribe
});

// Compose operators via .pipe()
const first5$ = integers$.pipe(
  map((n) => n * 2),
  take(5),
);

// Collect to an array (returns a Promise)
const result = await toArray(first5$);
console.log(result); // [0, 2, 4, 6, 8]

// Or subscribe directly
const unsub = integers$.pipe(take(3)).subscribe({
  next(v) { console.log(v); },
  complete() { console.log('done'); },
  error(err) { console.error(err); },
});
// unsub() to cancel early
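
The "producer runs per subscriber" behaviour can be illustrated without the library. The following is a minimal from-scratch sketch, not Flux's implementation: `coldStream`, `Observer`, and `Teardown` are hypothetical names invented for this example, and the producer emits synchronously so the result is easy to follow.

```ts
// Illustrative stand-in for a cold stream; names here are hypothetical, not Flux's API.
type Observer<T> = { next: (value: T) => void };
type Teardown = () => void;

function coldStream<T>(producer: (observer: Observer<T>) => Teardown) {
  return {
    // Every subscribe() call runs the producer again, so subscribers share nothing.
    subscribe(observer: Observer<T>): Teardown {
      let active = true;
      const teardown = producer({
        next: (value) => {
          if (active) observer.next(value); // ignore values after unsubscribe
        },
      });
      return () => {
        if (!active) return; // make unsubscribe idempotent
        active = false;
        teardown();
      };
    },
  };
}

let producerRuns = 0;
let cleanups = 0;
const pair$ = coldStream<number>((observer) => {
  producerRuns += 1;
  observer.next(1);
  observer.next(2);
  return () => {
    cleanups += 1;
  };
});

const seenA: number[] = [];
const seenB: number[] = [];
const unsubA = pair$.subscribe({ next: (v) => { seenA.push(v); } });
const unsubB = pair$.subscribe({ next: (v) => { seenB.push(v); } });

unsubA();
unsubA(); // second call is a no-op
unsubB();

// producerRuns is 2, cleanups is 2, and both arrays hold [1, 2]
console.log(producerRuns, cleanups, seenA, seenB);
```

Nothing runs until the first `subscribe()` call, and each subscriber gets its own producer run and its own cleanup. That is the property that makes a cold stream safe to pass around and subscribe to more than once.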

Features

  • flux() — Cold stream factory; producer runs once per subscriber
  • createSubject() — Hot multicasting subject with emit() / complete() / error()
  • createBehaviorSubject() — Subject that replays the latest value to new subscribers
  • 40+ operators — Creation, transformation, filtering, combination, utility
  • pipe() — Chainable, type-safe operator composition
  • dispose() — First-class lifecycle; shuts down the stream and all subscriptions
  • AbortSignal support — takeUntil(signal) integrates with standard cancellation
  • Ripple adapters: fromSignal() / toSignal() bridge signals and streams
  • Herald adapters: fromBus() / toBus() bridge typed event buses
  • Pulse adapters: fromPulse() / fromPresence() for real-time channels
  • Courier adapters: fromSse() / fromReadable() / fromQuery() for HTTP sources
  • toPromise() / toArray() — Collect stream output into standard async primitives
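
To make the difference between a hot subject and a behavior subject concrete, here is a small from-scratch sketch. It is not Flux's source: `miniSubject` and `miniBehaviorSubject` are hypothetical names, and the initial-value argument on the behavior subject is an assumption made for illustration. Only `emit()` and `subscribe()` are modelled.

```ts
// Illustrative stand-ins; names and signatures are hypothetical, not Flux's API.
type Listener<T> = (value: T) => void;

function miniSubject<T>() {
  const listeners: Listener<T>[] = [];
  return {
    // Hot: values go to whoever is subscribed right now; late subscribers miss them.
    emit(value: T): void {
      for (const listener of listeners.slice()) listener(value);
    },
    subscribe(listener: Listener<T>): () => void {
      listeners.push(listener);
      return () => {
        const index = listeners.indexOf(listener);
        if (index !== -1) listeners.splice(index, 1);
      };
    },
  };
}

function miniBehaviorSubject<T>(initial: T) {
  const inner = miniSubject<T>();
  let latest = initial;
  return {
    emit(value: T): void {
      latest = value; // remember the value for future subscribers
      inner.emit(value);
    },
    subscribe(listener: Listener<T>): () => void {
      listener(latest); // replay the latest value immediately
      return inner.subscribe(listener);
    },
  };
}

const clicks = miniSubject<string>();
const early: string[] = [];
const late: string[] = [];
clicks.subscribe((v) => { early.push(v); });
clicks.emit('a');
clicks.subscribe((v) => { late.push(v); });
clicks.emit('b');

const count = miniBehaviorSubject(0);
count.emit(1);
const replayed: number[] = [];
const stop = count.subscribe((v) => { replayed.push(v); });
count.emit(2);
stop();
count.emit(3); // not delivered: the listener was removed

// early is ['a', 'b'], late is ['b'], replayed is [1, 2]
console.log(early, late, replayed);
```

The late subscriber to the plain subject never sees `'a'`, while the behavior subject hands its newest value to a subscriber the moment it joins. That replay is why a behavior subject suits "current state" use cases and a plain subject suits one-off events.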

See Also

  • Ripple — Reactive signals and effects; fromSignal()/toSignal() connect Flux streams to Ripple's signal graph
  • Herald — Typed event bus; fromBus()/toBus() wrap bus events as Flux streams
  • Pulse — Real-time WebSocket channels; fromPulse()/fromPresence() expose channel data as streams
  • Courier — HTTP client; fromSse()/fromReadable()/fromQuery() wrap Courier sources as streams