# SynaFlow & the Java Streams API
SynaFlow draws direct inspiration from the Java Streams API. If you've written
`stream.map().filter().collect()` in Java, SynaFlow will feel familiar, with one
critical difference: persistence backends.
## Conceptual mapping
| Java Streams | SynaFlow |
|---|---|
| `stream.map(f)` | EACH mode: consumer `(T) → U` over producer `Iterator[T]` |
| `stream.filter(p)` | EACH mode with conditional yield |
| `stream.collect(toList())` | Consumer `list[T]` triggers `Iterator[T] → list[T]` |
| `stream.collect(toSet())` | Consumer `set[T]` triggers `Iterator[T] → set[T]` |
| `stream.collect(toMap(k,v))` | Consumer `dict[K,V]` triggers `Iterator[tuple[K,V]] → dict[K,V]` |
| `stream.forEach(f)` | EACH mode terminal step (no downstream consumers) |
| `stream.iterator()` | Lazy consumer `Iterator[T]` |
| `Collector` interface | `MaterializerFactory` + returned `Materializer` callable |
| `stream.flatMap(...)` | EACH mode producing an iterable, implicitly flattened |
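As a rough guide to what each row computes, here is the same data flow written with plain Python generators and built-ins, with no SynaFlow involved. It models only the values produced, not SynaFlow's scheduling or materialization, and the function names (`map_double`, `filter_big`, `flat_map_pairs`) are illustrative.

```python
from collections.abc import Iterator
from itertools import chain


def numbers(count: int) -> Iterator[int]:
    # Producer: the source stream, like IntStream.range(0, count)
    yield from range(count)


def map_double(xs: Iterator[int]) -> Iterator[int]:
    # map(f): exactly one output per input
    for x in xs:
        yield x * 2


def filter_big(xs: Iterator[int]) -> Iterator[int]:
    # filter(p): conditional yield, zero or one output per input
    for x in xs:
        if x > 5:
            yield x


def flat_map_pairs(xs: Iterator[int]) -> Iterator[int]:
    # flatMap(...): each input produces an iterable; chain flattens them
    return chain.from_iterable((x, x * 10) for x in xs)


as_list = list(filter_big(map_double(numbers(10))))  # collect(toList())
as_set = set(x % 3 for x in numbers(10))              # collect(toSet())
as_dict = dict((x, x * x) for x in numbers(4))        # collect(toMap(k, v))
flat = list(flat_map_pairs(numbers(3)))               # flatMap + toList

print(as_list)  # [6, 8, 10, 12, 14, 16, 18]
print(as_set)   # {0, 1, 2}
print(as_dict)  # {0: 0, 1: 1, 2: 4, 3: 9}
print(flat)     # [0, 0, 1, 10, 2, 20]
```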
## Side-by-side example
```python
# Synchronous version
from collections.abc import Generator
from typing import NamedTuple

from synaflow import pipeline, step, run

class Params(NamedTuple):
    count: int = 10

def producer(count: int) -> Generator[int, None, None]:
    yield from range(count)

def doubler(producer: int) -> int:  # EACH: map
    return producer * 2

def big_enough(doubler: int) -> Generator[int, None, None]:  # EACH: filter
    if doubler > 5:
        yield doubler

def collector(big_enough: list[int]) -> None:  # collect(toList())
    print(big_enough)

p = pipeline(
    name="streams",
    params=Params,
    steps=[
        step("producer", fn=producer),
        step("doubler", fn=doubler),
        step("big_enough", fn=big_enough),
        step("collector", fn=collector),
    ],
)

run(p, Params(count=10))
# Output: [6, 8, 10, 12, 14, 16, 18]
```
```python
# Async version: the same pipeline built from coroutines and async generators
import asyncio
from collections.abc import AsyncGenerator, AsyncIterator
from typing import NamedTuple

from synaflow import pipeline, step, async_run

class Params(NamedTuple):
    count: int = 10

async def producer(count: int) -> AsyncGenerator[int, None]:
    for i in range(count):
        yield i

async def doubler(producer: int) -> int:  # EACH: map
    return producer * 2

async def big_enough(doubler: int) -> AsyncIterator[int]:  # EACH: filter
    if doubler > 5:
        yield doubler

async def collector(big_enough: list[int]) -> None:  # collect(toList())
    print(big_enough)

p = pipeline(
    name="streams",
    params=Params,
    steps=[
        step("producer", fn=producer),
        step("doubler", fn=doubler),
        step("big_enough", fn=big_enough),
        step("collector", fn=collector),
    ],
)

# Assumes async_run returns an awaitable; asyncio.run drives it to completion
asyncio.run(async_run(p, Params(count=10)))
```
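The async pipeline computes the same values as the synchronous one. A minimal model with bare async generators (no SynaFlow involved) shows that data flow and how an async entry point is driven from synchronous code with `asyncio.run`; the names `produce`, `double`, `keep_big`, and `collect` are illustrative only.

```python
import asyncio
from collections.abc import AsyncIterator


async def produce(count: int) -> AsyncIterator[int]:
    for i in range(count):
        yield i


async def double(xs: AsyncIterator[int]) -> AsyncIterator[int]:
    # map: one output per input, consumed with async for
    async for x in xs:
        yield x * 2


async def keep_big(xs: AsyncIterator[int]) -> AsyncIterator[int]:
    # filter: conditional yield
    async for x in xs:
        if x > 5:
            yield x


async def collect(xs: AsyncIterator[int]) -> list[int]:
    # collect(toList()): drain the async stream into a list
    return [x async for x in xs]


result = asyncio.run(collect(keep_big(double(produce(10)))))
print(result)  # [6, 8, 10, 12, 14, 16, 18]
```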
## No nested streams, by design
In Java, `stream.flatMap(...)` flattens the streams returned by its mapper into a
single `Stream<R>`, so you get a flat result rather than the `Stream<Stream<R>>`
that `map` with a stream-returning function would give you. SynaFlow goes one step
further: `Iterator[Iterator[T]]` does not exist in user-facing code.

When an EACH-mode step produces an iterable, SynaFlow implicitly flattens it.
When it produces a scalar, outputs are auto-collected into `ListType`. Two
consecutive `T → Iterator[T]` steps still produce a flat `Iterator[T]`:
```python
from collections.abc import Generator

def step1() -> Generator[int, None, None]:
    yield from range(3)  # produces Iterator[int]

def step2(step1: int) -> Generator[str, None, None]:
    yield str(step1)       # EACH mode: called once per int from step1
    yield str(step1 * 10)  # a second output for the same input (lateral expansion)

# step2's output is ListType(str): a flat list of strings,
# NOT Iterator[Iterator[str]]
```
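One way to picture the flattening rule is a small driver that applies an EACH-mode function to every upstream item, flattening generator results and passing scalar results through. This is a minimal sketch that assumes the distinction is made by whether the step function is a generator function; `run_each` and `label` are hypothetical helpers, not part of SynaFlow.

```python
import inspect
from collections.abc import Callable, Generator, Iterable, Iterator
from typing import Any


def run_each(fn: Callable[..., Any], upstream: Iterable[Any]) -> Iterator[Any]:
    """Hypothetical EACH-mode driver: call fn once per upstream item."""
    # Assumption: generator functions are flattened, other functions are scalar maps
    flatten = inspect.isgeneratorfunction(fn)
    for item in upstream:
        if flatten:
            yield from fn(item)  # lateral expansion: zero or more outputs per input
        else:
            yield fn(item)       # scalar: exactly one output per input


def step1() -> Generator[int, None, None]:
    yield from range(3)


def step2(step1: int) -> Generator[str, None, None]:
    yield str(step1)
    yield str(step1 * 10)


def label(step1: int) -> str:
    return f"n={step1}"


flat = list(run_each(step2, step1()))
scalars = list(run_each(label, step1()))
print(flat)     # ['0', '0', '1', '10', '2', '20']
print(scalars)  # ['n=0', 'n=1', 'n=2']
```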
This is deliberate. Every SynaFlow pipeline operates on flat, typed streams.
Cases that might seem to call for nested iterators are handled differently: if
you need grouping, use a dict materializer; if you need lateral expansion, yield
multiple items from an EACH step, as `step2` does above. The framework handles
flattening transparently.
## The key difference: where data lives
Java Streams are in-memory pipelines, and the standard collectors
(`Collectors.toList()`, `toSet()`, `toMap(...)`) produce in-memory collections
(`ArrayList`, `HashSet`, `HashMap` in current JDKs).

SynaFlow's materializer can target any storage backend. The consumer only
declares the protocol (`list`, `set`, `dict`); the materializer factory
decides the concrete storage:
| Java | SynaFlow |
|---|---|
| `Collectors.toList()` → `ArrayList` in RAM | `list[T]` → `DiskBackedList` on SSD, or S3-backed, or in-memory |
| `Collectors.toSet()` → `HashSet` in RAM | `set[T]` → Redis-backed set, or in-memory |
| `Collectors.toMap(...)` → `HashMap` in RAM | `dict[K,V]` → SQLite with LRU cache, or in-memory |
| Always in-memory | Disk, network, S3, database: any backend with the right protocol |
```python
from synaflow import pipeline, step, run, disk_materializer

# The consumer asks for list[T]; the data is transparently stored on disk.
# Params is the pipeline's NamedTuple of parameters, defined as in the earlier example.
p = pipeline(
    name="big_data",
    params=Params,
    steps=[...],
    memory_materializer_factory=disk_materializer("/mnt/ssd"),
)
```
The consumer code doesn't change: it still says `def fn(data: list[int])`.
Whether that list lives in RAM, on SSD, in S3, or in a database is entirely
up to the materializer factory configured at the pipeline root. This is the
core of the protocol-over-concrete-type design. The consumer only knows
the interface (`MutableSequence` for `list`, `MutableMapping` for `dict`);
the factory decides the backend.
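The protocol-over-concrete-type idea can be shown with the standard library alone: a consumer typed against `MutableSequence` works unchanged with an ordinary `list` and with a toy disk-backed sequence built on `shelve`. `ShelfList` and `consume` are hypothetical names for this sketch; SynaFlow's own `DiskBackedList` may be implemented quite differently.

```python
import os
import shelve
import tempfile
from collections.abc import MutableSequence


class ShelfList(MutableSequence):
    """Toy list whose items live in a shelve file on disk (hypothetical)."""

    def __init__(self, path: str) -> None:
        self._db = shelve.open(path, flag="n")  # "n": always start with an empty store
        self._len = 0

    def __len__(self) -> int:
        return self._len

    def _index(self, i: int) -> int:
        # Normalize negative indices and bounds-check the way list does
        if i < 0:
            i += self._len
        if not 0 <= i < self._len:
            raise IndexError("ShelfList index out of range")
        return i

    def __getitem__(self, i: int):
        return self._db[str(self._index(i))]

    def __setitem__(self, i: int, value) -> None:
        self._db[str(self._index(i))] = value

    def __delitem__(self, i: int) -> None:
        i = self._index(i)
        # Shift later items down by one, then drop the last key
        for j in range(i, self._len - 1):
            self._db[str(j)] = self._db[str(j + 1)]
        del self._db[str(self._len - 1)]
        self._len -= 1

    def insert(self, i: int, value) -> None:
        # Clamp like list.insert, then shift later items up by one
        i = max(0, min(i if i >= 0 else i + self._len, self._len))
        for j in range(self._len, i, -1):
            self._db[str(j)] = self._db[str(j - 1)]
        self._db[str(i)] = value
        self._len += 1

    def close(self) -> None:
        self._db.close()


def consume(data: MutableSequence[int]) -> int:
    # The consumer relies only on the protocol, never on the concrete type
    data.append(100)
    return sum(data)


with tempfile.TemporaryDirectory() as tmp:
    on_disk = ShelfList(os.path.join(tmp, "items"))
    on_disk.extend([1, 2, 3])  # extend/append come free from the MutableSequence mixins
    disk_total = consume(on_disk)
    disk_items = list(on_disk)
    on_disk.close()

memory_total = consume([1, 2, 3])
print(disk_total, memory_total)  # 106 106
```

Only five methods are written by hand; `append`, `extend`, iteration, and `sum` support all come from the `MutableSequence` mixins, which is what lets a storage backend slot in behind an unchanged consumer.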
## Materializer = Collector (but not only at the end)
| Java Streams Collector | SynaFlow equivalent |
|---|---|
| `Collectors.toList()` | `memory_materializer` (default: returns `list`) |
| `Collectors.toSet()` | Materializer that returns `set` |
| `Collectors.toMap(...)` | Materializer that takes `Iterator[tuple[K,V]]` → `dict[K,V]` |
| `Collectors.groupingBy(...)` | EACH mode → dict materializer (MapReduce shuffle) |
| Custom `Collector<T,A,R>` | Custom `MaterializerFactory` returning `Callable[[Iterator], Iterable]` |
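As a sketch of the `groupingBy` row: an EACH-mode step emits `(key, value)` pairs and a dict-producing materializer gathers the values per key, much like the shuffle phase of MapReduce. `tag_by_parity`, `group_materializer`, and `flatten_each` are hypothetical plain-Python stand-ins; the only assumption borrowed from the table is the materializer's shape, a callable from an iterator to a collection.

```python
from collections import defaultdict
from collections.abc import Callable, Iterable, Iterator


def tag_by_parity(n: int) -> Iterator[tuple[str, int]]:
    # EACH-mode step: emit one (key, value) pair per input
    yield ("even" if n % 2 == 0 else "odd", n)


def group_materializer(pairs: Iterator[tuple[str, int]]) -> dict[str, list[int]]:
    # Hypothetical materializer: the "shuffle" that gathers values by key
    groups: defaultdict[str, list[int]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def flatten_each(
    fn: Callable[[int], Iterator[tuple[str, int]]], items: Iterable[int]
) -> Iterator[tuple[str, int]]:
    # Flatten the per-item generators into one stream of pairs
    for item in items:
        yield from fn(item)


grouped = group_materializer(flatten_each(tag_by_parity, range(6)))
print(grouped)  # {'even': [0, 2, 4], 'odd': [1, 3, 5]}
```

The Java counterpart would be `Collectors.groupingBy(n -> n % 2 == 0 ? "even" : "odd")`.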
In Java Streams, a `Collector` is consumed only by the terminal `collect` operation, so you collect at the end. SynaFlow materializers can appear anywhere in the pipeline, not just at the end: a step in the middle can materialize to disk, and downstream steps read from that persisted data:
```python
steps = [
    step("cache", fn=expensive_computation,
         materializer=disk_materializer("/mnt/ssd"),
         force_materialize=True),
    step("downstream", fn=downstream),  # reads from the cached data
]
```
This enables patterns like checkpointing, intermediate persistence, and incremental processing that have no equivalent in the Java Streams API.
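To make checkpointing concrete, here is a minimal sketch of a custom materializer factory in the shape the table describes (`Callable[[Iterator], Iterable]`). It drains the upstream iterator once, pickles each item to a file, and returns a re-readable view, so later readers hit the file instead of recomputing. `pickle_spill_factory` and `SpilledFile` are hypothetical and use only the standard library; this is not SynaFlow's `disk_materializer`.

```python
import os
import pickle
import tempfile
from collections.abc import Callable, Iterable, Iterator
from typing import Any


class SpilledFile(Iterable):
    """Re-readable view over items pickled one after another in a file."""

    def __init__(self, path: str) -> None:
        self.path = path

    def __iter__(self) -> Iterator[Any]:
        with open(self.path, "rb") as f:
            while True:
                try:
                    yield pickle.load(f)
                except EOFError:  # end of the checkpoint file
                    return


def pickle_spill_factory(directory: str) -> Callable[[Iterator[Any]], Iterable[Any]]:
    """Hypothetical factory: each call persists one stream under `directory`."""
    counter = 0

    def materialize(items: Iterator[Any]) -> Iterable[Any]:
        nonlocal counter
        counter += 1
        path = os.path.join(directory, f"checkpoint-{counter}.pkl")
        with open(path, "wb") as f:
            for item in items:  # drain the upstream iterator exactly once
                pickle.dump(item, f)
        return SpilledFile(path)

    return materialize


calls = 0


def expensive_computation(n: int) -> int:
    global calls
    calls += 1  # count how often the "expensive" work actually runs
    return n * n


with tempfile.TemporaryDirectory() as tmp:
    materialize = pickle_spill_factory(tmp)
    cached = materialize(expensive_computation(n) for n in range(5))
    first_pass = list(cached)   # downstream reads from the file...
    second_pass = list(cached)  # ...as many times as it likes

print(first_pass, second_pass, calls)  # [0, 1, 4, 9, 16] [0, 1, 4, 9, 16] 5
```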
## When to prefer SynaFlow over Java Streams
- You need persistence (disk, database, cloud storage).
- You need lazy lockstep consumption across multiple consumers without manual
  `tee` management.
- You need a bounded ahead window (like `max_in_flight`) for I/O-bound patterns.
  In Java, this pattern requires moving to Project Reactor, RxJava, or the JDK 9+
  `java.util.concurrent.Flow` interfaces to use reactive backpressure / `prefetch`
  buffers. Standard Java Streams are synchronous and pull-based, with no
  configurable prefetch window.
- You need sync/async parity: the same pipeline definition runs in both modes.
- Your data doesn't fit in memory but you still want a Streams-like API.
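A bounded ahead window means at most N items are in flight at once while results still come out in input order. The following plain-asyncio sketch shows that behavior without SynaFlow; `bounded_map` and its `max_in_flight` parameter are hypothetical names chosen to mirror the option mentioned above, and `fetch` stands in for an I/O call.

```python
import asyncio
from collections import deque
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def bounded_map(
    fn: Callable[[T], Awaitable[U]],
    items: Iterable[T],
    max_in_flight: int,
) -> AsyncIterator[U]:
    """Run fn over items with at most max_in_flight pending calls, in input order."""
    window: deque = deque()
    for item in items:
        if len(window) >= max_in_flight:
            # Window is full: wait for the oldest task before starting another
            yield await window.popleft()
        window.append(asyncio.ensure_future(fn(item)))
    while window:  # drain whatever is still running
        yield await window.popleft()


peak = 0
running = 0


async def fetch(n: int) -> int:
    # Stand-in for an I/O call; records the highest observed concurrency
    global peak, running
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01 * (5 - n % 5))  # later items can finish first
    running -= 1
    return n * 10


async def main() -> list[int]:
    return [r async for r in bounded_map(fetch, range(8), max_in_flight=3)]


results = asyncio.run(main())
print(results, peak)  # [0, 10, 20, 30, 40, 50, 60, 70] 3
```

Unlike `asyncio.gather`, which would start all eight calls at once, the window caps concurrent work at three while the consumer still pulls results lazily and in order.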