Стриминг (SSE и WebSocket)
Запрос владеет снапшотом, а поток поддерживает свежим живой view-store. Загрузите начальные данные запросом, затем сворачивайте события потока в обычный стор, засеянный из публичного finished.done запроса. Одно «сырое» сообщение потока раскладывается на типизированные события через splitMap из patronum — без внутренних швов запроса и без рефетча.
Зачем отдельный стор?
Запрос остаётся единственным владельцем загрузки; view-store владеет смёрженным живым состоянием. Так мы не лезем в приватные швы, а логика потока — обычный effector, который легко тестировать и композировать.
Server-Sent Events
import { createEffect, createEvent, createStore, sample } from 'effector';
import { splitMap } from 'patronum';
import { createQuery } from 'effector-refetch';
const noticesQuery = createQuery({ effect: fetchNoticesFx });
// сырое SSE-сообщение -> типизированные события
const messageReceived = createEvent<Message>();
const { created, deleted } = splitMap({
source: messageReceived,
cases: {
created: (m) => (m.event === 'created' ? m.notice : undefined),
deleted: (m) => (m.event === 'deleted' ? m.id : undefined),
},
});
// view-store: засеян из запроса, затем патчится потоком
const $notices = createStore<Notice[]>([])
.on(created, (list, notice) => [notice, ...list])
.on(deleted, (list, id) => list.filter((n) => n.id !== id));
sample({ clock: noticesQuery.finished.done, fn: ({ result }) => result, target: $notices });
const openStreamFx = createEffect((url: string) => {
const source = new EventSource(url);
source.addEventListener('message', (e) => messageReceived(JSON.parse(e.data)));
return source; // source.close() чтобы остановить
});
noticesQuery.start();
openStreamFx('/api/notices/stream');WebSocket
const pricesQuery = createQuery({ effect: fetchPricesFx });
const messageReceived = createEvent<{ type: string; payload: unknown }>();
const { snapshot, tick } = splitMap({
source: messageReceived,
cases: {
snapshot: (m) => (m.type === 'snapshot' ? (m.payload as Prices) : undefined),
tick: (m) => (m.type === 'tick' ? (m.payload as Tick) : undefined),
},
});
const $prices = createStore<Prices>({})
.on(snapshot, (_prices, snap) => snap)
.on(tick, (prices, t) => ({ ...prices, [t.symbol]: t.value }));
sample({ clock: pricesQuery.finished.done, fn: ({ result }) => result, target: $prices });
// открываем WebSocket, форвардим сообщения в messageReceived, держим стор $connected…Поскольку запрос — обычный effector, жизненный цикл сокета/SSE — это просто эффекты и события: свободно собирайте reconnect, backoff или другие операторы patronum.
Рабочие примеры: examples/sse.ts, examples/websocket.ts.