Sources
Sources emit records via an async generator. The pipeline consumes them one at a time.
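A minimal asyncio sketch of this pull model follows. It is not agora's runtime: toy_source and run_pipeline are hypothetical stand-ins for a source's stream() and the pipeline's consumer loop.

```python
import asyncio
from collections.abc import AsyncGenerator


async def toy_source(rows: list[dict]) -> AsyncGenerator[dict, None]:
    # Stand-in for a source's stream(): emits one record per iteration.
    for row in rows:
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield row


async def run_pipeline(rows: list[dict]) -> list[dict]:
    processed = []
    # The generator only advances when the consumer asks for the next record,
    # so with this loop at most one record is in flight at a time.
    async for record in toy_source(rows):
        processed.append({**record, "seen": True})
    return processed


result = asyncio.run(run_pipeline([{"id": 1}, {"id": 2}]))
print(result)  # [{'id': 1, 'seen': True}, {'id': 2, 'seen': True}]
```

Because the consumer drives iteration, a slow downstream step naturally slows the source instead of letting records pile up in memory.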
Built-in sources
Checkpoint support at a glance
Checkpointing is opt-in per source. A source must explicitly expose
supports_checkpoint = True, implement current_checkpoint(), and restore
state in prepare_resume().
| Source | Checkpoint support | Resume position |
|---|---|---|
| JsonLinesSource | Yes | line number |
| CsvSource | Yes | row number |
| ParquetSource | Yes | row number |
| HTTPSource | Not by default | custom, source-specific |
Do not assume that a source, including a plugin source, is resumable unless its documentation says so.
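Because support is opt-in, code that decides whether to resume should check for it rather than assume it. The helper below is a hypothetical sketch built only from the three members named above; is_resumable and both toy classes are illustrative, not agora APIs.

```python
from typing import Any


def is_resumable(source: Any) -> bool:
    """Return True only if the source explicitly opts into checkpointing."""
    # The flag must be exactly True; a missing attribute means "not resumable".
    if getattr(source, "supports_checkpoint", False) is not True:
        return False
    # Both halves of the contract must be present.
    return callable(getattr(source, "current_checkpoint", None)) and callable(
        getattr(source, "prepare_resume", None)
    )


class PlainSource:  # no opt-in
    pass


class ResumableSource:  # opts in and implements both methods
    supports_checkpoint = True

    def current_checkpoint(self) -> dict:
        return {"line": 42}

    async def prepare_resume(self, checkpoint) -> None:
        pass


print(is_resumable(PlainSource()))      # False
print(is_resumable(ResumableSource()))  # True
```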
JsonLinesSource
Stream records from a JSONL (newline-delimited JSON) file.
```python
from agora.sources.file.jsonlines import JsonLinesSource
from agora.core.types import SourceRecordFailurePolicy

source = JsonLinesSource(
    path="data/events.jsonl",
    row_mapper=lambda row: Event(**row),
    encoding="utf-8",
    batch_size=1000,
    on_record_error=SourceRecordFailurePolicy.LOG_AND_CONTINUE,
)
```
Supports checkpointing: after a restart, it resumes from the last processed line number.
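Resuming by line number amounts to skipping lines that were already handled. Here is a minimal stdlib sketch. It assumes the checkpoint holds the index of the next line to read; whether JsonLinesSource stores that or the last processed line is not specified here. read_jsonl_from is a hypothetical helper.

```python
import io
import json
from collections.abc import Iterable, Iterator


def read_jsonl_from(lines: Iterable[str], start_line: int = 0) -> Iterator[tuple[int, dict]]:
    """Yield (line_number, record) pairs, skipping lines before start_line."""
    for line_no, line in enumerate(lines):
        if line_no < start_line:
            continue  # processed before the restart; read but not parsed again
        if line.strip():  # tolerate blank lines
            yield line_no, json.loads(line)


data = io.StringIO('{"id": 1}\n{"id": 2}\n{"id": 3}\n')
# Suppose the checkpoint says lines 0 and 1 are done, so reading restarts at line 2.
resumed = list(read_jsonl_from(data, start_line=2))
print(resumed)  # [(2, {'id': 3})]
```

In this sketch, skipped lines are still read from the file. Line-based resume avoids reprocessing records, not rereading the file prefix.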
CsvSource
Stream records from a CSV or TSV file using the stdlib csv module.
```python
from agora.sources.file.csv import CsvSource

source = CsvSource(
    path="data/products.csv",
    row_mapper=lambda row: Product(
        id=row["id"],
        name=row["name"],
        price=float(row["price"]),
    ),
    delimiter=",",
    has_header=True,
    encoding="utf-8-sig",  # strips Excel BOM
)
```
Supports checkpointing by row number.
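Both details in the example can be checked directly with the stdlib csv module: why utf-8-sig matters for Excel exports, and what resuming by row number means. This is only a sketch. It assumes the row number counts data rows after the header, since CsvSource's exact counting is not documented here.

```python
import csv
import io
from itertools import islice

# CSV bytes that start with a UTF-8 BOM, as Excel commonly writes them.
raw = b"\xef\xbb\xbfid,name,price\np1,Pen,1.50\np2,Ink,4.25\np3,Pad,2.00\n"

# Decoding with "utf-8" leaves the BOM glued to the first header name.
plain_header = next(csv.reader(io.StringIO(raw.decode("utf-8"))))
# Decoding with "utf-8-sig" strips it.
sig_header = next(csv.reader(io.StringIO(raw.decode("utf-8-sig"))))
print(plain_header[0] == "id", sig_header[0] == "id")  # False True

# Resume after two already-processed data rows (the header is not counted).
reader = csv.DictReader(io.StringIO(raw.decode("utf-8-sig")))
rows = list(islice(reader, 2, None))
print(rows)  # [{'id': 'p3', 'name': 'Pad', 'price': '2.00'}]
```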
ParquetSource
Stream records from a Parquet file using PyArrow. Reads in batches to avoid loading the entire file into memory.
Requires: pip install "agora-etl[file]"
```python
from agora.sources.file.parquet import ParquetSource

source = ParquetSource(
    path="data/sales.parquet",
    row_mapper=lambda row: SalesRecord(**row),
    batch_size=1000,
)
```
Supports checkpointing by row number.
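With batched reads, a row-number checkpoint has to be translated into a batch position. Below is a minimal sketch of that arithmetic. It assumes every batch except the last holds exactly batch_size rows. Plain lists stand in for PyArrow record batches, and locate_row and rows_from are hypothetical helpers, not ParquetSource internals.

```python
from collections.abc import Iterable, Iterator, Sequence


def locate_row(row_number: int, batch_size: int) -> tuple[int, int]:
    """Map a global row index to (batch index, offset within that batch)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return divmod(row_number, batch_size)


def rows_from(batches: Iterable[Sequence], start_row: int, batch_size: int) -> Iterator:
    """Yield rows from start_row onward, given fixed-size batches."""
    start_batch, offset = locate_row(start_row, batch_size)
    for i, batch in enumerate(batches):
        if i < start_batch:
            continue  # whole batch was processed before the checkpoint
        # In the first batch we keep, drop the rows already handled.
        yield from (batch[offset:] if i == start_batch else batch)


print(locate_row(2500, 1000))  # (2, 500)
print(list(rows_from([[0, 1, 2], [3, 4, 5], [6, 7]], start_row=4, batch_size=3)))  # [4, 5, 6, 7]
```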
HTTPSource
Abstract base class for HTTP polling sources. It handles rate limiting, retries, circuit breaking, and response caching; fetch_batch() is the only method a subclass must implement.
```python
from agora.sources.http.http import HTTPSource, StopFetching
from agora.sources._internal.circuit_breaker import CircuitBreakerConfig


class PostsSource(HTTPSource[Post]):
    source_name = "posts_api"

    def __init__(self) -> None:
        super().__init__(
            base_url="https://api.example.com",
            requests_per_second=5.0,
            max_retries=3,
            cache_ttl_seconds=3600,
            circuit_breaker=CircuitBreakerConfig(failure_threshold=5),
        )
        self._page = 1

    async def fetch_batch(self):
        resp = await self.get("/posts", params={"page": self._page})
        items = resp.json()["items"]
        if not items:
            raise StopFetching
        for item in items:
            yield Post(**item)
        self._page += 1
```
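The spacing implied by requests_per_second=5.0 in the example can be sketched as a minimum-interval limiter, where each request may start no sooner than 0.2 s after the previous one. This is an illustrative stand-in, not HTTPSource's actual limiter, which may, for example, use a token bucket and permit short bursts. MinIntervalLimiter and FakeClock are hypothetical. The clock and sleep functions are injected only so the example runs without real waiting.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


class MinIntervalLimiter:
    """Start at most `rate` requests per second by spacing them evenly (no bursts)."""

    def __init__(
        self,
        rate: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    ) -> None:
        self._interval = 1.0 / rate  # 5.0 req/s -> 0.2 s between starts
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = float("-inf")
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:  # concurrent callers queue up here
            now = self._clock()
            wait = self._next_allowed - now
            if wait > 0:
                await self._sleep(wait)
                now = self._next_allowed
            self._next_allowed = now + self._interval


class FakeClock:
    """Deterministic clock so the example runs instantly."""

    def __init__(self) -> None:
        self.t = 0.0
        self.sleeps: list[float] = []

    def now(self) -> float:
        return self.t

    async def sleep(self, seconds: float) -> None:
        self.sleeps.append(seconds)
        self.t += seconds


async def demo() -> list[float]:
    clock = FakeClock()
    limiter = MinIntervalLimiter(5.0, clock=clock.now, sleep=clock.sleep)
    for _ in range(3):
        await limiter.acquire()  # a request would be sent after this returns
    return clock.sleeps


sleeps = asyncio.run(demo())
print(sleeps)  # [0.2, 0.2]
```

The first request goes out immediately, and each later one waits out the remainder of the 0.2 s interval.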
Available request methods: self.get(), self.post(). Both are rate-limited, retried, and optionally cached.
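The retry behavior controlled by max_retries can be approximated with exponential backoff. This sketch rests on several assumptions. Only ConnectionError is treated as transient. max_retries counts retries after the first attempt, so 3 means up to 4 calls. The delays use full jitter. agora's actual retry policy is not documented here, and with_retries is a hypothetical helper.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def with_retries(
    call: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Await call(); on ConnectionError, retry up to max_retries more times."""
    for attempt in range(max_retries + 1):
        try:
            return await call()
        except ConnectionError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            # Exponential backoff with full jitter: caps of 0.5 s, 1 s, 2 s, ...
            await sleep(random.uniform(0, base_delay * 2**attempt))
    raise ValueError("max_retries must be >= 0")  # only reached if the loop never ran


async def demo() -> tuple[str, int, list[float]]:
    attempts = 0
    delays: list[float] = []

    async def flaky() -> str:
        nonlocal attempts
        attempts += 1
        if attempts < 3:  # fail twice, then succeed
            raise ConnectionError("transient")
        return "ok"

    async def record_sleep(seconds: float) -> None:
        delays.append(seconds)  # record instead of actually waiting

    result = await with_retries(flaky, max_retries=3, sleep=record_sleep)
    return result, attempts, delays


result, attempts, delays = asyncio.run(demo())
print(result, attempts, len(delays))  # ok 3 2
```

Jitter spreads retries from many clients over time, so a recovering server is not hit by synchronized bursts.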
HTTPSource does not define a generic checkpoint contract by itself. If an
HTTP-based source needs resumability, implement it explicitly in the subclass
using a cursor, page token, watermark, or other source-specific position.
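One way to give an HTTP-based source a resume position is a page-token cursor. The self-contained sketch below uses an in-memory fake API in place of HTTP. PagedSource and its restore() method are hypothetical. The snippet mirrors supports_checkpoint and current_checkpoint() but does not use the real prepare_resume() signature.

```python
import asyncio
from collections.abc import AsyncGenerator

# Fake paginated API: page token -> (items on that page, next token or None).
PAGES: dict = {
    None: (["a", "b"], "t1"),
    "t1": (["c", "d"], "t2"),
    "t2": (["e"], None),
}


class PagedSource:
    supports_checkpoint = True

    def __init__(self) -> None:
        self._token = None  # token of the next page to fetch (None = first page)
        self._done = False

    def current_checkpoint(self) -> dict:
        return {"page_token": self._token, "done": self._done}

    def restore(self, value: dict) -> None:
        self._token = value["page_token"]
        self._done = value["done"]

    async def stream(self) -> AsyncGenerator[str, None]:
        while not self._done:
            await asyncio.sleep(0)  # stands in for an HTTP GET
            items, next_token = PAGES[self._token]
            for item in items:
                yield item
            # Advance the cursor only once the whole page has been yielded.
            self._token = next_token
            self._done = next_token is None


async def demo() -> tuple[list[str], dict, list[str]]:
    first = PagedSource()
    gen = first.stream()
    seen = []
    async for item in gen:
        seen.append(item)
        if item == "c":
            break  # simulate a crash partway through page "t1"
    await gen.aclose()
    saved = first.current_checkpoint()

    second = PagedSource()
    second.restore(saved)
    resumed = [item async for item in second.stream()]
    return seen, saved, resumed


seen, saved, resumed = asyncio.run(demo())
print(seen, saved, resumed)  # ['a', 'b', 'c'] {'page_token': 't1', 'done': False} ['c', 'd', 'e']
```

The cursor advances one page at a time. If processing stops partway through a page, that page is fetched again on resume, so "c" appears in both runs. The result is at-least-once delivery, and downstream steps should tolerate the duplicate.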
Custom source
Subclass BaseSource[T] and implement stream():
```python
from collections.abc import AsyncGenerator

from agora.core.source import BaseSource, SourceRecordError


# MyRecord and create_client() stand in for your own record type and client.
class MySource(BaseSource[MyRecord]):
    source_name = "my_source"

    async def open(self) -> None:
        self._client = await create_client()

    async def close(self) -> None:
        await self._client.close()

    async def stream(self) -> AsyncGenerator[MyRecord, None]:
        async for raw in self._client.fetch():
            try:
                record = MyRecord.from_dict(raw)
            except Exception as exc:
                # Wrap parse failures only; keep the yield outside the try block.
                raise SourceRecordError(exc, record=raw) from exc
            yield record
```
Raise SourceRecordError to route a single bad record to the DLQ without stopping the pipeline.
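The dead-letter idea can be sketched in plain Python. Catch the failure for each record, keep the raw input together with the error, and carry on. This is a simplified illustration, not agora's DLQ; DeadLetter and map_with_dlq are hypothetical. The sketch catches errors in the consuming loop because, in plain Python, an exception that escapes an async generator finishes that generator. How agora's runtime keeps a source going after a SourceRecordError is not described here.

```python
import json
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from typing import Any


@dataclass
class DeadLetter:
    raw: Any     # the input that failed, kept for inspection or replay
    error: str   # repr of the exception raised while mapping it


def map_with_dlq(
    raw_records: Iterable[Any], mapper: Callable[[Any], Any]
) -> tuple[list[Any], list[DeadLetter]]:
    """Map every record; divert failures to a dead-letter list and keep going."""
    good: list[Any] = []
    dlq: list[DeadLetter] = []
    for raw in raw_records:
        try:
            good.append(mapper(raw))
        except Exception as exc:  # one bad record must not end the run
            dlq.append(DeadLetter(raw=raw, error=repr(exc)))
    return good, dlq


good, dlq = map_with_dlq(['{"id": 1}', "not json", '{"id": 3}'], json.loads)
print(good)      # [{'id': 1}, {'id': 3}]
print(len(dlq))  # 1
```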
Checkpointable source
Implement current_checkpoint() and prepare_resume() to support resumable pipelines:
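```python
from agora.core.source import BaseSource


class MyCheckpointableSource(BaseSource[MyRecord]):
    source_name = "my_source"
    supports_checkpoint = True  # opt in to resume behavior
    _last_cursor = None  # advance this in stream() as records are emitted

    def current_checkpoint(self) -> dict | None:
        return {"cursor": self._last_cursor}

    async def prepare_resume(self, checkpoint) -> None:
        if checkpoint:
            self._last_cursor = checkpoint.value["cursor"]
```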
Also set supports_checkpoint = True so the runtime knows this source opts into
resume behavior.
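These pieces fit together as a save-and-restore round trip. The self-contained sketch below follows the same shape, with a cursor in current_checkpoint() and checkpoint.value in prepare_resume(). Checkpoint and ToyCheckpointableSource are hypothetical. The sketch also assumes the runtime reads current_checkpoint() only after the last emitted record has been fully processed.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator
from dataclasses import dataclass


@dataclass
class Checkpoint:
    """Hypothetical stand-in for the object passed to prepare_resume()."""
    value: dict


class ToyCheckpointableSource:
    supports_checkpoint = True

    def __init__(self, items: list[str]) -> None:
        self._items = items
        self._last_cursor: int | None = None  # index of the last record emitted

    def current_checkpoint(self) -> dict | None:
        if self._last_cursor is None:
            return None  # nothing emitted yet, nothing to save
        return {"cursor": self._last_cursor}

    async def prepare_resume(self, checkpoint: Checkpoint | None) -> None:
        if checkpoint:
            self._last_cursor = checkpoint.value["cursor"]

    async def stream(self) -> AsyncGenerator[str, None]:
        start = 0 if self._last_cursor is None else self._last_cursor + 1
        for i in range(start, len(self._items)):
            self._last_cursor = i
            yield self._items[i]


async def demo() -> tuple[list[str], dict, list[str]]:
    items = ["a", "b", "c", "d"]
    first = ToyCheckpointableSource(items)
    gen = first.stream()
    seen = [await gen.__anext__(), await gen.__anext__()]  # two records processed
    saved = Checkpoint(first.current_checkpoint())
    await gen.aclose()  # simulate the process stopping here

    second = ToyCheckpointableSource(items)
    await second.prepare_resume(saved)
    rest = [item async for item in second.stream()]
    return seen, saved.value, rest


seen, saved_value, rest = asyncio.run(demo())
print(seen, saved_value, rest)  # ['a', 'b'] {'cursor': 1} ['c', 'd']
```

Because the saved cursor is the last emitted index and the resumed run starts at the next one, nothing is replayed here. If a checkpoint could be taken before a record finished processing, that record would be skipped on resume, which is why the timing assumption matters.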