Event-driven pipeline with Motia that ingests raw data, validates it against Pydantic contracts, auto-repairs it with an AI agent (plus a rule cache), and loads it to the destination. Unrecoverable events go to a Dead Letter Queue (DLQ) for human inspection.
- Contracts: `contracts/v1/` — versioned Pydantic schemas (`BaseEvent`, `EventEnvelope`, `TransactionSchema`, `OrderEvent`).
- Envelope + Payload: orders use `EventEnvelope` (envelope with `version`, `source`) + `OrderPayload`; the Validator can inspect the envelope before the payload.
- Steps (Motia): `src/*_step.py` — Ingestor, Validator, Healing Agent, Loader, DLQ, `process_order_payment`.
- Cache-Aside (Memoization): the Validator first queries State (repair rules). If a known rule exists, it applies it and re-validates; otherwise it emits `validation_error` and the Healing Agent calls the LLM. Solutions are persisted as `RepairRule` with a TTL.
- State: rules live in a Motia Stream (`repair_rules`); see `src/repair_state.py` (`CacheProvider`) and `contracts/v1/repair_rule.py` (`RepairRule`, `StoredRepairRule`).
- Python 3.10+
- III Engine (Motia runtime)
- Optional: uv to run the steps (recommended; the project uses `uv run` in `iii-config.yaml`)
- Optional: `OPENAI_API_KEY` for the Repair Agent to use the LLM (when no rule is in cache)
The III Engine is the runtime that executes Motia. Install it on your system:

```bash
curl -fsSL https://install.iii.dev/iii/main/install.sh | sh
```

Verify the installation:

```bash
iii -v
```

Clone the repo (if applicable) and, from the project root:
With uv (recommended):

```bash
uv sync
# With dev dependencies: uv sync --extra dev
```

With pip and venv:

```bash
python3 -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install -e ".[dev]"
# or: pip install motia pydantic openai pytest pytest-asyncio httpx python-dotenv
```

- Secrets: in a `.env` file at the project root (do not commit it). Copy it with `cp .env.example .env` and fill in `OPENAI_API_KEY`, etc. It is loaded with `python-dotenv` on startup.
- Behaviour: `sentinel-config.yaml` at the root (or the path in `SENTINEL_CONFIG_PATH`). It defines state/stream names, the repair rule TTL, the Healing Agent model and temperature, and the schema fields used for inferring rules. See `sentinel-config.yaml` and `.env.example`.
- Copy the template: `cp .env.example .env`
- Edit `.env` and fill in the values (e.g. `OPENAI_API_KEY=sk-...`)
- Optional overrides: `OPENAI_REMEDIATION_MODEL` and `REPAIR_RULE_TTL_DAYS` override values from `sentinel-config.yaml`; `SENTINEL_CONFIG_PATH` points to a different config file

The `.env` file is in `.gitignore`.
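As a sketch of the override precedence (a `.env`/environment value wins over the `sentinel-config.yaml` default; the names here are illustrative, the real logic lives in `src/settings.py`):

```python
import os

# Stand-in for values parsed from sentinel-config.yaml
yaml_config = {"repair_rule_ttl_days": 7}

def effective_ttl_days(config: dict) -> int:
    # The REPAIR_RULE_TTL_DAYS environment variable (typically set via .env)
    # takes precedence over the YAML default.
    override = os.getenv("REPAIR_RULE_TTL_DAYS")
    return int(override) if override is not None else config["repair_rule_ttl_days"]
```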
```
├── contracts/v1/             # Schema Registry (Pydantic)
│   ├── event.py              # BaseEvent, EventEnvelope (envelope with version)
│   ├── transaction.py        # TransactionSchema
│   ├── order.py              # OrderItem, OrderPayload, OrderEvent (Envelope+Payload)
│   ├── repair_rule.py        # RepairRule, StoredRepairRule (rules with TTL)
│   └── schema_factory.py     # Factory by version (OrderEvent v1/v2, Transaction)
├── sentinel-config.yaml      # Pipeline configuration (state, healing_agent, etc.)
├── .env.example              # Variable template (copy to .env; do not commit .env)
├── src/                      # Motia steps (auto-discovered)
│   ├── config.py             # Loads .env (sensitive data)
│   ├── settings.py           # Loads sentinel-config.yaml, validates, exposes settings
│   ├── repair_state.py       # CacheProvider: get/set rules, idempotent apply, TTL
│   ├── ingestor_step.py      # POST /ingest → raw_event
│   ├── validator_step.py     # raw_event → validated_data | schema_fixed (cache) | validation_error
│   ├── healing_agent_step.py # validation_error → schema_fixed | DLQ
│   ├── loader_step.py        # validated_data + schema_fixed → DWH
│   ├── order_payment_step.py # order_event → validated OrderEvent → order_payment_processed
│   └── dlq_step.py           # validation_unrecoverable → log
├── test_sentinel.py          # Contract breach simulation (CI/CD)
├── tests/
│   ├── unit/                 # Pydantic models
│   ├── integration/          # Flow via API (requires server)
│   └── scenarios/            # Happy path, schema drift, data corruption
├── pyproject.toml
└── README.md
```
From the project root, start the III engine with the repo config. This brings up the API, queues, state, and the process that runs the Motia steps (`src/*_step.py`):

```bash
iii -c iii-config.yaml
```

- The ingest endpoint is at `POST http://localhost:3111/ingest` (port configurable via `SENTINEL_API_PORT` in `iii-config.yaml`; default 3111).
- Optional env vars for III: `SENTINEL_API_PORT`, `STREAM_PORT`, `SENTINEL_DATA_DIR` (see the comments in `iii-config.yaml`).
- Steps are auto-discovered from `src/`; the `ExecModule` runs `uv run motia run --dir src`. If you don't use uv, edit `iii-config.yaml` and change it to `python -m motia run --dir src`.
The Workbench is the visual console to view flows and logs and to test endpoints. With the backend already running (`iii -c iii-config.yaml`), run in another terminal:

```bash
iii-console --enable-flow
```

Then open in your browser:

- Workbench: http://localhost:3113/

There you can see the event graph (Flow View), logs for each step, and test ingest from the UI. Changes in `src/**/*.py` reload automatically thanks to the `ExecModule` watch mode.

Flow View: for the data-sentinel flow to appear in the Flow tab, this project patches the Motia runtime (the Python SDK does not send flow metadata to the engine; see iii-hq/iii#1206). If the Flow view is empty, or the flow disappears after a `uv sync`, run:

```bash
uv run python scripts/patch_motia_flow_metadata.py
```

For order events the Envelope + Payload structure is used:
- EventEnvelope (`contracts/v1/event.py`): `event_id`, `timestamp`, `version`, `source`. The Validator can inspect `version` before parsing the payload (e.g. to support v1 and v2 in parallel).
- OrderEvent (`contracts/v1/order.py`): extends the Envelope and adds `payload: OrderPayload` (`order_id`, `customer_email`, `items`, `total_amount`, `currency`). All order ingestion must validate with `OrderEvent`; on failure, tag the event as UNPROCESSED_DLQ.
In a step, validate at the input to avoid processing invalid data:

```python
from contracts.v1.order import OrderEvent

async def handler(event_data: dict, ctx: FlowContext) -> None:
    order = OrderEvent.model_validate(event_data)  # fails here if the contract is not met
    # ... use order.payload.order_id, order.payload.customer_email, etc.
```

The `process_order_payment` step listens to the `order_event` queue and performs this validation before processing.
```bash
curl -X POST http://localhost:3111/ingest \
  -H "Content-Type: application/json" \
  -d '{
    "source": "stripe",
    "amount": "99.99",
    "currency": "USD",
    "user_id": "550e8400-e29b-41d4-a716-446655440000",
    "email": "user@example.com",
    "description": "Test"
  }'
```

Add `event_id` and `timestamp` if your contract requires them in the body; otherwise the Validator can use the ones injected by the Ingestor in the envelope.
To show the team the system's resilience, run the script that simulates malformed data:

```bash
python test_sentinel.py
```

The script sends an event with an invalid `customer_email`; the `validation_gateway` rejects it, the errors are captured with `e.errors()` (field and type), and the auto-repair flow is simulated. Ideal for CI/CD to verify that the Sentinel catches and labels failures correctly.
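The `e.errors()` capture can be sketched in isolation (assuming Pydantic v2; `TransactionSchema` here is a minimal stand-in, not the project's real model):

```python
from pydantic import BaseModel, ValidationError

class TransactionSchema(BaseModel):  # stand-in for contracts/v1/transaction.py
    amount: float
    currency: str

try:
    TransactionSchema.model_validate({"amount": "not-a-number", "currency": "USD"})
except ValidationError as e:
    # Each error carries the failing field (loc) and an error type that the
    # pipeline can log, match against cached rules, or feed to the AI Agent.
    error_details = [{"field": ".".join(map(str, err["loc"])), "type": err["type"]}
                     for err in e.errors()]
```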
- Isolation: the gateway only answers "Does it meet the contract? Yes/No".
- Error capture: Pydantic error details feed the AI Agent for repair or DLQ.
- Chaining: the flow does not crash; it is diverted in a controlled way to repair or DLQ.
```bash
# Unit + scenarios only (no server required)
pytest -m "not integration" -v

# With coverage
pytest -m "not integration" --cov=contracts -v

# Integration (Motia/III server running)
export SENTINEL_INGEST_URL=http://localhost:3111/ingest
pytest -m integration -v
```

Scenarios include an OrderEvent contract breach (`test_sentinel_order_event_contract_rupture_invalid_email`); run them in CI to validate resilience.
| Scenario | Description | Check |
|---|---|---|
| Happy path | Valid JSON | Ingestor → Validator → `validated_data` → Loader |
| Schema drift | Renamed field (e.g. `price` → `amount`) | `validation_error` → Healing Agent → `schema_fixed` (or DLQ) |
| Data corruption | Invalid data (e.g. string in `amount`) | `validation_error` → DLQ if unrecoverable |
- Secrets in `.env`: `OPENAI_API_KEY` (required for the Healing Agent to call the LLM when there is no rule in cache).
- Behaviour in `sentinel-config.yaml` (section `healing_agent`): `openai_remediation_model`, `temperature`, `target_fields` (schema fields used to infer repair rules). You can override the model and TTL with `OPENAI_REMEDIATION_MODEL` and `REPAIR_RULE_TTL_DAYS` in `.env`.
The AI Step uses a strict system prompt focused on schemas and structural repairs only:
- Zero Hallucination: only structural repairs (rename keys, cast types, split strings). Never invent or fill in business values.
- Contract Adherence: the output must match the target JSON schema (Pydantic) passed to it.
- Format: the AI returns valid JSON only; no markdown, no prose. The pipeline can therefore call `json.loads()` directly.
- Unrecoverable: if the repair is ambiguous or requires business logic (e.g. computing a missing total), the AI returns `{"reason": "..."}` and the Sentinel sends the event to the DLQ instead of inventing data.
The prompt receives `raw_payload`, `error_details` (from Pydantic), and `target_schema` (from `TransactionSchema.model_json_schema()`), so the model knows exactly what structure to return. See the full definition in `src/healing_agent_step.py` (constant `MASTER_PROMPT`).
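The shape of that prompt payload can be pictured as a plain JSON document (a sketch with hypothetical helper and field names; the real template is `MASTER_PROMPT` in `src/healing_agent_step.py`):

```python
import json

def build_repair_prompt(raw_payload: dict, error_details: list, target_schema: dict) -> str:
    # The model sees the broken payload, Pydantic's errors, and the exact
    # schema it must emit; it must answer with valid JSON only.
    return json.dumps({
        "raw_payload": raw_payload,
        "error_details": error_details,
        "target_schema": target_schema,
    })

prompt = build_repair_prompt(
    {"price": "99.99"},
    [{"field": "amount", "type": "missing"}],
    {"properties": {"amount": {"type": "number"}}},
)
```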
Look-up → Decide → Learn. Calling the LLM for every event is unsustainable; State turns repair (slow/expensive) into an in-memory read (fast/cheap).
- Look-up: on validation failure, the Validator computes an error signature and queries State (`repair_rules`). If a rule exists and is not expired, it applies it.
- Decide: if a rule exists → apply it (idempotently) and re-validate → emit `schema_fixed`. If not → emit `validation_error` to the Healing Agent.
- Learn: when the AI repairs successfully, the Agent persists one or more `RepairRule` entries (`source_field`, `target_field`, `transformation_type`) in State with a TTL. From then on, the same error is resolved in milliseconds without spending tokens.
Best practices:
- TTL (Time To Live): rules have `expires_at`; the environment variable `REPAIR_RULE_TTL_DAYS` (default 7) controls the lifetime. After expiry, the AI can re-evaluate (third-party APIs may have changed again).
- Idempotency: when applying a rule, if the target field already exists in the payload it is not overwritten. Do not apply the same transformation twice to the same object.
- Separation of concerns: the Validator orchestrates; it delegates to `repair_state` (CacheProvider) or the Healing Agent (AIProvider). Do not mix "known repair" logic with "discovery via AI" logic.
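The idempotency practice above can be sketched with a hypothetical rename helper (the real apply logic lives in `src/repair_state.py`):

```python
def apply_rename_rule(payload: dict, source_field: str, target_field: str) -> dict:
    # Idempotent: if the target already exists (or the source is gone),
    # leave the payload untouched, so re-applying the same rule on a retry
    # never overwrites data.
    if target_field in payload or source_field not in payload:
        return payload
    fixed = dict(payload)
    fixed[target_field] = fixed.pop(source_field)
    return fixed
```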
- III Workbench: event graph and logs per step.
- Feedback loop: periodically review the DLQ and Agent logs to adjust the contracts in `contracts/v1` and the repair rules.
- Pipeline idempotency: the Loader is idempotent. It uses the Envelope's `event_id` to check in State whether the event was already processed, so on retries due to network failure, records are not duplicated in the DWH. State key: `loader_processed` by `event_id` (or `request_id` if there is no `event_id`).
- Repaired JSON validation: in the Healing Agent, the JSON returned by the AI is always re-validated against the target schema (`model_validate`) before emitting `schema_fixed`. If re-validation fails, it is logged and the event goes to the DLQ; invalid data is never emitted.
- Contract evolution (Schema Factory): the Envelope's `version` field allows supporting multiple contract versions. In `contracts/v1/schema_factory.py`, `get_order_event_model(version)` and `get_transaction_model(version)` return the correct class (e.g. `OrderEvent` for `"1.0.0"`). When adding `OrderEventV2` for `"2.0.0"`, register it in `ORDER_EVENT_REGISTRY` and the Validator/Healing Agent can instantiate the right class without changing pipeline logic.
- DLQ (Dead Letter Queue): if the AI fails 3 times (or N, as configured) to repair data, move that message to a "human errors" store or queue. Do not delete failed data; keeping it allows auditing and contract improvement.
- Structured logging: store logs in a format consumable by Datadog, ELK, or the Motia Workbench, and be able to filter by `status: "failed"` to see which sources are unstable.
- Regression testing: every contract change (e.g. `OrderEvent`) should be accompanied by running `test_sentinel.py` and the scenarios with the previous JSON, to verify backward compatibility or plan a migration.
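The Schema Factory registry pattern can be sketched like this (class bodies elided and names simplified; the real factory returns the project's Pydantic models):

```python
class OrderEventV1: ...  # stand-in for the v1 Pydantic model
class OrderEventV2: ...  # stand-in for a future v2 model

# Maps an envelope version to the contract class to validate against.
ORDER_EVENT_REGISTRY = {
    "1.0.0": OrderEventV1,
    "2.0.0": OrderEventV2,
}

def get_order_event_model(version: str):
    try:
        return ORDER_EVENT_REGISTRY[version]
    except KeyError:
        raise ValueError(f"Unsupported OrderEvent version: {version}")
```

Adding a new contract version is then a one-line registry change; the Validator and Healing Agent keep calling the same factory function.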
- Deploy the project on the infrastructure where Motia/III runs (according to your CI/CD pipeline).
- Steps are discovered from `src/*_step.py`; there is no need to register them manually with the orchestrator.
- Configure queues and retries in III's `iii-config.yaml` if applicable.
Apache-2.0