Streaming (SSE & WebSocket)
A query owns the snapshot; a stream keeps a live view store fresh. Load the initial data with a query, then fold stream events into a plain store seeded from the query's public finished.done. One raw stream message is fanned out into typed events with patronum's splitMap — no query internals, no refetch.
Why a separate store?
The query stays the single owner of fetching; the view store owns the merged live state. This avoids reaching into private seams and keeps the stream logic plain effector you can test and compose.
Server-Sent Events
ts
import { createEffect, createEvent, createStore, sample } from 'effector';
import { splitMap } from 'patronum';
import { createQuery } from 'effector-refetch';
const noticesQuery = createQuery({ effect: fetchNoticesFx });
// raw SSE message -> typed events
const messageReceived = createEvent<Message>();
const { created, deleted } = splitMap({
source: messageReceived,
cases: {
created: (m) => (m.event === 'created' ? m.notice : undefined),
deleted: (m) => (m.event === 'deleted' ? m.id : undefined),
},
});
// view store: seeded from the query, then patched by the stream
const $notices = createStore<Notice[]>([])
.on(created, (list, notice) => [notice, ...list])
.on(deleted, (list, id) => list.filter((n) => n.id !== id));
sample({ clock: noticesQuery.finished.done, fn: ({ result }) => result, target: $notices });
const openStreamFx = createEffect((url: string) => {
const source = new EventSource(url);
source.addEventListener('message', (e) => messageReceived(JSON.parse(e.data)));
return source; // source.close() to stop
});
noticesQuery.start();
openStreamFx('/api/notices/stream');WebSocket
ts
const pricesQuery = createQuery({ effect: fetchPricesFx });
const messageReceived = createEvent<{ type: string; payload: unknown }>();
const { snapshot, tick } = splitMap({
source: messageReceived,
cases: {
snapshot: (m) => (m.type === 'snapshot' ? (m.payload as Prices) : undefined),
tick: (m) => (m.type === 'tick' ? (m.payload as Tick) : undefined),
},
});
const $prices = createStore<Prices>({})
.on(snapshot, (_prices, snap) => snap)
.on(tick, (prices, t) => ({ ...prices, [t.symbol]: t.value }));
sample({ clock: pricesQuery.finished.done, fn: ({ result }) => result, target: $prices });
// open a WebSocket, forward messages to messageReceived, track a $connected store…Because the query is plain effector, the socket/SSE lifecycle is just effects and events — compose reconnect, backoff, or other patronum operators freely.
Runnable: examples/sse.ts, examples/websocket.ts.