# Custom Materializers
Materializers control how stream outputs are collected. You can write custom ones for any storage backend.
## Default Materializers
| Consumer type | Default behavior |
|---|---|
| `Iterator[T]` | No materialization (lazy stream) |
| `list[T]` | `list()`: all items held in memory |
| `set[T]` | `set()`: all items held in memory |
| `dict[K, V]` | `dict()`: key-value pairs built from an `Iterator[tuple[K, V]]` |
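For intuition, each default behaves roughly like the plain callable it is named after. The sketch below is an approximation for illustration, not synaflow's implementation; `identity` and `DEFAULTS` are hypothetical names.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import Any


def identity(stream: Iterator[Any]) -> Iterator[Any]:
    """Iterator[T]: pass the stream through untouched, so it stays lazy."""
    return stream


# Hypothetical mapping from consumer type to an equivalent materializer callable
DEFAULTS: dict[type, Callable[[Iterator[Any]], Iterable[Any]]] = {
    Iterator: identity,
    list: list,  # list[T]: drain every item into memory
    set: set,    # set[T]: drain into memory; duplicates collapse
    dict: dict,  # dict[K, V]: build from (key, value) pairs
}

print(DEFAULTS[list](iter([3, 1, 3])))              # [3, 1, 3]
print(DEFAULTS[set](iter([3, 1, 3])))               # {1, 3}
print(DEFAULTS[dict](iter([("a", 1), ("b", 2)])))  # {'a': 1, 'b': 2}
```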
## Writing a Custom Materializer
A materializer is a callable `(Iterator[T]) -> Iterable[T]`:
```python
from collections.abc import Iterator


def count_materializer(stream: Iterator[int]) -> list[int]:
    """Materialize the stream and log how many items it produced."""
    result = list(stream)
    print(f"Materialized {len(result)} items")
    return result


# `step` and `consumer` are assumed to be defined elsewhere in your pipeline module
step("consumer", fn=consumer, materializer=count_materializer)
```
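Because a materializer is just a callable, the same shape works for other storage backends. The sketch below persists items to SQLite with the standard-library `sqlite3` module. `make_sqlite_materializer` and its single-column table layout are hypothetical, and the sketch assumes items are values SQLite can store directly (ints, floats, strings, bytes).

```python
import sqlite3
from collections.abc import Callable, Iterator


def make_sqlite_materializer(
    conn: sqlite3.Connection, table: str = "items"
) -> Callable[[Iterator[int]], list[int]]:
    """Return a materializer that stores each item as a row in `table` (hypothetical helper)."""

    def sqlite_materializer(stream: Iterator[int]) -> list[int]:
        result = list(stream)  # drain the stream once
        # The table name is interpolated, so it must come from trusted code, never user input
        conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (item)")
        conn.executemany(f"INSERT INTO {table} (item) VALUES (?)", ((v,) for v in result))
        conn.commit()
        return result  # hand the collected items back, like count_materializer does

    return sqlite_materializer


conn = sqlite3.connect(":memory:")
materialize = make_sqlite_materializer(conn)
print(materialize(iter([1, 2, 3])))                              # [1, 2, 3]
print(conn.execute("SELECT COUNT(*) FROM items").fetchone()[0])  # 3
```

It would be attached the same way as any other materializer, for example `step("consumer", fn=consumer, materializer=make_sqlite_materializer(conn))`.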
## Materializer Factories
For context-aware materialization (for example, file paths derived from the pipeline and step names), use a factory: a callable that receives a `MaterializeContext` and returns a materializer.
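```python
import json
import os
from collections.abc import Callable, Iterator

from synaflow.types import MaterializeContext


def disk_factory(ctx: MaterializeContext) -> Callable[[Iterator], list]:
    # Derive a per-pipeline, per-step output path from the context
    path = f"/data/{ctx.pipeline_name}/{ctx.dataset_name}.json"

    def disk_materializer(stream: Iterator) -> list:
        data = list(stream)  # drain the stream into memory
        os.makedirs(os.path.dirname(path), exist_ok=True)  # create /data/<pipeline>/ if missing
        with open(path, "w") as f:
            json.dump(data, f)  # items must be JSON-serializable
        return data

    return disk_materializer


# `pipeline` and `Params` are defined elsewhere in your project
p = pipeline(
    name="my_pipeline",
    params=Params,
    steps=[...],
    memory_materializer_factory=disk_factory,
)
```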
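To see what a factory receives and returns, you can call one by hand. This sketch assumes the pipeline calls the factory once per dataset with a context and then passes the step's stream to the returned callable. `FakeContext` is a hypothetical stand-in for `MaterializeContext` carrying the documented fields, and the `root` parameter is an assumption added so the output can go to a temporary directory instead of `/data`.

```python
import json
import os
import tempfile
from collections.abc import Callable, Iterator
from dataclasses import dataclass


@dataclass
class FakeContext:
    """Hypothetical stand-in for synaflow.types.MaterializeContext."""
    pipeline_name: str
    dataset_name: str
    item_type: type


def disk_factory(ctx: FakeContext, root: str = "/data") -> Callable[[Iterator], list]:
    # Same layout as the factory above, with a configurable root directory
    path = os.path.join(root, ctx.pipeline_name, f"{ctx.dataset_name}.json")

    def disk_materializer(stream: Iterator) -> list:
        data = list(stream)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            json.dump(data, f)
        return data

    return disk_materializer


with tempfile.TemporaryDirectory() as root:
    ctx = FakeContext("my_pipeline", "numbers", int)
    materialize = disk_factory(ctx, root=root)  # step 1: the factory builds a materializer
    print(materialize(iter([1, 2, 3])))         # step 2: it drains the stream -> [1, 2, 3]
    with open(os.path.join(root, "my_pipeline", "numbers.json")) as f:
        print(json.load(f))                     # [1, 2, 3]
```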
The `MaterializeContext` provides:

| Field | Description |
|---|---|
| `pipeline_name` | Name of the current pipeline |
| `dataset_name` | Name of the step producing the data |
| `item_type` | Type of the items in the stream |
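A factory can also make use of `item_type`. The sketch below returns a materializer that checks each item against `item_type` while collecting it. `validating_factory` is a hypothetical name, a `types.SimpleNamespace` stands in for a real `MaterializeContext`, and validation is skipped whenever `item_type` is something `isinstance` cannot check (such as `list[int]`, a union, or `typing.Any`).

```python
import types
import typing
from collections.abc import Callable, Iterator


def validating_factory(ctx) -> Callable[[Iterator], list]:
    expected = ctx.item_type
    # isinstance() only works reliably with plain classes; skip generics and Any
    checkable = (
        isinstance(expected, type)
        and not isinstance(expected, types.GenericAlias)
        and expected is not typing.Any
    )

    def validating_materializer(stream: Iterator) -> list:
        result = []
        for item in stream:
            if checkable and not isinstance(item, expected):
                raise TypeError(
                    f"{ctx.pipeline_name}/{ctx.dataset_name}: expected "
                    f"{expected.__name__}, got {type(item).__name__}"
                )
            result.append(item)
        return result

    return validating_materializer


ctx = types.SimpleNamespace(pipeline_name="my_pipeline", dataset_name="numbers", item_type=int)
print(validating_factory(ctx)(iter([1, 2, 3])))  # [1, 2, 3]
```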
## Error Materializers
When a step fails, error materializers capture the exception and partial output: