Sync & Async Parity¶
SynaFlow provides identical semantics for synchronous and asynchronous pipelines.
from collections.abc import Generator, Iterator
from synaflow import pipeline, step, run
def producer() -> Generator[int, None, None]:
yield from range(3)
def consumer(producer: Iterator[int]) -> None:
for x in producer:
print(x)
p = pipeline(
name="sync_example",
params=type("P", (NamedTuple,), {}),
steps=[
step("producer", fn=producer),
step("consumer", fn=consumer),
],
)
run(p, p.params_type())
from collections.abc import AsyncGenerator, AsyncIterator
from synaflow import pipeline, step, async_run
async def producer() -> AsyncGenerator[int, None]:
for i in range(3):
yield i
async def consumer(producer: AsyncIterator[int]) -> None:
async for x in producer:
print(x)
p = pipeline(
name="async_example",
params=type("P", (NamedTuple,), {}),
steps=[
step("producer", fn=producer),
step("consumer", fn=consumer),
],
)
async_run(p, p.params_type())
Key Rules¶
- A pipeline is either fully sync or fully async — mixing
Iteratorwithasync defraises an error. - Both engines share the same DAG representation and execution semantics.
- Observers, materializers, and error handling behave identically.
- Async observers are detected via
inspect.isawaitableand awaited automatically.
Converting a Sync Pipeline to Async¶
- Replace
Generator[T, None, None]withAsyncGenerator[T, None] - Replace
Iterator[T]withAsyncIterator[T] - Add
asyncto all step functions - Use
async_run()instead ofrun()
The pipeline definition and step configuration stay the same.