Basic Usage
ts
import { flux, map, take } from '@vielzeug/flux';
const double$ = flux<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete?.();
}).pipe(
map((n) => n * 2),
take(2),
);
double$.subscribe({
next(v) { console.log(v); }, // 2, 4
complete() { console.log('done'); },
});Creating Streams
From static values
ts
import { of, from, empty, never, throwError } from '@vielzeug/flux';
of(1, 2, 3).subscribe(console.log); // 1 2 3
from([10, 20]).subscribe(console.log); // 10 20
from(Promise.resolve('hello')).subscribe(console.log); // hello
empty().subscribe({ complete: () => console.log('done') });From time
ts
import { interval, timer } from '@vielzeug/flux';
// Emits 0, 1, 2, ... every 500ms
const tick$ = interval(500);
// Emits 0 after 1 second, then completes
const oneShot$ = timer(1000);From DOM events
ts
import { fromEvent, debounce } from '@vielzeug/flux';
const clicks$ = fromEvent<MouseEvent>(document, 'click');
const keyups$ = fromEvent<KeyboardEvent>(input, 'keyup').pipe(debounce(300));Subjects
A Subject is a hot stream — it multicasts to all current subscribers. Use it as the imperative entry point into a pipeline.
ts
import { createSubject, createBehaviorSubject } from '@vielzeug/flux';
const events$ = createSubject<string>();
events$.subscribe((v) => console.log('A:', v));
events$.subscribe((v) => console.log('B:', v));
events$.emit('hello'); // A: hello B: hello
events$.complete(); // ends all subscriptionscreateBehaviorSubject replays the latest value to every new subscriber:
ts
const count$ = createBehaviorSubject(0);
count$.emit(1);
count$.emit(2);
// Late subscriber immediately receives 2
count$.subscribe((v) => console.log(v)); // 2Subscribing
subscribe accepts either a function or an observer object:
ts
// Function shorthand — only receives values
const unsub = source$.subscribe((v) => console.log(v));
// Full observer
const unsub = source$.subscribe({
next(v) { /* ... */ },
complete() { /* ... */ },
error(err) { /* ... */ },
});
// Cancel at any time
unsub();Composing Operators
All operators are used via .pipe():
ts
import { filter, map, take, debounce } from '@vielzeug/flux';
const result$ = source$.pipe(
filter((v) => v > 0),
map((v) => v * 10),
take(5),
);Transformation
ts
import { map, scan, switchMap, flatMap, concatMap, startWith, bufferCount, pairwise } from '@vielzeug/flux';
// Accumulate running total
of(1, 2, 3).pipe(scan((acc, n) => acc + n, 0)); // 1, 3, 6
// Prepend static values
of(3, 4).pipe(startWith(1, 2)); // 1, 2, 3, 4
// Emit pairs of consecutive values
of(1, 2, 3).pipe(pairwise()); // [1,2], [2,3]
// Collect into fixed-size arrays
of(1, 2, 3, 4).pipe(bufferCount(2)); // [1,2], [3,4]
// Cancel previous inner stream on each new outer emission
search$.pipe(switchMap((q) => from(fetch(`/api?q=${q}`))));Filtering
ts
import { take, skip, first, last, takeWhile, takeUntil, debounce, throttle, sample } from '@vielzeug/flux';
source$.pipe(take(3)); // first 3 values
source$.pipe(skip(2)); // skip first 2
source$.pipe(takeWhile((n) => n < 5)); // until predicate is false
source$.pipe(debounce(300)); // last value after 300ms silence
source$.pipe(throttle(200)); // at most one value per 200msCancellation with AbortSignal
ts
import { takeUntil } from '@vielzeug/flux';
import { createSubject } from '@vielzeug/flux';
const ac = new AbortController();
source$.pipe(takeUntil(ac.signal)).subscribe(console.log);
ac.abort(); // stops the subscriptionYou can also pass a Flux as the notifier:
ts
const stop$ = createSubject<void>();
source$.pipe(takeUntil(stop$)).subscribe(console.log);
stop$.emit(); // stops the subscriptionCombination
ts
import { merge, concat, combineLatest, withLatestFrom, zip, forkJoin } from '@vielzeug/flux';
// Merge two streams — emit as values arrive
merge(stream1$, stream2$);
// Sequential — exhaust stream1$ before subscribing to stream2$
concat(stream1$, stream2$);
// Emit tuple when all sources have at least one value
combineLatest(a$, b$);
// Each a$ emission paired with the latest b$ value
a$.pipe(withLatestFrom(b$));
// Pair by index — [a1, b1], [a2, b2]
zip(a$, b$);
// Wait for all to complete; emit tuple of last values
forkJoin(a$, b$);Error Handling
ts
import { catchError, retry, of } from '@vielzeug/flux';
// Recover with a fallback stream
source$.pipe(catchError(() => of('fallback')));
// Retry up to 3 times before propagating
source$.pipe(retry(3));Multicasting
ts
import { share, shareReplay } from '@vielzeug/flux';
// Share one execution among all subscribers (unsubscribes when last subscriber leaves)
const shared$ = expensiveSource$.pipe(share());
// Replay last N values to late subscribers
const replayed$ = source$.pipe(shareReplay(2));Converting to Promises / Arrays
ts
import { toPromise, toArray } from '@vielzeug/flux';
const last = await toPromise(of(1, 2, 3)); // 3
const all = await toArray(of(1, 2, 3)); // [1, 2, 3]Disposal
dispose() terminates a Flux permanently and completes all active subscriptions:
ts
const subject = createSubject<number>();
subject.emit(1);
subject.dispose(); // all subscriptions complete; no further values acceptedFor the flux() factory, each subscription is cancelled by calling the unsubscribe function returned by subscribe(). The stream itself does not need to be explicitly disposed.
Framework Integration
ts
import { useEffect, useState } from 'react';
import type { Flux } from '@vielzeug/flux';
function useFlux<T>(source: Flux<T>, initial: T): T {
const [value, setValue] = useState(initial);
useEffect(() => {
const unsub = source.subscribe((v) => setValue(v));
return unsub; // React calls this on cleanup
}, [source]);
return value;
}ts
import { onUnmounted, ref } from 'vue';
import type { Flux } from '@vielzeug/flux';
function useFlux<T>(source: Flux<T>, initial: T) {
const value = ref<T>(initial);
const unsub = source.subscribe((v) => { value.value = v as T; });
onUnmounted(unsub);
return value;
}ts
import type { Flux } from '@vielzeug/flux';
import type { Readable } from 'svelte/store';
function fromFlux<T>(source: Flux<T>, initial: T): Readable<T> {
return {
subscribe(run) {
run(initial);
return source.subscribe(run);
},
};
}
// Use in template: {$myStore}Working with Other Vielzeug Libraries
Ripple signals
ts
import { fromSignal, toSignal } from '@vielzeug/flux';
import { signal } from '@vielzeug/ripple';
const count = signal(0);
// Ripple signal → Flux stream (emits current value immediately)
const count$ = fromSignal(count);
count$.subscribe(console.log); // 0, then on every change
// Flux stream → Ripple signal
const latest = toSignal(count$, { initial: 0 });
// latest.value stays in sync; latest.dispose() ends trackingHerald event bus
ts
import { fromBus, toBus } from '@vielzeug/flux';
import { createBus } from '@vielzeug/herald';
type Events = { 'user:login': { id: string } };
const bus = createBus<Events>();
// Bus event → Flux stream
fromBus(bus, 'user:login').subscribe(({ id }) => console.log(id));
// Flux stream → bus emissions (values also pass through)
source$.pipe(toBus(bus, 'user:login')).subscribe();Best Practices
- Unsubscribe or dispose when a stream is no longer needed to avoid memory leaks
- Prefer
switchMapoverflatMapfor request–response patterns where only the latest matters - Use
shareReplay(1)when multiple components need the same latest value - Use
createBehaviorSubjectrather thancreateSubjectwhen late subscribers need the current state - Use
takeUntil(signal)with anAbortControllerfor component-scoped stream lifetimes - Use
toArray()ortoPromise()in tests — they wrap the stream in aPromisethat resolves when the stream completes