Combining Streams with combineLatest
Problem
A UI element depends on two independent data sources — for example, a filter panel and a paginated list. You need to re-render whenever either changes, using the latest value from both.
Solution
Use combineLatest() to emit a tuple whenever any source emits, provided all sources have emitted at least once. Pair with map() to derive the final state.
ts
import { createSubject, combineLatest, map } from '@vielzeug/flux';
type Filter = { category: string };
type Page = { page: number; size: number };
type ListState = { filter: Filter; page: Page };
const filter$ = createSubject<Filter>();
const page$ = createSubject<Page>();
const listState$ = combineLatest(filter$, page$).pipe(
map(([filter, page]): ListState => ({ filter, page })),
);
listState$.subscribe(({ filter, page }) => {
console.log(`Fetching page ${page.page} with category=${filter.category}`);
});
filter$.emit({ category: 'books' });
page$.emit({ page: 1, size: 20 });
// → Fetching page 1 with category=books
page$.emit({ page: 2, size: 20 });
// → Fetching page 2 with category=booksOnly re-fetch when the source changes — not on every render
Use distinctUntilChanged() with a shallow comparator to skip redundant fetches:
ts
import { distinctUntilChanged } from '@vielzeug/flux';
const stableState$ = listState$.pipe(
distinctUntilChanged(
(a, b) => a.filter.category === b.filter.category && a.page.page === b.page.page,
),
);Sampling — take the latest filter only when the page changes
If the filter changes constantly (e.g., a range slider) but you only want to act when the page commits:
ts
import { sample, withLatestFrom } from '@vielzeug/flux';
// Emit [page, filter] only when page$ emits
page$.pipe(withLatestFrom(filter$)).subscribe(([page, filter]) => {
console.log(`Commit: page=${page.page}, category=${filter.category}`);
});Pitfalls
combineLatestdoes not emit until all sources have emitted at least once — seed subjects with an initial value or usestartWith()if you need an immediate emission.- For fire-and-forget coordination (wait for all to complete, emit last values) use
forkJoin()instead. withLatestFromdoes not subscribe to theotherstream; it only reads its latest buffered value. Ifotherhas never emitted, the emission is silently dropped.