Reactive Tables
Problem
You need UI components or effects to re-run whenever a table's contents change. You want to subscribe to one or multiple tables and receive a fresh snapshot on every mutation, without polling or manual wiring.
Solution
Use db.observe(table, fn) for callback-based subscriptions, db.watch(table) for for await loops, db.watchStream(table) for ReadableStream pipelines, and db.observeMany(tables, fn) for combined multi-table snapshots. For deep integration with a reactive library, pass a signals map at construction time.
observe — callback subscription
observe always fires immediately with the current table state on registration, then fires again on every mutation. There is no deferred-first-call mode.
import { createMemory, table, type Unsubscribe } from '@vielzeug/vault';
type User = { id: number; name: string };
const schema = { users: table<User>('id') };
const db = createMemory({ schema });
// Fires immediately with current state, then on every change
const stop: Unsubscribe = db.observe('users', (rows) => {
console.log('users snapshot:', rows.length);
});
await db.put('users', { id: 1, name: 'Alice' }); // triggers the callback again
stop();Pass { signal } to cancel via an AbortController:
const controller = new AbortController();
db.observe('users', (rows) => render(rows), { signal: controller.signal });
await db.put('users', { id: 1, name: 'Alice' }); // triggers callback
controller.abort(); // stops the observerwatch — async iterable stream
watch always starts with an immediate snapshot and then yields one snapshot per change. The observer is cleaned up automatically when the loop exits via break, return, or an unhandled throw.
for await (const users of db.watch('users')) {
renderList(users);
if (shouldStop) break; // unsubscribes automatically
}Stop the loop from outside using an AbortSignal:
const controller = new AbortController();
for await (const users of db.watch('users', { signal: controller.signal })) {
renderList(users);
}
controller.abort(); // terminates the loopBy default (mode: 'latest') intermediate snapshots are dropped if the consumer lags. Pass mode: 'all' to queue every snapshot:
for await (const users of db.watch('users', { mode: 'all' })) {
// receives every snapshot, even if mutations were rapid
}watchStream — ReadableStream
watchStream returns a Web Standard ReadableStream. Use it with WHATWG stream pipelines or any consumer that accepts a ReadableStream.
db.watchStream('users').pipeTo(new WritableStream({ write: (users) => renderList(users) }));Always cancel the stream when done to stop the underlying observer:
const reader = db.watchStream('users').getReader();
const { value } = await reader.read(); // receives immediate snapshot
renderList(value);
await reader.cancel(); // unsubscribes the observerThe same mode and signal options as watch() apply.
observeMany — combined multi-table snapshot
observeMany fires a combined { [tableName]: records[] } snapshot whenever any of the observed tables changes. All per-table observers fire immediately on registration. Multiple tables changed inside a single batch() call coalesce into one callback.
type Post = { id: number; title: string; userId: number };
const dbSchema = {
users: table<User>('id'),
posts: table<Post>('id'),
};
const db = createMemory({ schema: dbSchema });
const stop = db.observeMany(['users', 'posts'], ({ users, posts }) => {
console.log(`${users.length} users, ${posts.length} posts`);
});
// Both tables change — the listener fires exactly once after the batch
await db.batch(['users', 'posts'], async (tx) => {
await tx.put('users', { id: 1, name: 'Alice' });
await tx.put('posts', { id: 10, title: 'Hello', userId: 1 });
});
stop();signals plugin — zero-boilerplate reactivity
Wire a @vielzeug/ripple signal at construction time. Vault keeps it in sync automatically — no observe() call needed.
import { signal, effect } from '@vielzeug/ripple';
import { createIndexedDB, table } from '@vielzeug/vault';
type User = { id: number; name: string };
const schema = { users: table<User>('id') };
const usersSignal = signal<User[]>([]);
const db = createIndexedDB({
name: 'app',
schema,
signals: { users: usersSignal },
version: 1,
});
effect(() => console.log('users:', usersSignal.value.length));
await db.put('users', { id: 1, name: 'Alice' }); // → effect re-runsPitfalls
observe()always fires immediately on registration — there is no deferred-first-call mode. If you need to skip the initial state, check a flag in your callback on the first invocation.observe()returns anUnsubscribefunction — forgetting to call it on teardown leaks listeners. Usewatch()orwatchStream()for loops where cleanup needs to be automatic.observeMany()throwsVaultScopeErrorwhen passed an emptytablesarray. Always provide at least one table name.- Writes to multiple tables inside a single
batch()call triggerobserveManyexactly once, not once per dirty table. Writes outside abatch()trigger separate callbacks per table. - For
watch()andwatchStream(), the defaultmode: 'latest'silently drops intermediate snapshots when the consumer lags. Usemode: 'all'if every intermediate state matters. - The
signalsplugin callssignal.update(() => snapshot)synchronously inside the observer. If the signal triggers further writes to the same table (e.g., via a computed effect), this can cause a cycle. Structure your data flow so signals are downstream-only from storage.