Getting Started
This guide takes you from installation to a runnable pipeline.
Requirements
- Python 3.11+
Install
Core package only:
pip install agora-etl
With file extras for Parquet support:
pip install "agora-etl[file]"
With official integrations:
pip install "agora-etl-plugins[redis]"
pip install "agora-etl-plugins[kafka]"
pip install "agora-etl-plugins[postgres]"
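To confirm the install from Python, a check along these lines may help. It is a minimal sketch that assumes the installed distribution names match the pip names used above (`agora-etl`, `agora-etl-plugins`); the helpers `python_is_supported` and `installed_version` are hypothetical and not part of Agora.

```python
import sys
from importlib import metadata
from typing import Optional


def python_is_supported(minimum=(3, 11)) -> bool:
    """Return True if the running interpreter meets the minimum version."""
    return sys.version_info >= minimum


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("Python 3.11+:", python_is_supported())
    # Assumption: distribution names match the pip package names.
    for name in ("agora-etl", "agora-etl-plugins"):
        print(name, installed_version(name) or "not installed")
```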
Scaffold a project
Create a new project with the built-in CLI:
agora new my-pipeline
cd my-pipeline
Generated layout:
my-pipeline/
├── agora.toml
├── agora.env.example
├── pyproject.toml
├── src/
│   ├── settings.py
│   ├── pipelines/
│   │   └── example.py
│   ├── models/
│   ├── normalizers/
│   └── sinks/
└── tests/
    └── test_example.py
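If you want to sanity-check a scaffolded project, for example in CI, a small script can compare the directory against the layout above. This is only a sketch: the `EXPECTED` list is copied from the tree shown here, `tests/test_example.py` is assumed to sit under `tests/`, and `missing_paths` is a hypothetical helper, not an Agora command.

```python
from pathlib import Path

# Relative paths taken from the generated layout shown above.
EXPECTED = (
    "agora.toml",
    "agora.env.example",
    "pyproject.toml",
    "src/settings.py",
    "src/pipelines/example.py",
    "src/models",
    "src/normalizers",
    "src/sinks",
    "tests/test_example.py",
)


def missing_paths(project_root: Path) -> list:
    """Return the expected paths that do not exist under project_root."""
    root = Path(project_root)
    return [rel for rel in EXPECTED if not (root / rel).exists()]


if __name__ == "__main__":
    # Run from the directory that contains my-pipeline/.
    for rel in missing_paths(Path("my-pipeline")):
        print("missing:", rel)
```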
Run the generated example
The scaffold includes src/pipelines/example.py. Run it:
agora run pipelines.example
Or dry-run it to stdout:
agora run pipelines.example --dry-run
You can cap record count during local testing:
agora run pipelines.example --max-records 100
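When you drive these commands from a script or a test harness, it can help to build the argument list in one place. The sketch below uses only the subcommand and flags shown above; `agora_run_args` and `run_pipeline` are hypothetical helpers, and whether `--dry-run` and `--max-records` can be combined in a single invocation is an assumption.

```python
import subprocess


def agora_run_args(target, dry_run=False, max_records=None):
    """Build the argv list for `agora run` without executing it."""
    args = ["agora", "run", target]
    if dry_run:
        args.append("--dry-run")
    if max_records is not None:
        # Assumption: the flag takes its value as a separate argument.
        args += ["--max-records", str(max_records)]
    return args


def run_pipeline(target, **options):
    """Run the CLI; check=True raises CalledProcessError on a non-zero exit."""
    return subprocess.run(agora_run_args(target, **options), check=True)
```

For example, `agora_run_args("pipelines.example", max_records=100)` yields the same arguments as the last command above.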
Your first pipeline
Here is a minimal pipeline built in pure Python:
from dataclasses import dataclass

from agora import Pipeline
from agora.core.source import IterableSource
from agora.sinks.io.stdout import StdoutSink


@dataclass
class Event:
    id: int
    status: str


async def build_pipeline():
    # In-memory source holding three sample records.
    source = IterableSource(
        [
            Event(id=1, status="new"),
            Event(id=2, status="done"),
            Event(id=3, status="new"),
        ]
    )
    return (
        Pipeline(source, id="events")
        # Keep only records whose status is "new".
        .filter(lambda record: record.status == "new")
        # Send the remaining records to stdout with a prefix.
        .build(StdoutSink(prefix="[event] "))
    )
Run it by exposing build_pipeline() inside src/pipelines/example.py and then calling:
agora run pipelines.example
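Before running the whole pipeline, you may want to check the filter logic on its own. The sketch below pulls the lambda out into a named function (`is_new`, a hypothetical name) and applies it to the same three sample records in plain Python. It does not import Agora, and passing a named function to `.filter(...)` in place of the lambda is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Event:
    id: int
    status: str


def is_new(record: Event) -> bool:
    """Same predicate as the lambda in the pipeline above."""
    return record.status == "new"


events = [
    Event(id=1, status="new"),
    Event(id=2, status="done"),
    Event(id=3, status="new"),
]

# Only the records with status "new" pass the predicate.
kept = [event.id for event in events if is_new(event)]
print(kept)  # [1, 3]
```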
Add real sources and sinks
Once the basics work, swap the in-memory source for real components:
- file ingestion: JsonLinesSource, CsvSource, ParquetSource
- HTTP polling: subclass HTTPSource
- official plugins: Redis, Kafka, PostgreSQL
Add validation or enrichment
Middlewares let you transform or validate records before they are written to the sink:
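from agora import Pipeline
from agora.middlewares.validate import ValidateMiddleware
# source, sink, and MyModel are placeholders for objects defined in your project.
pipeline = Pipeline(source).pipe(ValidateMiddleware(schema=MyModel)).build(sink)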
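To make the idea of schema validation concrete, here is a conceptual sketch in plain Python. It is not how `ValidateMiddleware` is implemented, and this guide does not say whether invalid records are dropped or raise an error. The `MyModel` dataclass and the `validation_errors` helper are assumptions made up for illustration.

```python
from dataclasses import dataclass, fields


@dataclass
class MyModel:
    # Hypothetical schema; your project's model may look different.
    id: int
    status: str


def validation_errors(record: dict, schema=MyModel) -> list:
    """Return a list of problems found when checking record against schema."""
    errors = []
    for field in fields(schema):
        if field.name not in record:
            errors.append(f"missing field: {field.name}")
        elif not isinstance(record[field.name], field.type):
            errors.append(f"{field.name}: expected {field.type.__name__}")
    return errors


print(validation_errors({"id": 1, "status": "new"}))  # []
print(validation_errors({"id": "x"}))
```

A check like this only covers simple field types such as `int` and `str`; a real schema library handles nested and optional fields as well.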
When to use declarative config
If you want operations-friendly pipeline definitions in TOML instead of Python wiring, Agora also supports config-driven pipelines:
agora run --config pipelines.toml --plan
agora run --config pipelines.toml
See Configuration for the full format.
Config-driven runs can import project callables via import = "module:name".
Keep those configs in trusted source control rather than accepting them from
untrusted users.
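The reason for this caution becomes clearer when you look at how a "module:name" reference is typically resolved in Python. The sketch below is a generic illustration using the standard library, not Agora's actual resolver; the `resolve` helper is hypothetical.

```python
from importlib import import_module


def resolve(spec: str):
    """Resolve a 'module:name' string to the object it names."""
    module_name, sep, attr = spec.partition(":")
    if not sep or not module_name or not attr:
        raise ValueError(f"expected 'module:name', got {spec!r}")
    # Importing a module executes its top-level code, so a config that
    # names an arbitrary module can run arbitrary code on import.
    module = import_module(module_name)
    return getattr(module, attr)


if __name__ == "__main__":
    dumps = resolve("json:dumps")
    print(dumps({"ok": True}))
```

Because the import itself runs code, whoever controls the config file effectively controls what your process executes.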
Next steps
- Learn config overlays with Configuration
- Explore worker processes in Runner
- Browse the full command surface in CLI Reference