Event-Based Processing & Idempotency
SynaFlow treats lazy streaming as the default. This isn't only about memory efficiency. It also makes the framework a natural fit for event-based processing and makes idempotent pipelines straightforward to build.
Individual events or time windows: the same pipeline
Because evaluation is lazy, you can process events one at a time (streaming) or batched in a time window (materialized) without changing your business logic.
```python
from collections.abc import Iterator
from typing import NamedTuple

from synaflow import pipeline, step, run


class Window(NamedTuple):
    events: list[dict]


# Producer (EACH mode): called once per event
def normalize(event: dict) -> dict:
    return {**event, "amount": event["amount"] / 100}


# Consumer 1 (ALL mode, lazy): receives the normalized events as a stream
def log_each(normalize: Iterator[dict]) -> None:
    for ev in normalize:
        print(f"Event: {ev}")


# Consumer 2 (ALL mode, materialized): receives the whole window as a list
def aggregate(normalize: list[dict]) -> float:
    return sum(e["amount"] for e in normalize)


p = pipeline(
    name="event_processor",
    params=Window,
    steps=[
        step("normalize", fn=normalize),  # EACH: one event at a time
        step("log_each", fn=log_each),    # ALL: receives a lazy stream
        step("aggregate", fn=aggregate),  # ALL: materializes the window
    ],
)

run(p, Window(events=[{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]))
```
The async variant uses `async def` steps, `AsyncIterator` for the streaming consumer, and `async_run`:

```python
from collections.abc import AsyncIterator
from typing import NamedTuple

from synaflow import pipeline, step, async_run


class Window(NamedTuple):
    events: list[dict]


async def normalize(event: dict) -> dict:
    return {**event, "amount": event["amount"] / 100}


async def log_each(normalize: AsyncIterator[dict]) -> None:
    async for ev in normalize:
        print(f"Event: {ev}")


async def aggregate(normalize: list[dict]) -> float:
    return sum(e["amount"] for e in normalize)


p = pipeline(
    name="event_processor",
    params=Window,
    steps=[
        step("normalize", fn=normalize),
        step("log_each", fn=log_each),
        step("aggregate", fn=aggregate),
    ],
)

async_run(p, Window(events=[{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]))
```
The same `normalize` step serves both consumers: `log_each` streams individual events, while `aggregate` materializes the full window. No code duplication.
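Stripped of the framework, the pattern is one lazy producer fanned out to two consumers. The plain-Python sketch below is not SynaFlow's implementation: `itertools.tee` stands in, as an assumption, for however SynaFlow hands one step's output to two dependents, and `log_each` collects lines in a list instead of printing so the result can be inspected.

```python
from collections.abc import Iterable, Iterator
from itertools import tee


def normalize(events: Iterable[dict]) -> Iterator[dict]:
    # Producer: transforms one event at a time, never holding the whole window.
    for event in events:
        yield {**event, "amount": event["amount"] / 100}


def log_each(stream: Iterator[dict]) -> list[str]:
    # Lazy consumer: handles each event as soon as it is produced.
    lines = []
    for ev in stream:
        lines.append(f"Event: {ev}")
    return lines


def aggregate(window: list[dict]) -> float:
    # Materialized consumer: needs the full window before it can answer.
    return sum(e["amount"] for e in window)


events = [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]
lazy_branch, window_branch = tee(normalize(events))  # one producer, two consumers
logged = log_each(lazy_branch)
total = aggregate(list(window_branch))
print(logged, total)
```

Note that `tee` buffers every item one branch has consumed but the other has not. Because `log_each` drains its branch first, the whole window ends up buffered for `aggregate`: the materialized consumer is where the memory cost lands.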
Idempotency by default
A pipeline is idempotent if running it twice with the same input produces the same output with the same side effects. SynaFlow's lazy design makes this natural:
- No shared mutable state between steps: each step receives only the outputs of its declared dependencies, so as long as the step itself is pure, its result depends on nothing else.
- Deterministic DAG: the same params always produce the same execution plan. The DAG JSON is a verifiable, serializable contract.
- Error tolerance: with OnError.CONTINUE (the default), a failing event is discarded without halting the stream, and the remaining events flow through unchanged.
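One way to make "same params, same execution plan" checkable is to order the DAG deterministically and hash a canonical JSON rendering of it. The sketch below uses the standard library's `graphlib` (Python 3.9+) and breaks ties between ready steps by name; the JSON layout and the `execution_plan` and `plan_fingerprint` helpers are hypothetical and are not SynaFlow's actual DAG format.

```python
import hashlib
import json
from graphlib import TopologicalSorter


def execution_plan(deps: dict[str, set[str]]) -> list[str]:
    # Layered topological order; steps that become ready together are
    # sorted by name, so the order never depends on dict or set ordering.
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    order: list[str] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        order.extend(ready)
        sorter.done(*ready)
    return order


def plan_fingerprint(deps: dict[str, set[str]]) -> str:
    # Canonical JSON: sorted keys, sorted dependency lists, no whitespace.
    doc = {
        "steps": {name: sorted(parents) for name, parents in deps.items()},
        "order": execution_plan(deps),
    }
    canonical = json.dumps(doc, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Each step maps to the steps it depends on (its predecessors).
deps = {"normalize": set(), "log_each": {"normalize"}, "aggregate": {"normalize"}}
reordered = {"aggregate": {"normalize"}, "log_each": {"normalize"}, "normalize": set()}

print(execution_plan(deps))
print(plan_fingerprint(deps) == plan_fingerprint(reordered))
```

Two dependency maps that list the same steps in different orders still get the same order and the same fingerprint, so a stored fingerprint can flag an unexpected change to the plan.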
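```python
from collections.abc import Iterator
from typing import NamedTuple

from synaflow import pipeline, step, run, OnError


class Params(NamedTuple):
    events: list[dict]


def process(event: dict) -> dict:
    # Raising marks this event as failed; with OnError.CONTINUE it is dropped
    if event.get("corrupt"):
        raise ValueError("skipping corrupt event")
    return {**event, "processed": True}


def sink(process: Iterator[dict]) -> None:
    for ev in process:
        print(f"Stored: {ev}")


p = pipeline(
    name="idempotent",
    params=Params,
    steps=[
        step("process", fn=process, on_error=OnError.CONTINUE),
        step("sink", fn=sink),
    ],
)

run(p, Params(events=[{"id": 1}, {"id": 2, "corrupt": True}, {"id": 3}]))
```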
The async variant:

```python
from collections.abc import AsyncIterator
from typing import NamedTuple

from synaflow import pipeline, step, async_run, OnError


class Params(NamedTuple):
    events: list[dict]


async def process(event: dict) -> dict:
    if event.get("corrupt"):
        raise ValueError("skipping corrupt event")
    return {**event, "processed": True}


async def sink(process: AsyncIterator[dict]) -> None:
    async for ev in process:
        print(f"Stored: {ev}")


p = pipeline(
    name="idempotent",
    params=Params,
    steps=[
        step("process", fn=process, on_error=OnError.CONTINUE),
        step("sink", fn=sink),
    ],
)

async_run(p, Params(events=[{"id": 1}, {"id": 2, "corrupt": True}, {"id": 3}]))
```
Re-run with the same events and you get the same results. The corrupt one is silently discarded both times.
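The `OnError.CONTINUE` behavior described above can be imitated in a few lines of plain Python, which also makes the re-run claim easy to test. In this sketch (an illustration, not SynaFlow's code) the hypothetical `continue_on_error` wrapper records each error in a list rather than discarding it silently, so you can see which event was dropped.

```python
from collections.abc import Callable, Iterable, Iterator


def continue_on_error(
    fn: Callable[[dict], dict], events: Iterable[dict], errors: list[str]
) -> Iterator[dict]:
    # Apply fn lazily to each event; an event whose fn call raises is
    # dropped (its error recorded) and the stream keeps going.
    for event in events:
        try:
            result = fn(event)
        except Exception as exc:
            errors.append(f"{event.get('id')}: {exc}")
            continue
        yield result


def process(event: dict) -> dict:
    if event.get("corrupt"):
        raise ValueError("skipping corrupt event")
    return {**event, "processed": True}


def run_once(events: list[dict]) -> tuple[list[dict], list[str]]:
    errors: list[str] = []
    stored = list(continue_on_error(process, events, errors))
    return stored, errors


events = [{"id": 1}, {"id": 2, "corrupt": True}, {"id": 3}]
first = run_once(events)
second = run_once(events)
print(first == second)
```

Because both runs see the same input and `process` is pure, they produce identical stored events and identical error lists.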
Re-processing past events
When you need to re-process historical events (audit, backfill, bug fix), you can:
- Individually: push each past event through as a stream, using the same EACH-mode pipeline. No batch overhead.
- As a window: collect them into a time window, materialize, and process with ALL-mode steps. Better for aggregations and analytics.
- Mixed: reprocess individually but also materialize a summary for audit.
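```python
# `fetch_events` is a placeholder for your own event-store query;
# `p` is a pipeline defined as above and `Params` its params NamedTuple.

# Re-process a week of events individually: one run per event
for event in fetch_events(start="2026-06-01", end="2026-06-07"):
    run(p, Params(events=[event]))

# Same week, but one materialized window per day
for d in range(1, 8):
    day = f"2026-06-{d:02d}"
    batch = list(fetch_events(start=day, end=day))
    run(p, Params(events=batch))
```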
The pipeline definition doesn't change — only the input.
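The mixed option from the list above, sketched in plain Python rather than SynaFlow: events are streamed one at a time into a sink, while a small per-day summary is materialized for audit. `daily_windows`, `reprocess_mixed`, and the `ts` field are hypothetical names. `itertools.groupby` only groups consecutive items, so the sketch assumes the input arrives ordered by timestamp.

```python
from collections.abc import Callable, Iterable, Iterator
from datetime import datetime
from itertools import groupby


def daily_windows(events: Iterable[dict]) -> Iterator[tuple[str, Iterator[dict]]]:
    # groupby only merges *consecutive* items, so events must arrive in
    # timestamp order; each day's group is itself a lazy iterator.
    return groupby(events, key=lambda e: datetime.fromisoformat(e["ts"]).date().isoformat())


def reprocess_mixed(
    events: Iterable[dict],
    process: Callable[[dict], dict],
    sink: Callable[[dict], None],
) -> list[dict]:
    summaries: list[dict] = []
    for day, window in daily_windows(events):
        count, total = 0, 0.0
        for event in window:
            result = process(event)  # each event handled individually
            sink(result)             # e.g. overwrite the stored record
            count += 1
            total += result["amount"]
        # Materialize only a small per-day summary for the audit.
        summaries.append({"day": day, "count": count, "total": total})
    return summaries


history = [
    {"id": 1, "ts": "2026-06-01T09:00:00", "amount": 10},
    {"id": 2, "ts": "2026-06-01T17:30:00", "amount": 20},
    {"id": 3, "ts": "2026-06-02T08:15:00", "amount": 5},
]
stored: list[dict] = []
summary = reprocess_mixed(history, lambda e: {**e, "amount": e["amount"] / 100}, stored.append)
print(summary)
```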
Lazy default preserves CPU and RAM
Because SynaFlow streams lazily by default, and OnError.CONTINUE is the default error policy, a failing event is discarded as soon as it fails, without the full batch ever being loaded into memory. This matters for high-throughput event systems, where you can't afford to halt the pipeline or materialize everything just to handle one failure.
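A plain-Python illustration of the point (not SynaFlow code): a source of a million events, a lazy step that drops a failing event and moves on, and a consumer that only needs the first three results. `CountingSource` is a hypothetical helper that counts how many events were actually pulled from the source.

```python
from collections.abc import Iterable, Iterator
from itertools import islice


class CountingSource:
    """Hypothetical event source that counts how many events it has produced."""

    def __init__(self, n: int) -> None:
        self.n = n
        self.produced = 0

    def __iter__(self) -> Iterator[dict]:
        for i in range(self.n):
            self.produced += 1
            yield {"id": i, "corrupt": i == 1}


def process_lazily(events: Iterable[dict]) -> Iterator[dict]:
    for event in events:
        if event["corrupt"]:
            continue  # drop the failing event; nothing else is buffered
        yield {**event, "processed": True}


source = CountingSource(1_000_000)
first_three = list(islice(process_lazily(source), 3))
print([e["id"] for e in first_three], source.produced)
```

Only four events are ever generated: the three good ones plus the corrupt one that was dropped. An eager version that built the full list first would have created all one million before doing any work.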
```mermaid
flowchart LR
    E[Events] --> N[normalize<br/>EACH]
    N -->|lazy| L[log_each<br/>ALL]
    N -->|materialized| A[aggregate<br/>ALL]
    L --> S[Sink]
    A --> S
```
The same producer feeds two consumers with different memory profiles — one lazy, one materialized. Idempotent, efficient, and zero code duplication.