Lockstep Data Flow

SynaFlow's streaming engine keeps memory usage low by processing pipelines in lockstep: items flow through the DAG one at a time, and no step collects the full dataset unless a consumer explicitly asks for it.

This is the default behavior because every step starts with max_in_flight=1. If you need a bounded window between two stages, see Max In Flight, which includes real sync and async HTTP examples for I/O-bound pipelines.

A Streaming Pipeline

from collections.abc import Generator, Iterator
from typing import NamedTuple
from synaflow import pipeline, step, run

class Params(NamedTuple):
    count: int = 3

def numbers(count: int) -> Generator[int, None, None]:
    yield from range(count)

def doubler(number: int) -> int:
    return number * 2

def printer(doubler: Iterator[int]) -> None:
    for x in doubler:
        print(f"Consumed: {x}")

p = pipeline(
    name="lockstep_demo",
    params=Params,
    steps=[
        step("numbers", fn=numbers),
        step("doubler", fn=doubler),
        step("printer", fn=printer),
    ],
)

run(p, Params(count=5))

The async version of the same pipeline:

from collections.abc import AsyncGenerator, AsyncIterator
from typing import NamedTuple
from synaflow import pipeline, step, async_run

class Params(NamedTuple):
    count: int = 3

async def numbers(count: int) -> AsyncGenerator[int, None]:
    for i in range(count):
        yield i

async def doubler(number: int) -> int:
    return number * 2

async def printer(doubler: AsyncIterator[int]) -> None:
    async for x in doubler:
        print(f"Consumed: {x}")

p = pipeline(
    name="lockstep_demo",
    params=Params,
    steps=[
        step("numbers", fn=numbers),
        step("doubler", fn=doubler),
        step("printer", fn=printer),
    ],
)

async_run(p, Params(count=5))
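To see what "one item at a time" means without any SynaFlow machinery, the same three stages can be sketched with plain Python generators. This is only an illustration of pull-based streaming, not SynaFlow's implementation; the `events` log is a hypothetical helper added to make the ordering visible.

```python
from collections.abc import Iterator

# Hypothetical event log, used only to record the order of operations.
events: list[str] = []

def numbers(count: int) -> Iterator[int]:
    for i in range(count):
        events.append(f"produce {i}")
        yield i

def doubler(items: Iterator[int]) -> Iterator[int]:
    for n in items:
        events.append(f"double {n}")
        yield n * 2

def printer(items: Iterator[int]) -> None:
    for x in items:
        events.append(f"consume {x}")

# Chaining generators means each item is pulled through every stage
# before the producer is asked for the next one.
printer(doubler(numbers(2)))
print(events)
# ['produce 0', 'double 0', 'consume 0', 'produce 1', 'double 1', 'consume 2']
```

Item 0 is produced, doubled, and consumed before item 1 is produced, so at no point does any stage hold more than one item.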

The DAG

SynaFlow reads the type hints and builds this graph:

flowchart TD
    numbers["numbers<br/><i>Stream[int]</i>"]
    doubler["doubler<br/><i>list[int]</i>"]
    printer["printer<br/><i>None</i>"]
    count --> numbers
    numbers --> doubler
    doubler --> printer

Three steps, three execution levels: numbers → doubler → printer.

How Lockstep Execution Works

The pipeline processes one item at a time from start to finish. But here's the key insight: steps don't wait for each other — they run concurrently like an assembly line.

[Animation: numbers → doubler → printer, step 0 of 9]

The animation above shows count=5. Three views of the same execution:

  • Top: the DAG — watch items light up inside each node as they're processed.
  • Middle: the timeline table — one row per moment.
  • Bottom: insight text explaining what's happening.

Key observation: at frame 3, numbers is working on item 2 while doubler processes item 1 and printer prints item 0. Three items in flight, one per step — that's lockstep streaming.
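The assembly-line schedule described here can be written down as a small table generator. This is an idealized sketch that assumes every step takes exactly one frame per item and that frames are numbered from 1; `lockstep_timeline` is a hypothetical helper, not part of SynaFlow, and it does not try to reproduce the exact frames of the animation.

```python
from typing import Optional

def lockstep_timeline(steps: list[str], count: int) -> list[dict[str, Optional[int]]]:
    """Return one row per frame, mapping each step to the item it works on."""
    frames: list[dict[str, Optional[int]]] = []
    # The last item enters at frame `count` and needs len(steps) - 1 more
    # frames to reach the final step.
    for frame in range(1, count + len(steps)):
        row: dict[str, Optional[int]] = {}
        for depth, name in enumerate(steps):
            item = frame - 1 - depth  # each step lags its upstream by one frame
            row[name] = item if 0 <= item < count else None
        frames.append(row)
    return frames

timeline = lockstep_timeline(["numbers", "doubler", "printer"], count=5)
for number, row in enumerate(timeline, start=1):
    print(number, row)
# Frame 3 reads: {'numbers': 2, 'doubler': 1, 'printer': 0}
```

A `None` entry means the step is idle in that frame, which only happens while the pipeline is filling up or draining.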

Fan-Out: Multiple Consumers

When multiple consumers depend on the same producer, SynaFlow automatically forks the stream with itertools.tee and advances them together:

flowchart TD
    gen["gen<br/><i>Stream[int]</i>"]
    lazy["lazy<br/><i>Stream[int]</i>"]
    eager["eager<br/><i>list[int]</i>"]
    count --> gen
    gen --> lazy
    gen --> eager

from collections.abc import Iterator

def lazy_consumer(gen: Iterator[int]) -> Iterator[int]:
    for x in gen:
        yield x * 10

def eager_consumer(gen: list[int]) -> int:
    return sum(gen)

The async equivalents:

from collections.abc import AsyncIterator

async def lazy_consumer(gen: AsyncIterator[int]) -> AsyncIterator[int]:
    async for x in gen:
        yield x * 10

async def eager_consumer(gen: list[int]) -> int:
    return sum(gen)

  • lazy_consumer receives a lazy fork: it streams items without holding the full dataset.
  • eager_consumer asks for list[int], so SynaFlow materializes only that fork.

Both consumers receive every item. The lazy fork never holds the full dataset; only the eager fork does.
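The forking step can be sketched by hand with the standard library. The snippet below uses `itertools.tee` directly and calls the two consumers one after the other; it shows that each fork sees every item, but it is not how SynaFlow schedules the forks. Note that plain `tee` buffers items for whichever fork lags behind, which is why advancing the forks together matters for memory.

```python
import itertools
from collections.abc import Iterator

def gen(count: int) -> Iterator[int]:
    yield from range(count)

def lazy_consumer(items: Iterator[int]) -> Iterator[int]:
    for x in items:
        yield x * 10

def eager_consumer(items: list[int]) -> int:
    return sum(items)

# Fork one producer into two independent iterators.
lazy_fork, eager_fork = itertools.tee(gen(4), 2)

# The lazy fork is consumed as a stream; the eager fork is
# materialized into a list because its consumer asks for one.
lazy_result = list(lazy_consumer(lazy_fork))
eager_result = eager_consumer(list(eager_fork))

print(lazy_result)   # [0, 10, 20, 30]
print(eager_result)  # 6
```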

Execution Levels

SynaFlow topologically sorts the DAG into levels. Steps on the same level can run in parallel (in an async runner):

dag = pipeline_def.dag
print(dag.get_execution_levels())
# [['numbers'], ['doubler'], ['printer']]

For a diamond topology, independent branches share a level:

       start
      /     \
 branch_a  branch_b
      \     /
       merge

Levels:  ['start']  →  ['branch_a', 'branch_b']  →  ['merge']
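The same level structure can be reproduced with the standard library's `graphlib.TopologicalSorter`. This is a sketch of the general technique, not SynaFlow's own sorting code; `execution_levels` is a hypothetical helper, and the dependency mapping is written out by hand rather than read from type hints.

```python
from graphlib import TopologicalSorter

def execution_levels(dependencies: dict[str, set[str]]) -> list[list[str]]:
    """Group nodes into levels; each node maps to the set of nodes it depends on."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    levels: list[list[str]] = []
    while sorter.is_active():
        # Everything that is ready right now has all dependencies satisfied,
        # so these nodes form one level and could run in parallel.
        ready = sorted(sorter.get_ready())
        levels.append(ready)
        sorter.done(*ready)
    return levels

diamond = {
    "start": set(),
    "branch_a": {"start"},
    "branch_b": {"start"},
    "merge": {"branch_a", "branch_b"},
}
print(execution_levels(diamond))
# [['start'], ['branch_a', 'branch_b'], ['merge']]
```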

Diamond DAG in Action

Here's a more complex example: one producer feeds two branches, and a final step joins both streams. Watch how SynaFlow advances doubler and tripler in lockstep, pads the shorter stream with None, and feeds the pairs to join.

[Animation: numbers → doubler / tripler → join (pairs), step 0 of 10]

The diamond animation shows a key behavior: when doubler and tripler produce items at the same time, SynaFlow unrolls both streams together — yielding pairs like (2, 3), (4, 6) — before moving to the next item. If one stream were shorter, SynaFlow would pad with None. This is the same lockstep mechanism, applied to fan-in.
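The pairing and padding behavior resembles `itertools.zip_longest` from the standard library. The sketch below builds the diamond by hand with `tee` and `zip_longest`; it is an analogy for the fan-in semantics described above, not SynaFlow's implementation, and the `tripler` body is an assumption based on its name.

```python
from collections.abc import Iterator
from itertools import tee, zip_longest

def numbers(count: int) -> Iterator[int]:
    yield from range(count)

def doubler(items: Iterator[int]) -> Iterator[int]:
    for n in items:
        yield n * 2

def tripler(items: Iterator[int]) -> Iterator[int]:
    # Assumed behavior: multiply each item by three.
    for n in items:
        yield n * 3

# Fan out one producer into two branches, then fan in by pairing
# the branch outputs item by item.
branch_a, branch_b = tee(numbers(5), 2)
pairs = list(zip_longest(doubler(branch_a), tripler(branch_b)))
print(pairs)   # [(0, 0), (2, 3), (4, 6), (6, 9), (8, 12)]

# When one stream is shorter, the missing side is padded with None.
uneven = list(zip_longest([2, 4, 6], [3, 6]))
print(uneven)  # [(2, 3), (4, 6), (6, None)]
```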

When Materialization Happens

Consumer expects    Behavior
Iterator[T]         Lazy stream, one item in memory
list[T]             Full materialization in memory
dict[K,V]           Materialized from Iterator[tuple[K,V]]
set[T]              Full materialization in memory

Materialization is per-branch — a lazy consumer and an eager consumer coexist without forcing each other.
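One way to picture the rule is a small adapter that inspects the consumer's annotation and materializes only when a concrete container is requested. The `adapt` function below is a hypothetical illustration built on `typing.get_origin`; SynaFlow's actual dispatch is not shown in this page and may work differently.

```python
from collections.abc import Iterator
from typing import Any, get_origin

def adapt(stream: Iterator[Any], expected: Any) -> Any:
    """Return the stream as-is, or materialize it if a container is expected."""
    # get_origin(list[int]) is list; for a bare type such as `list` it is None.
    origin = get_origin(expected) or expected
    if origin in (list, set, dict):
        # dict(...) consumes an iterator of (key, value) tuples.
        return origin(stream)
    return stream  # Iterator[T] and anything else stays lazy

as_list = adapt(iter([1, 2, 3]), list[int])
as_dict = adapt(iter([("a", 1), ("b", 2)]), dict[str, int])
as_set = adapt(iter([1, 2, 2]), set[int])

source = iter([1, 2, 3])
still_lazy = adapt(source, Iterator[int])

print(as_list)               # [1, 2, 3]
print(as_dict)               # {'a': 1, 'b': 2}
print(still_lazy is source)  # True
```

Because the decision is made per consumer, one branch can receive a list while another keeps iterating lazily over its own fork.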

Next

Start building in the Hello World tutorial.