Skip to content

Examples

Every SynaFlow pipeline can be visualized with scripts/visualize_dag.py.

complex_parallel

from collections.abc import Generator, Iterator
from typing import NamedTuple

from synaflow import pipeline, step


class ComplexParallelParams(NamedTuple):
    base: int = 1


def step1(base: int) -> Generator[int, None, None]:
    for i in range(5):
        yield base + i


def step2(step1: Iterator[int]) -> Generator[int, None, None]:
    for x in step1:
        yield x * 10


def step3(step2: Iterator[int]) -> Generator[int, None, None]:
    for x in step2:
        yield x + 1


def step4(step1: Iterator[int]) -> Generator[int, None, None]:
    for x in step1:
        yield x * 100


def step5(step3: Iterator[int], step4: Iterator[int]) -> None:
    pass


# Topology:
# step1 -> step2 -> step3 \
#       -> step4 --------> step5
from collections.abc import AsyncGenerator, AsyncIterator
from typing import NamedTuple

from synaflow import pipeline, step


class ComplexParallelParams(NamedTuple):
    base: int = 1


async def step1(base: int) -> AsyncGenerator[int, None, None]:
    for i in range(5):
        yield base + i


async def step2(step1: AsyncIterator[int]) -> AsyncGenerator[int, None, None]:
    async for x in step1:
        yield x * 10


async def step3(step2: AsyncIterator[int]) -> AsyncGenerator[int, None, None]:
    async for x in step2:
        yield x + 1


async def step4(step1: AsyncIterator[int]) -> AsyncGenerator[int, None, None]:
    async for x in step1:
        yield x * 100


async def step5(step3: AsyncIterator[int], step4: AsyncIterator[int]) -> None:
    pass


# Topology:
# step1 -> step2 -> step3 \
#       -> step4 --------> step5

:fontawesome-brands-github: Sync source | :fontawesome-brands-github: Async source

flowchart TD
    step1["step1<br/><i>Stream[int, None, None]</i>"]
    step2["step2<br/><i>Stream[int, None, None]</i>"]
    step3["step3<br/><i>Stream[int, None, None]</i>"]
    step4["step4<br/><i>Stream[int, None, None]</i>"]
    step5["step5<br/><i>None</i>"]
    base --> step1
    step1 --> step2
    step2 --> step3
    step1 --> step4
    step3 --> step5
    step4 --> step5

complex_parallel_mixed

from collections.abc import Generator, Iterator
from typing import NamedTuple

from synaflow import pipeline, step


class ComplexParallelMixedParams(NamedTuple):
    base: int = 1


def step1(base: int) -> Generator[int, None, None]:
    for i in range(5):
        yield base + i


def step2(step1: Iterator[int]) -> Generator[int, None, None]:
    for x in step1:
        yield x * 10


def step3(step2: Iterator[int]) -> Generator[int, None, None]:
    for x in step2:
        yield x + 1


def step4(step1: Iterator[int]) -> Generator[int, None, None]:
    for x in step1:
        yield x * 100


def step5(step2: Iterator[int], step4: Iterator[int]) -> None:
    pass


# Topology:
# step1 -> step2 -> step3
#       \       \
#        \       -> step5
#         -> step4 /
from collections.abc import AsyncGenerator, AsyncIterator
from typing import NamedTuple

from synaflow import pipeline, step


class ComplexParallelMixedParams(NamedTuple):
    base: int = 1


async def step1(base: int) -> AsyncGenerator[int, None, None]:
    for i in range(5):
        yield base + i


async def step2(step1: AsyncIterator[int]) -> AsyncGenerator[int, None, None]:
    async for x in step1:
        yield x * 10


async def step3(step2: AsyncIterator[int]) -> AsyncGenerator[int, None, None]:
    async for x in step2:
        yield x + 1


async def step4(step1: AsyncIterator[int]) -> AsyncGenerator[int, None, None]:
    async for x in step1:
        yield x * 100


async def step5(step2: AsyncIterator[int], step4: AsyncIterator[int]) -> None:
    pass


# Topology:
# step1 -> step2 -> step3
#       \       \
#        \       -> step5
#         -> step4 /

:fontawesome-brands-github: Sync source | :fontawesome-brands-github: Async source

flowchart TD
    step1["step1<br/><i>Stream[int, None, None]</i>"]
    step2["step2<br/><i>Stream[int, None, None]</i>"]
    step3["step3<br/><i>Stream[int, None, None]</i>"]
    step4["step4<br/><i>Stream[int, None, None]</i>"]
    step5["step5<br/><i>None</i>"]
    base --> step1
    step1 --> step2
    step2 --> step3
    step1 --> step4
    step2 --> step5
    step4 --> step5

deep_sub_pipelines

from typing import Iterator, NamedTuple

from synaflow import include, pipeline, step
from typing import AsyncIterator, NamedTuple

from synaflow import include, pipeline, step

:fontawesome-brands-github: Sync source | :fontawesome-brands-github: Async source

flowchart TD
    l2_each__adapter["l2_each__adapter<br/><i>Stream[Level2Params]</i>"]
    l2_each__l3_res__adapter["l2_each__l3_res__adapter<br/><i>ListType(<class 'tests.execution.sync_engine.corpus.deep_sub_pipelines.Level3Params'>)</i>"]
    l2_each__l3_res["l2_each__l3_res<br/><i>ListType(<class 'int'>)</i>"]
    l2_each["l2_each<br/><i>ListType(<class 'int'>)</i>"]
    l2_single__adapter["l2_single__adapter<br/><i>Level2Params</i>"]
    l2_single__l3_res__adapter["l2_single__l3_res__adapter<br/><i>Level3Params</i>"]
    l2_single__l3_res["l2_single__l3_res<br/><i>int</i>"]
    l2_single["l2_single<br/><i>int</i>"]
    consolidate["consolidate<br/><i>dict</i>"]
    values --> l2_each__adapter
    l2_each__adapter --> l2_each__l3_res__adapter
    l2_each__l3_res__adapter --> l2_each__l3_res
    l2_each__l3_res --> l2_each
    values --> l2_single__adapter
    l2_single__adapter --> l2_single__l3_res__adapter
    l2_single__l3_res__adapter --> l2_single__l3_res
    l2_single__l3_res --> l2_single
    l2_each --> consolidate
    l2_single --> consolidate

diamond

from typing import NamedTuple

from synaflow import pipeline, step


class DiamondParams(NamedTuple):
    base_val: int = 10


def start(base_val: int) -> int:
    return base_val


def branch_a(start: int) -> int:
    return start + 1


def branch_b(start: int) -> int:
    return start + 2


def merge(branch_a: int, branch_b: int) -> int:
    return branch_a + branch_b
from typing import NamedTuple

from synaflow import pipeline, step


class DiamondParams(NamedTuple):
    base_val: int = 10


async def start(base_val: int) -> int:
    return base_val


async def branch_a(start: int) -> int:
    return start + 1


async def branch_b(start: int) -> int:
    return start + 2


async def merge(branch_a: int, branch_b: int) -> int:
    return branch_a + branch_b

:fontawesome-brands-github: Sync source | :fontawesome-brands-github: Async source

flowchart TD
    start["start<br/><i>int</i>"]
    branch_a["branch_a<br/><i>int</i>"]
    branch_b["branch_b<br/><i>int</i>"]
    merge["merge<br/><i>int</i>"]
    base_val --> start
    start --> branch_a
    start --> branch_b
    branch_a --> merge
    branch_b --> merge

error_handling

from collections.abc import Generator, Iterator
from typing import NamedTuple
from synaflow import pipeline, step


class ErrorHandlingParams(NamedTuple):
    pass


errors_list = []


def custom_error_handler(exc: BaseException) -> None:
    errors_list.append(str(exc))


def custom_err_mat(ctx):
    return custom_error_handler


def gen() -> Generator[int, None, None]:
    yield 1
    raise ValueError("gen failed")


def consumer(gen: Iterator[int]) -> None:
    for x in gen:
        pass
from collections.abc import AsyncGenerator, AsyncIterator
from typing import NamedTuple
from synaflow import pipeline, step


class ErrorHandlingParams(NamedTuple):
    pass


errors_list = []


def custom_error_handler(exc: BaseException) -> None:
    errors_list.append(str(exc))


def custom_err_mat(ctx):
    return custom_error_handler


async def gen() -> AsyncGenerator[int, None]:
    yield 1
    raise ValueError("gen failed")


async def consumer(gen: AsyncIterator[int]) -> None:
    async for x in gen:
        pass

:fontawesome-brands-github: Sync source | :fontawesome-brands-github: Async source

flowchart TD
    gen["gen<br/><i>Stream[int, None, None]</i>"]
    consumer["consumer<br/><i>None</i>"]
    gen --> consumer

explicit_modes

from collections.abc import Generator
from typing import NamedTuple

from synaflow import StepMode, pipeline, step
from collections.abc import AsyncGenerator
from typing import NamedTuple

from synaflow import StepMode, pipeline, step

:fontawesome-brands-github: Sync source | :fontawesome-brands-github: Async source

flowchart TD
    emit["emit<br/><i>Stream[int, None, None]</i>"]
    double["double<br/><i>ListType(<class 'int'>)</i>"]
    summarize["summarize<br/><i>int</i>"]
    items --> emit
    emit --> double
    double --> summarize

fibonacci

from collections.abc import Generator, Iterator
from typing import NamedTuple

from synaflow import pipeline, step


class FibonacciParams(NamedTuple):
    count: int = 10


def fibonacci_generator(count: int) -> Generator[int, None, None]:
    a, b = 0, 1
    for _ in range(count):
        yield a
        a, b = b, a + b


def square_numbers(fibonacci_generator: Iterator[int]) -> Generator[int, None, None]:
    for x in fibonacci_generator:
        yield x * x


def consumer(square_numbers: Iterator[int]) -> None:
    pass
from collections.abc import AsyncGenerator, AsyncIterator
from typing import NamedTuple

from synaflow import pipeline, step


class FibonacciParams(NamedTuple):
    count: int = 10


async def fibonacci_generator(count: int) -> AsyncGenerator[int, None, None]:
    a, b = 0, 1
    for _ in range(count):
        yield a
        a, b = b, a + b


async def square_numbers(
    fibonacci_generator: AsyncIterator[int],
) -> AsyncGenerator[int, None, None]:
    async for x in fibonacci_generator:
        yield x * x


async def consumer(square_numbers: AsyncIterator[int]) -> None:
    pass

:fontawesome-brands-github: Sync source | :fontawesome-brands-github: Async source

flowchart TD
    fibonacci_generator["fibonacci_generator<br/><i>Stream[int, None, None]</i>"]
    square_numbers["square_numbers<br/><i>Stream[int, None, None]</i>"]
    consumer["consumer<br/><i>None</i>"]
    count --> fibonacci_generator
    fibonacci_generator --> square_numbers
    square_numbers --> consumer

linear

from collections.abc import Generator, Iterator
from typing import NamedTuple

from synaflow import Observer, pipeline, step


class LinearParams(NamedTuple):
    count: int = 3


def numbers(count: int) -> Generator[int, None, None]:
    yield from range(count)


def transformer(number: int) -> int:
    return number * 2


def consumer(transformer: Iterator[int]) -> None:
    for x in transformer:
        pass
from collections.abc import AsyncGenerator, AsyncIterator
from typing import NamedTuple

from synaflow import pipeline, step


class LinearParams(NamedTuple):
    count: int = 3


async def numbers(count: int) -> AsyncGenerator[int, None, None]:
    for _i in range(count):
        yield _i


async def transformer(number: int) -> int:
    return number * 2


async def consumer(transformer: AsyncIterator[int]) -> None:
    async for x in transformer:
        pass

:fontawesome-brands-github: Sync source | :fontawesome-brands-github: Async source

flowchart TD
    numbers["numbers<br/><i>Stream[int, None, None]</i>"]
    transformer["transformer<br/><i>ListType(<class 'int'>)</i>"]
    consumer["consumer<br/><i>None</i>"]
    count --> numbers
    numbers --> transformer
    transformer --> consumer

mixed_fanout

from collections.abc import Generator, Iterator
from typing import NamedTuple

from synaflow import pipeline, step
from collections.abc import AsyncGenerator, AsyncIterator
from typing import NamedTuple

from synaflow import pipeline, step

:fontawesome-brands-github: Sync source | :fontawesome-brands-github: Async source

flowchart TD
    gen["gen<br/><i>Stream[int, None, None]</i>"]
    lazy["lazy<br/><i>tuple[bool, list[int]]</i>"]
    eager["eager<br/><i>tuple[bool, list[int]]</i>"]
    count --> gen
    gen --> lazy
    gen --> eager

sub_pipelines

from typing import Iterator, NamedTuple

from synaflow import include, pipeline, step


class BParams(NamedTuple):
    text: str


def func_b1(text: str) -> str:
    return text.upper()


def func_b2(func_b1: str) -> int:
    return len(func_b1)


pipe_b = pipeline(
    name="TextProcessor",
    params=BParams,
    exports="func_b2",
    steps=[step("func_b1", fn=func_b1), step("func_b2", fn=func_b2)],
)


class AParams(NamedTuple):
    raw_texts: list[str]


def prepare_b_each(raw_texts: list[str]) -> Iterator[BParams]:
    for t in raw_texts:
        yield BParams(text=t)


def consolidate(my_text_processor: list[int]) -> int:
    return sum(my_text_processor)


pipe = pipeline(
    name="MainPipeline",
    params=AParams,
    steps=[
        include("my_text_processor", pipeline=pipe_b, fn=prepare_b_each),
        step("consolidate", fn=consolidate),
    ],
)
from typing import AsyncIterator, NamedTuple

from synaflow import include, pipeline, step


class BParams(NamedTuple):
    text: str


async def func_b1(text: str) -> str:
    return text.upper()


async def func_b2(func_b1: str) -> int:
    return len(func_b1)


pipe_b = pipeline(
    name="TextProcessor",
    params=BParams,
    exports="func_b2",
    steps=[step("func_b1", fn=func_b1), step("func_b2", fn=func_b2)],
)


class AParams(NamedTuple):
    raw_texts: list[str]


async def prepare_b_each(raw_texts: list[str]) -> AsyncIterator[BParams]:
    for t in raw_texts:
        yield BParams(text=t)


async def consolidate(my_text_processor: list[int]) -> int:
    return sum(my_text_processor)


pipe = pipeline(
    name="MainPipeline",
    params=AParams,
    steps=[
        include("my_text_processor", pipeline=pipe_b, fn=prepare_b_each),
        step("consolidate", fn=consolidate),
    ],
)

:fontawesome-brands-github: Sync source | :fontawesome-brands-github: Async source

flowchart TD
    my_text_processor__adapter["my_text_processor__adapter<br/><i>Stream[BParams]</i>"]
    my_text_processor__func_b1["my_text_processor__func_b1<br/><i>ListType(<class 'str'>)</i>"]
    my_text_processor["my_text_processor<br/><i>ListType(<class 'int'>)</i>"]
    consolidate["consolidate<br/><i>int</i>"]
    raw_texts --> my_text_processor__adapter
    my_text_processor__adapter --> my_text_processor__func_b1
    my_text_processor__func_b1 --> my_text_processor
    my_text_processor --> consolidate

Diagrams auto-generated from the test corpus.