Lockstep Data Flow¶
SynaFlow's streaming engine keeps memory use low by processing pipelines in lockstep: one item flows entirely through the DAG before the next item is produced.
This is the default behavior because every step starts with
max_in_flight=1. If you need a bounded window between two stages, see
Max In Flight, which includes real sync and async HTTP
examples for I/O-bound pipelines.
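To build intuition for the lockstep order, here is a minimal sketch that uses only plain Python generators. It is not SynaFlow's implementation; the `events` list and the `doubled` and `consume` helpers are hypothetical names introduced purely for illustration.

```python
from collections.abc import Iterator

events: list[str] = []  # records the order in which work happens


def numbers(count: int) -> Iterator[int]:
    for i in range(count):
        events.append(f"produce {i}")
        yield i


def doubled(source: Iterator[int]) -> Iterator[int]:
    # Pulls one item from upstream only when downstream asks for one.
    for n in source:
        events.append(f"double {n}")
        yield n * 2


def consume(source: Iterator[int]) -> list[int]:
    seen = []
    for x in source:
        events.append(f"consume {x}")
        seen.append(x)
    return seen


result = consume(doubled(numbers(3)))
print(events[:3])  # ['produce 0', 'double 0', 'consume 0']
```

Because each generator is pulled on demand, item 0 is produced, doubled, and consumed before item 1 is produced.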
A Streaming Pipeline¶
# Synchronous version
from collections.abc import Generator, Iterator
from typing import NamedTuple

from synaflow import pipeline, step, run


class Params(NamedTuple):
    count: int = 3


def numbers(count: int) -> Generator[int, None, None]:
    # Producer: yields one integer at a time.
    yield from range(count)


def doubler(number: int) -> int:
    # Per-item transform.
    return number * 2


def printer(doubler: Iterator[int]) -> None:
    # Lazy consumer: iterates the stream without materializing it.
    for x in doubler:
        print(f"Consumed: {x}")


p = pipeline(
    name="lockstep_demo",
    params=Params,
    steps=[
        step("numbers", fn=numbers),
        step("doubler", fn=doubler),
        step("printer", fn=printer),
    ],
)

run(p, Params(count=5))
# Asynchronous version
from collections.abc import AsyncGenerator, AsyncIterator
from typing import NamedTuple

from synaflow import pipeline, step, async_run


class Params(NamedTuple):
    count: int = 3


async def numbers(count: int) -> AsyncGenerator[int, None]:
    # Async producer: yields one integer at a time.
    for i in range(count):
        yield i


async def doubler(number: int) -> int:
    # Per-item transform.
    return number * 2


async def printer(doubler: AsyncIterator[int]) -> None:
    # Lazy consumer: iterates the async stream without materializing it.
    async for x in doubler:
        print(f"Consumed: {x}")


p = pipeline(
    name="lockstep_demo",
    params=Params,
    steps=[
        step("numbers", fn=numbers),
        step("doubler", fn=doubler),
        step("printer", fn=printer),
    ],
)

async_run(p, Params(count=5))
The DAG¶
SynaFlow reads the type hints and builds this graph:
flowchart TD
    numbers["numbers<br/><i>Stream[int]</i>"]
    doubler["doubler<br/><i>ListType(<class 'int'>)</i>"]
    printer["printer<br/><i>None</i>"]
    count --> numbers
    numbers --> doubler
    doubler --> printer
Three steps, three execution levels: numbers → doubler → printer.
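As a rough illustration of what reading type hints can reveal, the sketch below uses only the standard `typing` helpers to tell streaming steps from plain ones. How SynaFlow does this internally is not shown on this page; `describe` and `STREAM_ORIGINS` are hypothetical names, and the rule "a `Generator` or `Iterator` annotation means a stream" is an assumption made for the example.

```python
from collections.abc import Generator, Iterator
from typing import get_origin, get_type_hints

# Assumption for this sketch: these annotation origins mark a stream.
STREAM_ORIGINS = (Generator, Iterator)


def numbers(count: int) -> Generator[int, None, None]:
    yield from range(count)


def doubler(number: int) -> int:
    return number * 2


def printer(doubler: Iterator[int]) -> None:
    for x in doubler:
        print(f"Consumed: {x}")


def describe(fn) -> dict[str, object]:
    """Summarize a step's inputs and whether it produces a stream."""
    hints = get_type_hints(fn)
    returns = hints.pop("return", None)
    return {
        "inputs": list(hints),
        "stream_inputs": [
            name for name, hint in hints.items()
            if get_origin(hint) in STREAM_ORIGINS
        ],
        "streams_out": get_origin(returns) in STREAM_ORIGINS,
    }


for fn in (numbers, doubler, printer):
    print(fn.__name__, describe(fn))
```

Here `numbers` is reported as a stream producer, `doubler` as a plain per-item function, and `printer` as a consumer of one streamed input.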
How Lockstep Execution Works¶
The pipeline processes one item at a time from start to finish. The key insight is that steps don't wait for each other: they run concurrently, like an assembly line.
[Animation: lockstep timeline with columns numbers, doubler, printer]
The animation above shows count=5. Three views of the same execution:
- Top: the DAG — watch items light up inside each node as they're processed.
- Middle: the timeline table — one row per moment.
- Bottom: insight text explaining what's happening.
Key observation: at frame 3, numbers is working on item 2 while doubler
processes item 1 and printer prints item 0. Three items in flight,
one per step — that's lockstep streaming.
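The timeline can be approximated with simple arithmetic. The sketch below is an idealized model, not output from SynaFlow: it assumes frames are numbered from 1 and that every step spends exactly one frame on each item, which matches the frame 3 description above. The `timeline` function is a hypothetical helper.

```python
from typing import Optional

STEPS = ["numbers", "doubler", "printer"]


def timeline(count: int, steps: list[str] = STEPS) -> list[dict[str, Optional[int]]]:
    """Return one row per frame; each row maps step name -> item index or None."""
    frames = []
    total_frames = count + len(steps) - 1  # the last item still has to drain
    for frame in range(1, total_frames + 1):
        row = {}
        for position, name in enumerate(steps):
            item = frame - 1 - position  # step k lags k frames behind the producer
            row[name] = item if 0 <= item < count else None
        frames.append(row)
    return frames


for number, row in enumerate(timeline(5), start=1):
    print(number, row)
```

Under these assumptions, `count=5` takes seven frames, and frame 3 shows `numbers` on item 2, `doubler` on item 1, and `printer` on item 0.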
Fan-Out: Multiple Consumers¶
When multiple consumers depend on the same producer, SynaFlow automatically forks
the stream with itertools.tee and advances them together:
flowchart TD
    gen["gen<br/><i>Stream[int]</i>"]
    lazy["lazy<br/><i>Stream[int]</i>"]
    eager["eager<br/><i>list[int]</i>"]
    count --> gen
    gen --> lazy
    gen --> eager
- lazy_consumer receives a lazy fork: it streams without holding data.
- eager_consumer asks for list[int]: SynaFlow materializes only that fork.
Both consumers receive every item. The lazy fork never holds the full dataset; only the eager fork does.
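The forking behavior can be sketched with `itertools.tee` from the standard library. This is an illustration of the idea rather than SynaFlow's own code; `running_total` and `materialized` are hypothetical stand-ins for a lazy and an eager consumer.

```python
import itertools
from collections.abc import Iterator


def gen(count: int) -> Iterator[int]:
    yield from range(count)


lazy_fork, eager_fork = itertools.tee(gen(4))

running_total = 0             # lazy consumer: keeps only an accumulator
materialized: list[int] = []  # eager consumer: keeps every item

# Advance both forks together so tee's internal buffer holds at most one item.
for lazy_item, eager_item in zip(lazy_fork, eager_fork):
    running_total += lazy_item
    materialized.append(eager_item)

print(running_total)  # 6
print(materialized)   # [0, 1, 2, 3]
```

If one fork ran far ahead of the other, `tee` would have to buffer every item the slower fork had not yet read, which is why advancing the forks together matters.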
Execution Levels¶
SynaFlow topologically sorts the DAG into levels. Steps on the same level can run in parallel (in an async runner). In the linear pipeline above, each step sits on its own level.
For a diamond topology, independent branches such as doubler and tripler share a level.
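Level assignment can be sketched with the standard library's `graphlib.TopologicalSorter`. This is a generic illustration, not SynaFlow's scheduler; `execution_levels` is a hypothetical helper, and the dependency dictionaries are written by hand from the step names used on this page.

```python
from graphlib import TopologicalSorter


def execution_levels(dependencies: dict[str, set[str]]) -> list[list[str]]:
    """Group nodes into levels; each maps node -> the nodes it depends on."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose inputs are done
        levels.append(ready)
        sorter.done(*ready)
    return levels


linear = {"numbers": set(), "doubler": {"numbers"}, "printer": {"doubler"}}
diamond = {
    "numbers": set(),
    "doubler": {"numbers"},
    "tripler": {"numbers"},
    "join": {"doubler", "tripler"},
}

print(execution_levels(linear))   # [['numbers'], ['doubler'], ['printer']]
print(execution_levels(diamond))  # [['numbers'], ['doubler', 'tripler'], ['join']]
```

Each pass collects every node whose dependencies are already complete, so independent branches land in the same level.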
Diamond DAG in Action¶
Here's a more complex example: one producer feeds two branches, and a final step
joins both streams. Watch how SynaFlow advances doubler and tripler in
lockstep, pads the shorter stream with None, and feeds the pairs to join.
[Animation: diamond timeline with columns numbers, doubler, tripler, join (pairs)]
The diamond animation shows a key behavior: when doubler and tripler produce
items at the same time, SynaFlow unrolls both streams together — yielding
pairs like (2, 3), (4, 6) — before moving to the next item. If one stream
were shorter, SynaFlow would pad with None. This is the same lockstep
mechanism, applied to fan-in.
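The pairing and padding behavior resembles `itertools.zip_longest`, whose default fill value is `None`. The sketch below reproduces the diamond with standard-library tools only; it assumes nothing about SynaFlow's internals, and `join_pairs` is a hypothetical helper.

```python
import itertools
from collections.abc import Iterator


def numbers(count: int) -> Iterator[int]:
    yield from range(count)


def join_pairs(count: int) -> list[tuple]:
    # Fork the producer, apply one transform per branch, then pair the results.
    left, right = itertools.tee(numbers(count))
    doubler = (n * 2 for n in left)
    tripler = (n * 3 for n in right)
    return list(itertools.zip_longest(doubler, tripler))


pairs = join_pairs(3)
print(pairs)  # [(0, 0), (2, 3), (4, 6)]

# When one stream is shorter, the missing side is padded with None.
padded = list(itertools.zip_longest([2, 4], [3]))
print(padded)  # [(2, 3), (4, None)]
```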
When Materialization Happens¶
| Consumer expects | Behavior |
|---|---|
| Iterator[T] | Lazy stream, one item in memory |
| list[T] | Full materialization in memory |
| dict[K,V] | Materialized from Iterator[tuple[K,V]] |
| set[T] | Full materialization in memory |
Materialization is per-branch — a lazy consumer and an eager consumer coexist without forcing each other.
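The table can be read as a dispatch on the consumer's annotation. Here is a minimal sketch of that idea using `typing.get_origin`. The `deliver` function is hypothetical and only mirrors the table; it is not SynaFlow's API.

```python
from collections.abc import Iterator
from typing import Any, get_origin


def deliver(stream: Iterator[Any], expected: Any) -> Any:
    """Materialize the stream only if the consumer asks for a container."""
    origin = get_origin(expected) or expected  # list[int] -> list
    if origin in (list, set, dict):
        return origin(stream)  # dict() consumes (key, value) tuples
    return stream              # Iterator[T]: pass through lazily


def source() -> Iterator[tuple[str, int]]:
    yield from [("a", 1), ("b", 2)]


as_list = deliver(source(), list[tuple[str, int]])
as_dict = deliver(source(), dict[str, int])
as_stream = deliver(source(), Iterator[tuple[str, int]])

print(as_list)          # [('a', 1), ('b', 2)]
print(as_dict)          # {'a': 1, 'b': 2}
print(next(as_stream))  # ('a', 1)
```

Each call receives its own stream, so the lazy branch stays lazy even when a sibling branch asks for a full `list` or `dict`.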
Next¶
Start building in the Hello World tutorial.