How the DAG is Wired¶
SynaFlow reads your function signatures and builds a dependency graph automatically. There is no manual wiring. This page explains exactly how the mapping works.
A Minimal Pipeline¶
from collections.abc import Generator, Iterator
from typing import NamedTuple

from synaflow import pipeline, step, run


class Params(NamedTuple):
    count: int = 3


def numbers(count: int) -> Generator[int, None, None]:
    yield from range(count)


def doubler(numbers: Iterator[int]) -> Generator[int, None, None]:
    for x in numbers:
        yield x * 2


def printer(doubler: int) -> None:
    print(doubler)


p = pipeline(
    name="basic_example",
    params=Params,
    steps=[
        step("numbers", fn=numbers),
        step("doubler", fn=doubler),
        step("printer", fn=printer),
    ],
)
The same pipeline written with async steps:
from collections.abc import AsyncGenerator, AsyncIterator
from typing import NamedTuple

from synaflow import pipeline, step, async_run


class Params(NamedTuple):
    count: int = 3


async def numbers(count: int) -> AsyncGenerator[int, None]:
    for i in range(count):
        yield i


async def doubler(numbers: AsyncIterator[int]) -> AsyncGenerator[int, None]:
    async for x in numbers:
        yield x * 2


async def printer(doubler: int) -> None:
    print(doubler)


p = pipeline(
    name="basic_example",
    params=Params,
    steps=[
        step("numbers", fn=numbers),
        step("doubler", fn=doubler),
        step("printer", fn=printer),
    ],
)
This pipeline generates the following DAG:
flowchart TD
numbers["numbers<br/><i>Stream[int]</i>"]
doubler["doubler<br/><i>Stream[int]</i>"]
printer["printer<br/><i>None</i>"]
count --> numbers
numbers --> doubler
doubler --> printer
Let's break down how SynaFlow built this graph.
Rule 1: The Step Name = The Dataset¶
Every step() declaration creates a node in the DAG and a dataset that
holds the step's output. The dataset name is the step's name.
step("numbers", fn=numbers) # creates dataset "numbers"
step("doubler", fn=doubler) # creates dataset "doubler"
step("printer", fn=printer) # creates dataset "printer"
Any downstream step can reference these datasets.
Rule 2: The Parameter Name = The Dependency¶
When SynaFlow inspects a function's signature, each parameter name is matched against all existing datasets (prior steps and pipeline params). If a dataset with the same name exists, a dependency edge is created.
def doubler(numbers: Iterator[int]) -> ...:
    #       ^^^^^^^
    #       Matches dataset "numbers" → dependency created
If no dataset matches, the pipeline fails at build time:
def doubler(wrong_name: Iterator[int]) -> ...:
    #       ^^^^^^^^^^
    #       ❌ ValueError: no prior step or param produces 'wrong_name'
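The name-matching rule can be illustrated with the standard library alone. The sketch below is not SynaFlow's implementation; `find_dependencies` and `broken` are hypothetical names, and the error message simply mirrors the one shown above.

```python
import inspect
from collections.abc import Iterator


def find_dependencies(fn, known_datasets: set[str]) -> list[str]:
    """Hypothetical helper: map each parameter name to a known dataset."""
    deps = []
    for name in inspect.signature(fn).parameters:
        if name not in known_datasets:
            # Assumed to mirror the build-time failure described above.
            raise ValueError(f"no prior step or param produces {name!r}")
        deps.append(name)
    return deps


def doubler(numbers: Iterator[int]) -> Iterator[int]:
    for x in numbers:
        yield x * 2


def broken(wrong_name: Iterator[int]) -> Iterator[int]:
    yield from wrong_name


# Datasets that exist when "doubler" is registered: the param and the prior step.
known = {"count", "numbers"}

print(find_dependencies(doubler, known))  # ['numbers']

try:
    find_dependencies(broken, known)
except ValueError as exc:
    print(exc)  # no prior step or param produces 'wrong_name'
```

Only the parameter names are consulted here; the type hints play no part in deciding whether an edge exists.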
Smart Binding¶
Parameter names don't need to match exactly; they only need to resolve to the
same Base Dataset Name (the absolute plural form). For example, a step named
items can be referenced as item (singular) or items_list (suffixed).
def transform(item: int) -> ...:
    #         ^^^^
    #         get_base_dataset_name("item") == "items" → matches step "items"
This is covered in detail in Semantic Naming.
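To make the idea of a shared base name concrete, here is a deliberately naive normalizer. It is an illustration only: `naive_base_name` and its suffix list are hypothetical, and the real `get_base_dataset_name` may handle many more cases (irregular plurals, other suffixes).

```python
def naive_base_name(name: str, suffixes: tuple[str, ...] = ("_list",)) -> str:
    """Hypothetical normalizer: strip one known suffix, then pluralize naively."""
    for suffix in suffixes:
        if name.endswith(suffix):
            name = name[: -len(suffix)]
            break
    # Naive pluralization: append "s" unless the name already ends with one.
    return name if name.endswith("s") else name + "s"


# All three spellings resolve to the same base name in this sketch.
for candidate in ("items", "item", "items_list"):
    print(candidate, "->", naive_base_name(candidate))
```

Under this sketch, two names bind to each other whenever their normalized forms are equal.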
Rule 3: The Type Hint = The Contract¶
The parameter's type annotation tells SynaFlow how the data should be delivered. This determines the execution mode:
| Parameter type | Producer type | Mode | Behavior |
|---|---|---|---|
| T (scalar) | Iterator[T] | EACH | Called once per item |
| Iterator[T] | Iterator[T] | ALL | Receives the lazy stream |
| list[T] | Iterator[T] | ALL | Stream materialized to a list |
In our example:
def numbers(count: int) -> Generator[int, None, None]:
    #              ^^^ ← from Params (input)
    #                      ^^^^^^^^^^^^^^^^^^^^^^^^^^ ← produces Iterator[int]

def doubler(numbers: Iterator[int]) -> Generator[int, None, None]:
    #       ^^^^^^^^^^^^^^^^^^^^^^ ← parameter type = Iterator[int]
    #                              ← producer type = Iterator[int]
    #                              ← both are streams → ALL mode, lazy pass-through

def printer(doubler: int) -> None:
    #       ^^^^^^^ ← parameter type = int (scalar)
    #               ← producer type = Iterator[int] (from doubler)
    #               ← scalar consumer + stream producer → EACH mode
printer runs in EACH mode because it asks for a scalar int while its producer,
doubler, yields a stream of int. SynaFlow unrolls the stream and calls printer
once for every item that doubler yields.
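The three rows of the table can be expressed as a small classifier over type hints. This is a sketch under assumptions, not SynaFlow's code: `is_stream`, `wants_whole_stream`, and `delivery_mode` are hypothetical names, only synchronous iterators are handled, and combinations the table does not list are rejected rather than guessed.

```python
from collections.abc import Generator, Iterator
from typing import get_origin


def is_stream(annotation) -> bool:
    """True for Iterator[T], Generator[...], and other Iterator subclasses."""
    origin = get_origin(annotation) or annotation
    return isinstance(origin, type) and issubclass(origin, Iterator)


def wants_whole_stream(annotation) -> bool:
    """True when the consumer asks for the stream itself or a list of it."""
    origin = get_origin(annotation) or annotation
    return isinstance(origin, type) and issubclass(origin, (Iterator, list))


def delivery_mode(param_type, producer_type) -> str:
    if not is_stream(producer_type):
        # Not covered by the table above, so this sketch refuses to guess.
        raise NotImplementedError("only stream producers are modeled here")
    return "ALL" if wants_whole_stream(param_type) else "EACH"


print(delivery_mode(int, Iterator[int]))                            # EACH
print(delivery_mode(Iterator[int], Generator[int, None, None]))     # ALL
print(delivery_mode(list[int], Iterator[int]))                      # ALL
```

The second call corresponds to doubler consuming numbers, and the first to printer consuming doubler.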
Rule 4: Pipeline Params = Global Inputs¶
Every field in the NamedTuple passed as params becomes a global input node
accessible to any step by name:
class Params(NamedTuple):
    count: int = 3  # ← creates input node "count" with type int

def numbers(count: int) -> ...:
    #       ^^^^^ ← matches "count" from Params
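A NamedTuple exposes its field names, types, and defaults through standard introspection, which is one plausible way to derive input nodes. The sketch below uses only the standard library; how SynaFlow actually reads `Params` is an assumption, and `input_nodes` is a hypothetical name.

```python
from typing import NamedTuple, get_type_hints


class Params(NamedTuple):
    count: int = 3


# Field name -> declared type: one entry per would-be input node.
input_nodes = get_type_hints(Params)
print(input_nodes)             # {'count': <class 'int'>}

# Defaults are available separately and apply when no value is supplied.
print(Params._field_defaults)  # {'count': 3}
print(Params().count)          # 3
```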
Rule 5: The Execution Order = Topological Sort¶
SynaFlow computes the execution levels from the dependency graph. Steps at the same level have no dependencies on each other and could run in parallel.
Level 0: [numbers] ← depends only on params
Level 1: [doubler] ← depends on numbers
Level 2: [printer] ← depends on doubler
For a diamond topology:
flowchart TD
start["start<br/><i>int</i>"]
branch_a["branch_a<br/><i>int</i>"]
branch_b["branch_b<br/><i>int</i>"]
merge["merge<br/><i>int</i>"]
base_val --> start
start --> branch_a
start --> branch_b
branch_a --> merge
branch_b --> merge
Level 0: [start] ← depends on param
Level 1: [branch_a, branch_b] ← both depend only on start → can be parallel
Level 2: [merge] ← depends on branch_a AND branch_b
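The level assignment for the diamond can be reproduced with a few lines of plain Python. This is a generic level-by-level topological sort, offered as a sketch; `execution_levels` is a hypothetical name, and pipeline params are left out of the dependency map because they are available before any step runs.

```python
def execution_levels(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group steps into levels; each level depends only on earlier levels."""
    remaining = {step: set(d) for step, d in deps.items()}
    levels = []
    while remaining:
        # Steps with no unmet dependencies are ready to run together.
        ready = sorted(s for s, d in remaining.items() if not d)
        if not ready:
            raise ValueError("cycle detected in step dependencies")
        levels.append(ready)
        for step in ready:
            del remaining[step]
        for unmet in remaining.values():
            unmet.difference_update(ready)
    return levels


diamond = {
    "start": set(),  # depends only on the base_val param
    "branch_a": {"start"},
    "branch_b": {"start"},
    "merge": {"branch_a", "branch_b"},
}

print(execution_levels(diamond))
# [['start'], ['branch_a', 'branch_b'], ['merge']]
```

Steps that land in the same inner list have no edges between them, which is what makes them candidates for parallel execution.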
Summary¶
step("name", fn=func)       def func(param: Type) -> OutputType
       │                               │     │
       ▼                               ▼     ▼
Dataset "name"             Dependency on     Determines EACH/ALL
holds output               dataset "param"   and output type
With these three pieces — step name, parameter name, and type hint — SynaFlow builds the entire DAG without a single line of manual wiring.
Next¶
Now that you understand the wiring, see how Lockstep Data Flow uses this structure to achieve extreme memory efficiency, and how Max In Flight adds a bounded ahead window for I/O-bound streaming patterns.