Sinks
Sinks receive processed records and persist them. When a pipeline fans out to several sinks, every sink sees every record; to send each record to a single sink chosen by a predicate, use a SinkRouter (see Routing).
Built-in sinks
JsonLinesSink
Write records as newline-delimited JSON (JSONL). Uses stdlib only.
```python
from agora.sinks.file.jsonlines import JsonLinesSink

sink = JsonLinesSink(
    path="output/records.jsonl",
    serializer=lambda r: r.model_dump(),  # optional; defaults to model_dump / __dict__
    append=False,
    flush_every=100,
    encoding="utf-8",
)
```
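JSONL output is one JSON object per line. The stdlib sketch below approximates what a sink configured like the one above writes. It assumes flush_every counts records between flushes and uses vars() to mirror the __dict__ fallback; write_jsonl and Record are hypothetical names, not part of agora.

```python
import io
import json
from dataclasses import dataclass
from typing import Any, Callable, Iterable, TextIO


@dataclass
class Record:  # hypothetical record type for illustration
    id: int
    name: str


def write_jsonl(
    records: Iterable[Any],
    out: TextIO,
    serializer: Callable[[Any], dict] = vars,  # mirrors the __dict__ fallback
    flush_every: int = 100,
) -> int:
    """Write one JSON object per line and return the number of records written.

    Flushing after every `flush_every` records is an assumed reading of the option.
    """
    count = 0
    for record in records:
        out.write(json.dumps(serializer(record), ensure_ascii=False) + "\n")
        count += 1
        if count % flush_every == 0:
            out.flush()
    out.flush()  # flush whatever remains of the last partial batch
    return count


buf = io.StringIO()
n = write_jsonl([Record(1, "alpha"), Record(2, "beta")], buf, flush_every=1)
print(buf.getvalue(), end="")
# {"id": 1, "name": "alpha"}
# {"id": 2, "name": "beta"}
```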
CsvSink
Write records as CSV. Uses stdlib only.
```python
from agora.sinks.file.csv import CsvSink

sink = CsvSink(
    path="output/records.csv",
    row_mapper=lambda r: {"id": r.id, "name": r.name, "score": r.score},
    fieldnames=["id", "name", "score"],  # explicit column order
    append=False,
    flush_every=100,
    delimiter=",",
)
```
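Because the header and column order come from fieldnames, the order of keys returned by row_mapper does not matter. The sketch below illustrates that contract with the standard library's csv.DictWriter; it assumes CsvSink behaves like DictWriter (header row first, minimal quoting of values that contain the delimiter), which the description above does not confirm. Record and row_mapper are hypothetical.

```python
import csv
import io
from dataclasses import dataclass


@dataclass
class Record:  # hypothetical record type
    id: int
    name: str
    score: float


def row_mapper(r: Record) -> dict:
    # Same shape as the row_mapper passed to CsvSink above.
    return {"id": r.id, "name": r.name, "score": r.score}


buf = io.StringIO()
writer = csv.DictWriter(
    buf, fieldnames=["id", "name", "score"], delimiter=",", lineterminator="\n"
)
writer.writeheader()  # header row follows fieldnames order
for rec in [Record(1, "alpha", 0.5), Record(2, "beta, inc.", 1.25)]:
    writer.writerow(row_mapper(rec))  # a value containing "," gets quoted

print(buf.getvalue(), end="")
# id,name,score
# 1,alpha,0.5
# 2,"beta, inc.",1.25
```

With DictWriter's default extrasaction="raise", a row_mapper key missing from fieldnames raises ValueError rather than silently adding a column.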
ParquetSink
Write records incrementally to a Parquet file via PyArrow.
Requires: pip install "agora-etl[file]"
```python
from agora.sinks.file.parquet import ParquetSink

sink = ParquetSink(
    path="output/records.parquet",
    row_mapper=lambda r: {"id": r.id, "name": r.name, "score": float(r.score)},
    batch_size=1000,
    compression="snappy",
)
```
WebhookSink
POST records to an HTTP endpoint. Supports batch mode and retries requests that fail with HTTP 429 (Too Many Requests) or any 5xx status.
```python
from agora.sinks.http.webhook import WebhookSink

sink = WebhookSink(
    url="https://api.example.com/ingest",
    headers={"Authorization": "Bearer my-token"},  # placeholder token
    batch_mode=True,
    flush_every=50,
    max_retries=3,
)
```
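The retry rule can be pinned down as code. The sketch below treats 429 and any 5xx status as retryable and re-sends up to max_retries times after the first attempt. The exponential backoff, base_delay, and the injected send/sleep callables are assumptions for illustration, not WebhookSink's documented behavior.

```python
import time
from typing import Callable


def is_retryable(status: int) -> bool:
    """Retry on 429 Too Many Requests and on any 5xx server error."""
    return status == 429 or 500 <= status <= 599


def post_with_retry(
    send: Callable[[], int],
    max_retries: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call `send` (which returns an HTTP status) at most 1 + max_retries times.

    Backoff of base_delay * 2**attempt between attempts is an assumption.
    """
    status = send()
    for attempt in range(max_retries):
        if not is_retryable(status):
            break
        sleep(base_delay * 2 ** attempt)
        status = send()
    return status


responses = iter([503, 429, 200])
delays: list[float] = []
status = post_with_retry(lambda: next(responses), max_retries=3, sleep=delays.append)
print(status, delays)  # 200 [0.5, 1.0]
```

Injecting sleep keeps the policy testable without real waits; a 4xx other than 429 (for example 400 or 404) returns immediately, since re-sending the same payload would fail the same way.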
StdoutSink
Print records to stdout. Useful for development and debugging.
```python
from agora.sinks.io.stdout import StdoutSink

sink = StdoutSink(prefix="[record] ")
```
LogSink
Emit records via the structured logger.
```python
from agora.sinks.io.log import LogSink

sink = LogSink(level="info")
```
Custom sink
Subclass BaseSink[T] and implement write(). The example below also overrides open() and close() to acquire and release a database connection:
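```python
from asyncpg import connect  # assumed driver: $1/$2 placeholders are asyncpg syntax
from agora.core.sink import BaseSink
from myapp.models import MyRecord  # hypothetical module holding your record type


class MyDatabaseSink(BaseSink[MyRecord]):
    sink_name = "my_database"

    def __init__(self, dsn: str) -> None:
        super().__init__()  # assumes BaseSink needs no constructor arguments
        self._dsn = dsn

    async def open(self) -> None:
        # Acquire one connection for the lifetime of the sink.
        self._conn = await connect(self._dsn)

    async def write(self, record: MyRecord) -> None:
        await self._conn.execute(
            "INSERT INTO records (id, name) VALUES ($1, $2)",
            record.id, record.name,
        )

    async def close(self) -> None:
        await self._conn.close()
```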
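A sink's hooks can be exercised without a pipeline. The sketch below assumes the runtime calls open() once, write() once per record, and close() at the end even if a write fails; that order is not spelled out above, so treat it as an assumption. InMemorySink and drive() are hypothetical, and the sketch deliberately does not subclass BaseSink so it runs without agora installed.

```python
import asyncio
from typing import Any, Iterable


class InMemorySink:
    """Hypothetical stand-in with the same open/write/close hooks as MyDatabaseSink."""

    def __init__(self) -> None:
        self.events: list[str] = []
        self.rows: list[Any] = []

    async def open(self) -> None:
        self.events.append("open")

    async def write(self, record: Any) -> None:
        self.rows.append(record)

    async def close(self) -> None:
        self.events.append("close")


async def drive(sink: Any, records: Iterable[Any]) -> None:
    # Assumed call order: open once, write each record, always close.
    await sink.open()
    try:
        for record in records:
            await sink.write(record)
    finally:
        await sink.close()


sink = InMemorySink()
asyncio.run(drive(sink, [1, 2, 3]))
print(sink.events, sink.rows)  # ['open', 'close'] [1, 2, 3]
```

The try/finally is the design point: a connection opened in open() is released even when a write raises.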
Override write_batch() for bulk inserts:
```python
# Add to MyDatabaseSink; executemany() inserts the whole batch in one call.
async def write_batch(self, records: list[MyRecord]) -> None:
    await self._conn.executemany(
        "INSERT INTO records (id, name) VALUES ($1, $2)",
        [(r.id, r.name) for r in records],
    )
```
Set batch_writable_native = True on the class to tell the runtime to prefer write_batch() over individual write() calls.
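In MyDatabaseSink that is a one-line class attribute, batch_writable_native = True. The sketch below shows one plausible way a runtime could act on the flag: chunk records for sinks that opt in, and call write() per record otherwise. deliver(), the recorder classes, and chunking by batch_size are hypothetical, not agora's actual dispatch code.

```python
import asyncio
from typing import Any, Sequence


class BatchRecorder:
    batch_writable_native = True  # opts in to write_batch()

    def __init__(self) -> None:
        self.batches: list[list[Any]] = []

    async def write(self, record: Any) -> None:
        self.batches.append([record])  # a per-record call shows up as a 1-item list

    async def write_batch(self, records: list[Any]) -> None:
        self.batches.append(list(records))


class RecordRecorder(BatchRecorder):
    batch_writable_native = False  # same methods, but opts out


async def deliver(sink: Any, records: Sequence[Any], batch_size: int) -> None:
    """Hypothetical dispatcher: prefer write_batch() when the sink opts in."""
    if getattr(sink, "batch_writable_native", False):
        for i in range(0, len(records), batch_size):
            await sink.write_batch(list(records[i : i + batch_size]))
    else:
        for record in records:
            await sink.write(record)


native, plain = BatchRecorder(), RecordRecorder()
asyncio.run(deliver(native, range(5), batch_size=2))
asyncio.run(deliver(plain, range(3), batch_size=2))
print(native.batches)  # [[0, 1], [2, 3], [4]]
print(plain.batches)   # [[0], [1], [2]]
```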
Fan-out
Write each record to multiple sinks:
```python
summary = await (
    Pipeline(src)
    .fan_out([file_sink, webhook_sink], batch_size=50)
    .run()
)
```
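Fan-out means every sink receives every record. The stdlib sketch below models that with asyncio.gather, handing each record to all sinks concurrently. Whether agora dispatches concurrently, sequentially, or in groups of batch_size is not stated above, so fan_out() here is a hypothetical illustration, with plain async functions standing in for sinks.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable

Writer = Callable[[Any], Awaitable[None]]


async def fan_out(records: Iterable[Any], writers: list[Writer]) -> None:
    """Hypothetical sketch: send every record to every writer, concurrently per record."""
    for record in records:
        await asyncio.gather(*(write(record) for write in writers))


file_rows: list[Any] = []
webhook_rows: list[Any] = []


async def to_file(r: Any) -> None:
    file_rows.append(r)


async def to_webhook(r: Any) -> None:
    webhook_rows.append(r)


asyncio.run(fan_out(["a", "b"], [to_file, to_webhook]))
print(file_rows, webhook_rows)  # ['a', 'b'] ['a', 'b']
```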
Routing
Route records to different sinks based on a predicate:
```python
from agora.core.sink import SinkRouter

router = (
    SinkRouter()
    .route(lambda r: r.region == "APAC", apac_sink)
    .route(lambda r: r.region == "EMEA", emea_sink)
    .default(fallback_sink)
)

summary = await Pipeline(src).route(router).run()
```
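Unlike fan-out, routing sends each record to one sink. The sketch below mirrors the route()/default() chain with a hypothetical Router class and assumes predicates are checked in the order route() was called, with the first match winning and default() catching everything else. The text above does not say how overlapping predicates are resolved, so treat first-match as an assumption; sinks are plain strings here for brevity.

```python
from typing import Any, Callable, Optional

Predicate = Callable[[Any], bool]


class Router:
    """Hypothetical first-match router mirroring SinkRouter's fluent chain."""

    def __init__(self) -> None:
        self._routes: list[tuple[Predicate, str]] = []
        self._default: Optional[str] = None

    def route(self, predicate: Predicate, sink: str) -> "Router":
        self._routes.append((predicate, sink))
        return self  # returning self enables chaining

    def default(self, sink: str) -> "Router":
        self._default = sink
        return self

    def pick(self, record: Any) -> Optional[str]:
        # Assumption: registration order, first match wins.
        for predicate, sink in self._routes:
            if predicate(record):
                return sink
        return self._default


router = (
    Router()
    .route(lambda r: r["region"] == "APAC", "apac_sink")
    .route(lambda r: r["region"] == "EMEA", "emea_sink")
    .default("fallback_sink")
)
picks = [router.pick({"region": x}) for x in ("APAC", "EMEA", "AMER")]
print(picks)  # ['apac_sink', 'emea_sink', 'fallback_sink']
```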