Plugins¶
Agora is designed to be extended. Third-party packages register their components through Python entry points, and Agora discovers them when it loads the corresponding registries at runtime.
Entry-point groups¶
| Group | Purpose |
|---|---|
| agora.sources | Custom source types |
| agora.sinks | Custom sink types |
| agora.middlewares | Custom middleware types |
| agora.ai.providers | AI provider implementations |
| agora.ai.caches | LLM response cache backends |
| agora.runner | Custom runner types |
| agora.middlewares.dedup.stores | Dedup store backends |
| agora.middlewares.dedup.strategies | Dedup comparison strategies |
| agora.metrics.exporters | Metrics exporters |
| agora.state.backends | State backend implementations |
Official plugins¶
The main first-party extension package is agora-etl-plugins. It currently groups official integrations such as:
- Redis
- cron scheduling helpers
- distributed worker coordination
- Kafka
- PostgreSQL
Registering a plugin¶
In your package's pyproject.toml:
[project.entry-points."agora.sources"]
my_source = "my_package.sources:MySource"
[project.entry-points."agora.sinks"]
my_sink = "my_package.sinks:MySink"
[project.entry-points."agora.middlewares"]
my_middleware = "my_package.middlewares:MyMiddleware"
[project.entry-points."agora.ai.providers"]
my_provider = "my_package.providers:MyProvider"
After the package is installed, running agora plugins list shows the registered components.
If a plugin package also exposes a MANIFEST, Agora uses that metadata for CLI diagnostics and compatibility hints. The manifest version is a plugin-contract marker, not the same thing as the agora-etl package version.
For older plugins, agora.core.registry.AGORA_API_VERSION remains available as an alias for the same manifest-contract version constant. New plugins should use AGORA_PLUGIN_MANIFEST_VERSION instead; the alias is deprecated as of 0.2.0.
If a plugin advertises an incompatible manifest version, Agora leaves it out of the active registry but still surfaces it in CLI diagnostics as incompatible so operators can see why it was rejected.
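The behaviour described above, where incompatible plugins are kept out of the active registry yet remain visible for diagnostics, can be sketched as a simple partition. Everything here is an assumption made for illustration: PluginInfo, partition_plugins, the integer version values, and the exact-match compatibility rule are hypothetical and may differ from how Agora actually compares manifest versions.

```python
from dataclasses import dataclass

# Assumed value for illustration; the real constant lives in Agora.
SUPPORTED_MANIFEST_VERSION = 1


@dataclass(frozen=True)
class PluginInfo:
    """Hypothetical record of a discovered plugin and its manifest version."""

    name: str
    manifest_version: int


def partition_plugins(
    plugins: list[PluginInfo],
    supported: int = SUPPORTED_MANIFEST_VERSION,
) -> tuple[list[PluginInfo], list[PluginInfo]]:
    """Split plugins into (active, incompatible) by manifest version."""
    active = [p for p in plugins if p.manifest_version == supported]
    incompatible = [p for p in plugins if p.manifest_version != supported]
    return active, incompatible


found = [PluginInfo("my_source", 1), PluginInfo("legacy_sink", 0)]
active, incompatible = partition_plugins(found)

# Only `active` would be registered; `incompatible` is kept for reporting.
for plugin in incompatible:
    print(f"{plugin.name}: incompatible manifest version {plugin.manifest_version}")
```

Keeping the rejected plugins in a separate list, rather than dropping them silently, is what lets a diagnostic command explain why a component is missing.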
Using plugins in config-driven pipelines¶
Registered plugins can be referenced by name in declarative pipeline configs:
[pipelines.example.source]
type = "my_source"
url = "https://api.example.com"
[[pipelines.example.middlewares]]
type = "my_middleware"
threshold = 0.9
[[pipelines.example.sinks]]
type = "my_sink"
dsn = "postgresql://localhost/mydb"
Or equivalently, in Python-driven assembly:
from agora import Pipeline
from agora.sources import source_registry
from agora.sinks import sink_registry
source = source_registry.create("my_source", url="https://api.example.com")
sink = sink_registry.create("my_sink", dsn="postgresql://localhost/mydb")
pipeline = Pipeline(source).build(sink)
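To make the link between the two forms concrete, here is a toy registry showing one way a type name could be resolved to a class, with the remaining config keys passed as keyword arguments. This is a sketch under assumptions, not Agora's implementation: the Registry class, its register method, and the stand-in MySource are hypothetical, and the "other keys become keyword arguments" rule is inferred from the parallel between the two examples above.

```python
from typing import Any, Callable


class Registry:
    """Toy name-to-factory registry; a stand-in, not Agora's implementation."""

    def __init__(self) -> None:
        self._factories: dict[str, Callable[..., Any]] = {}

    def register(self, name: str, factory: Callable[..., Any]) -> None:
        self._factories[name] = factory

    def create(self, name: str, **options: Any) -> Any:
        try:
            factory = self._factories[name]
        except KeyError:
            raise KeyError(f"unknown plugin type: {name!r}") from None
        return factory(**options)


class MySource:
    """Hypothetical source that only records its options."""

    def __init__(self, url: str) -> None:
        self.url = url


source_registry = Registry()
source_registry.register("my_source", MySource)

# Equivalent of the parsed [pipelines.example.source] table.
source_config = {"type": "my_source", "url": "https://api.example.com"}

# "type" selects the factory; every other key becomes a keyword argument.
options = {k: v for k, v in source_config.items() if k != "type"}
source = source_registry.create(source_config["type"], **options)
print(type(source).__name__, source.url)
```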
AI providers¶
AI providers implement the AIProvider protocol:
from agora.ai.providers.base import AIProvider, CompletionResponse


class MyProvider:
    model = "my-model-v1"

    # self._client is assumed to be an async SDK client created in
    # __init__ (not shown), for example from an api_key argument.

    async def complete(
        self,
        prompt: str,
        *,
        system: str | None = None,
        temperature: float = 0.0,
        max_tokens: int = 4096,
    ) -> CompletionResponse:
        response = await self._client.generate(prompt)
        return CompletionResponse(
            content=response.text,
            model=self.model,
            input_tokens=response.usage.input,
            output_tokens=response.usage.output,
        )
Register it:
[project.entry-points."agora.ai.providers"]
my_provider = "my_package.providers:MyProvider"
Use it in any AI middleware:
from agora.middlewares.ai.enrich import AIEnrichMiddleware

pipeline.pipe(AIEnrichMiddleware(
    provider=MyProvider(api_key="..."),
    prompt_template="Summarize: {name}",
    output_fields=["summary"],
))
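For local tests it can help to have a provider that needs no network access. The sketch below follows the complete signature shown in this section but is otherwise hypothetical: EchoProvider is an invented name, the CompletionResponse dataclass is a local stand-in that only mirrors the four fields used above (not the real Agora class), and word counts are a crude substitute for token counts.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class CompletionResponse:
    """Local stand-in mirroring the fields used above; not the Agora class."""

    content: str
    model: str
    input_tokens: int
    output_tokens: int


class EchoProvider:
    """Hypothetical offline provider that echoes the prompt back."""

    model = "echo-v0"

    async def complete(
        self,
        prompt: str,
        *,
        system: Optional[str] = None,
        temperature: float = 0.0,
        max_tokens: int = 4096,
    ) -> CompletionResponse:
        text = prompt if system is None else f"{system}\n{prompt}"
        # Whitespace-separated words stand in for real token counts.
        words = len(text.split())
        return CompletionResponse(
            content=text,
            model=self.model,
            input_tokens=words,
            output_tokens=words,
        )


response = asyncio.run(EchoProvider().complete("Summarize: Ada Lovelace"))
print(response.content, response.input_tokens)
```

Because the provider is matched by shape rather than by inheritance in this sketch, the class does not subclass anything; whether the real AIProvider protocol is checked structurally is an assumption.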
Dedup store plugins¶
Implement the DedupStore protocol to add a distributed dedup backend:
from agora.middlewares.dedup.stores.base import DedupStore


class RedisStore(DedupStore[str]):
    store_name = "redis"

    # self._redis (an async Redis client) and self._set_key are assumed
    # to be set in __init__ (not shown).

    async def exists(self, key: str) -> bool:
        # SISMEMBER may return an int, so coerce to bool.
        return bool(await self._redis.sismember(self._set_key, key))

    async def add(self, key: str) -> None:
        await self._redis.sadd(self._set_key, key)

    async def mark_if_new(self, key: str) -> bool:
        # SADD returns the number of members added: 1 if new, 0 if seen.
        return bool(await self._redis.sadd(self._set_key, key))

    async def close(self) -> None:
        await self._redis.aclose()
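The same four methods can be exercised without Redis. The in-memory sketch below is a hypothetical stand-in (InMemoryStore is not part of Agora and does not subclass DedupStore); it assumes, based on the Redis example above, that mark_if_new returns True only the first time a key is seen.

```python
import asyncio


class InMemoryStore:
    """Hypothetical single-process store with the same four methods."""

    store_name = "memory"

    def __init__(self) -> None:
        self._seen: set[str] = set()

    async def exists(self, key: str) -> bool:
        return key in self._seen

    async def add(self, key: str) -> None:
        self._seen.add(key)

    async def mark_if_new(self, key: str) -> bool:
        # Check and insert with no await in between, so two tasks on the
        # same event loop cannot both claim the same key.
        if key in self._seen:
            return False
        self._seen.add(key)
        return True

    async def close(self) -> None:
        self._seen.clear()


async def demo() -> list[bool]:
    store = InMemoryStore()
    results = [await store.mark_if_new(k) for k in ["a", "b", "a"]]
    await store.close()
    return results


results = asyncio.run(demo())
print(results)
```

Combining the check and the insert in mark_if_new, instead of calling exists and then add, avoids a window in which two workers both see a key as new.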
Discovering plugins manually¶
from agora.core.discovery import discover_plugins
discover_plugins() # loads all entry-points into the registries
discover_plugins() is called automatically when you use AgoraContainer.from_config() or the CLI, so most projects never need to call it directly.