Middlewares

Middlewares are the composable transformation layer between source and sink. Each middleware receives a record, optionally transforms it, and returns the result — or None to drop the record.
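
The contract described above can be sketched without the library. This is only an illustration under stated assumptions, not agora's implementation: run_chain, scale_score, and keep_high_scores are hypothetical names, and records are plain dicts rather than models.

```python
from typing import Callable, Optional

Record = dict
Step = Callable[[Record], Optional[Record]]


def run_chain(record: Record, steps: list[Step]) -> Optional[Record]:
    """Pass a record through each step; stop as soon as one returns None."""
    current: Optional[Record] = record
    for step in steps:
        current = step(current)
        if current is None:
            return None  # dropped: later steps never see the record
    return current


def scale_score(record: Record) -> Record:
    # Transform: return a new dict rather than mutating the input.
    return {**record, "score": record["score"] * 100}


def keep_high_scores(record: Record) -> Optional[Record]:
    # Filter: returning None drops the record.
    return record if record["score"] > 50 else None


steps = [scale_score, keep_high_scores]
kept = run_chain({"id": 1, "score": 0.9}, steps)     # survives both steps
dropped = run_chain({"id": 2, "score": 0.2}, steps)  # None: filtered out
```

Because a dropped record short-circuits the chain, ordering matters: placing cheap filters before expensive transforms avoids wasted work.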

Built-in middlewares

MapMiddleware

Apply a synchronous function to each record:

from agora import MapMiddleware

pipeline.pipe(MapMiddleware(lambda r: r.model_copy(update={"score": r.score * 100})))

FilterMiddleware

Drop records that do not match a predicate. The pipeline's .filter() method is a shorthand for it:

pipeline.filter(lambda r: r.score > 0.5)

RetryMiddleware

Retry a middleware on exception with exponential backoff:

from agora import RetryMiddleware

pipeline.pipe(RetryMiddleware(inner=my_middleware, max_retries=3))
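
To make "exponential backoff" concrete, here is a rough standalone sketch of the retry loop. It is not RetryMiddleware's actual code: retry_with_backoff and base_delay are hypothetical, and it assumes max_retries counts the retries after the first attempt.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.1,  # hypothetical parameter, not a documented option
) -> T:
    """Await call(); on exception, sleep base_delay * 2**attempt and try again."""
    attempt = 0
    while True:
        try:
            return await call()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            # Delays grow as base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * 2**attempt)
            attempt += 1
```

With max_retries=3 the wrapped call runs at most four times in this sketch. Production retry loops often add random jitter to the delay so that many workers do not retry in lockstep.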

ValidateMiddleware

Validate records against a Pydantic model. Invalid records are dropped or routed to the dead-letter queue (DLQ), depending on configuration:

from agora.middlewares.validate import ValidateMiddleware

pipeline.pipe(ValidateMiddleware(schema=MyModel))

EnrichMiddleware

Enrich records with data from an async callable:

from agora.middlewares.enrich import EnrichMiddleware

async def fetch_metadata(record):
    meta = await metadata_api.get(record.id)
    return record.model_copy(update={"tags": meta.tags})

pipeline.pipe(EnrichMiddleware(enricher=fetch_metadata))

DedupMiddleware

Drop duplicate records by a computed key:

from agora.middlewares.dedup import DedupMiddleware
from agora.middlewares.dedup.stores.memory import InMemoryStore
from agora.middlewares.dedup.strategies.fuzzy import FuzzyMatchStrategy

# Exact dedup (default)
pipeline.pipe(DedupMiddleware(key=lambda r: r.id))

# Fuzzy dedup by name similarity
pipeline.pipe(DedupMiddleware(
    key=lambda r: r.name.lower(),
    store=InMemoryStore(),
    strategy=FuzzyMatchStrategy(threshold=0.85),
    max_fuzzy_keys=100_000,
))

Fuzzy dedup uses Jaro-Winkler similarity. Each incoming key is compared against the stored keys, so the cost per record is O(n) in the number of stored keys, capped at max_fuzzy_keys. For large-scale fuzzy dedup, use a vector store plugin.

For distributed dedup across processes, use a Redis-backed store (available as a separate package).
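
The linear cost of fuzzy matching is easier to see in a small standalone sketch. Note the assumptions: FuzzyDeduper is a hypothetical class, difflib.SequenceMatcher from the standard library stands in for Jaro-Winkler (the two metrics score differently), and evicting the oldest key once the cap is reached is a guess at one possible policy, not documented behavior.

```python
from collections import OrderedDict
from difflib import SequenceMatcher


class FuzzyDeduper:
    """Keep a bounded set of seen keys and flag near-duplicates by linear scan."""

    def __init__(self, threshold: float = 0.85, max_keys: int = 100_000) -> None:
        self.threshold = threshold
        self.max_keys = max_keys
        self._seen = OrderedDict()  # insertion-ordered set of keys

    def is_duplicate(self, key: str) -> bool:
        # Exact hit: O(1) lookup.
        if key in self._seen:
            return True
        # Fuzzy hit: O(n) scan, one similarity computation per stored key.
        for seen in self._seen:
            if SequenceMatcher(None, key, seen).ratio() >= self.threshold:
                return True
        # New key: remember it, evicting the oldest once the cap is exceeded
        # (assumed policy; it keeps the scan bounded by max_keys).
        self._seen[key] = None
        if len(self._seen) > self.max_keys:
            self._seen.popitem(last=False)
        return False
```

Every non-duplicate record pays for a full scan, which is why a bound on stored keys, or an index such as a vector store, becomes necessary as the key set grows.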

AI middlewares

All AI middlewares require an AIProvider. The on_error option defaults to "passthrough": when an LLM call fails, the original record passes through unchanged, so LLM errors never stop the pipeline.
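
As a rough illustration of the passthrough policy, the sketch below wraps an arbitrary async transform. apply_with_policy is a hypothetical helper, records are plain dicts, and treating any value other than "passthrough" as "re-raise" is an assumption made only to keep the example small.

```python
import asyncio
from typing import Awaitable, Callable

Record = dict


async def apply_with_policy(
    record: Record,
    transform: Callable[[Record], Awaitable[Record]],
    on_error: str = "passthrough",
) -> Record:
    """Run transform; on failure, apply the configured error policy."""
    try:
        return await transform(record)
    except Exception:
        if on_error == "passthrough":
            return record  # the original record continues unchanged
        raise  # assumed behavior for any other policy value
```

The trade-off of passthrough is that downstream steps may receive records without the AI-generated fields, so they should treat those fields as optional.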

AIEnrichMiddleware

Add fields to each record using an LLM:

from agora.middlewares.ai.enrich import AIEnrichMiddleware

pipeline.pipe(AIEnrichMiddleware(
    provider=my_provider,
    prompt_template="Summarize this product: {name}. Return JSON: {\"summary\": \"...\"}",
    output_fields=["summary"],
    cache=LLMCache(".cache/llm.db"),  # LLMCache must be imported as well
))

AIClassifyMiddleware

Classify each record into one of a fixed set of categories:

from agora.middlewares.ai.classify import AIClassifyMiddleware

pipeline.pipe(AIClassifyMiddleware(
    provider=my_provider,
    source_fields=["name", "description"],
    categories=["restaurant", "hotel", "attraction", "cafe"],
    output_field="category",
))

AIExtractMiddleware

Extract structured fields from unstructured text:

from agora.middlewares.ai.extract import AIExtractMiddleware

pipeline.pipe(AIExtractMiddleware(
    provider=my_provider,
    source_field="raw_text",
    output_fields=["price", "currency", "quantity"],
))

AIValidateMiddleware

Validate records using an LLM and drop or flag invalid ones:

from agora.middlewares.ai.validate import AIValidateMiddleware

pipeline.pipe(AIValidateMiddleware(
    provider=my_provider,
    prompt_template="Is this a valid address: {address}? Return JSON: {\"valid\": true/false}",
))

AITranslateMiddleware

Translate text fields to a target language:

from agora.middlewares.ai.translate import AITranslateMiddleware

pipeline.pipe(AITranslateMiddleware(
    provider=my_provider,
    source_field="description",
    target_language="English",
    output_field="description_en",
))

AIBatchMiddleware

Amortize LLM costs by batching multiple records into a single API call. The LLM response must be a JSON array of the same length as the input batch.

import json

from agora.middlewares.ai.batch import AIBatchMiddleware

pipeline.pipe(AIBatchMiddleware(
    provider=my_provider,
    prompt_fn=lambda records: (
        f"Enrich {len(records)} records. "
        f"Return a JSON array of same length. Input: {json.dumps(records)}"
    ),
    output_fields=["summary", "tags"],
    batch_size=20,
    flush_timeout_ms=500,
))
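
The same-length requirement exists because response items are matched to input records by position. A minimal sketch of that merge step follows; merge_batch_response is a hypothetical helper, not part of AIBatchMiddleware, and records are assumed to be plain dicts.

```python
import json
from typing import Any


def merge_batch_response(
    records: list[dict[str, Any]],
    response_text: str,
    output_fields: list[str],
) -> list[dict[str, Any]]:
    """Merge the i-th response item into the i-th record, keeping only output_fields."""
    items = json.loads(response_text)
    # Positional matching only works if the array lines up with the batch.
    if not isinstance(items, list) or len(items) != len(records):
        raise ValueError(f"expected a JSON array of {len(records)} items")
    merged = []
    for record, item in zip(records, items):
        # Ignore non-object items and any keys the caller did not ask for.
        extra = (
            {k: item[k] for k in output_fields if k in item}
            if isinstance(item, dict)
            else {}
        )
        merged.append({**record, **extra})
    return merged
```

Rejecting a mismatched array outright is the conservative choice here: silently zipping a short response would attach results to the wrong records.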

Custom middleware

Subclass Middleware[T, U], where T is the input record type and U is the output record type, and implement process():

from agora.core.middleware import Middleware
from agora.core.context import PipelineContext

class NormalizeMiddleware(Middleware[RawRecord, CleanRecord]):
    name = "normalize"

    async def process(self, record: RawRecord, ctx: PipelineContext) -> CleanRecord | None:
        if not record.name:
            return None   # drop the record
        return CleanRecord(
            id=record.id,
            name=record.name.strip().lower(),
        )

Return None to drop the record. Raise an exception to route it to the DLQ.

Override on_start() and on_stop() for setup and teardown:

    async def on_start(self, ctx: PipelineContext) -> None:
        self._client = await create_client()

    async def on_stop(self, ctx: PipelineContext) -> None:
        await self._client.close()
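
The lifecycle and the drop/DLQ rules above can be pictured with a small standalone driver. Everything here is an assumption for illustration: UppercaseNames and run are hypothetical, the ctx argument is omitted, and the DLQ is modeled as a plain list. The real pipeline runner may order or handle these steps differently.

```python
import asyncio
from typing import Any, Optional


class UppercaseNames:
    """Hypothetical middleware exposing on_start, process, and on_stop."""

    async def on_start(self) -> None:
        self.started = True  # e.g. open a client here

    async def process(self, record: dict[str, Any]) -> Optional[dict[str, Any]]:
        if not record.get("name"):
            return None  # drop the record
        if not isinstance(record["name"], str):
            raise TypeError("name must be a string")  # ends up in the DLQ
        return {**record, "name": record["name"].upper()}

    async def on_stop(self) -> None:
        self.started = False  # e.g. close the client here


async def run(middleware, records):
    """Drive one middleware over records; return (outputs, dead_letters)."""
    outputs, dead_letters = [], []
    await middleware.on_start()
    try:
        for record in records:
            try:
                result = await middleware.process(record)
            except Exception as exc:
                dead_letters.append((record, exc))  # keep the failing input
                continue
            if result is not None:
                outputs.append(result)
    finally:
        await middleware.on_stop()  # teardown runs even if the loop fails
    return outputs, dead_letters
```

Running teardown in a finally block mirrors the usual expectation that resources opened in on_start() are released even when processing fails partway through.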

Custom AI middleware

Subclass AIMiddleware[T]:

from agora.core.context import PipelineContext
from agora.middlewares.ai.base import AIMiddleware

class SentimentMiddleware(AIMiddleware[Review]):
    name = "sentiment"

    async def process(self, record: Review, ctx: PipelineContext) -> Review | None:
        try:
            prompt = self._render_prompt(
                "Analyze sentiment of: {text}. Return JSON: {\"sentiment\": \"positive|negative|neutral\"}",
                record,
            )
            resp = await self._cached_complete(prompt, ctx=ctx)
            data = self._parse_json(resp.content)
            return record.model_copy(update=data)
        except Exception as exc:
            return await self._handle_error(exc, record, ctx)

Always pass ctx=ctx to _cached_complete() — it is required for AI metrics tracking.