add transform, embed, moderate, delay step types #121

@cchinchilla-dev

Description

The current StepType enum (core/models.py:11-18) has five values: LLM_CALL, TOOL, ROUTER, SUBWORKFLOW, APPROVAL_GATE. Even with the additions tracked in #38 (loop), #39 (evaluator), #43 (map / reduce), and #118 (embed), gaps remain that force users to write either custom tools or contorted YAML for routine operations:

  • transform — pure data transformation without an LLM call. JSON parse, regex extract, jmespath query, base64 decode, CSV parse, XML parse, list slicing. Today every transformation requires either a custom Python tool or chaining a router + state mutations. Belongs as a first-class step.
  • moderate — content moderation. OpenAI moderation endpoint, Azure Content Safety, AWS Comprehend toxicity. Critical for any workflow producing user-facing output (the AgentTest evaluator's safety metric in particular). Should be a primitive, not a tool.
  • delay — explicit timed wait. Useful for polling external systems, rate-limit-aware pacing in batch workflows, and simulating real-time conversation latencies in the Simulator. await anyio.sleep(...) should be a step rather than something that requires a custom tool.
  • embed — embedding generation, which depends on #118 (add embeddings API across providers). Listed here for completeness; details are in #118.

Proposal

Add four new step types. Each is small, self-contained, and orthogonal — they can ship independently if scope-trimming is needed.

1. transform step:

- id: parse_response
  type: transform
  source: state.llm_output      # or inline value
  ops:
    - type: json_parse
    - type: jmespath
      query: "results[?score > `0.8`].title"
  output: state.high_confidence_titles

Built-in ops:

  • json_parse, json_serialize
  • regex_extract (with pattern and optional group)
  • regex_replace (with pattern, replacement)
  • jmespath (query language for JSON)
  • xml_parse, csv_parse
  • base64_encode, base64_decode
  • lower, upper, strip, truncate(max_chars)
  • list_slice(start, end), list_filter(predicate), list_map(template)

Ops run in the order listed, and each op operates on the output of the previous one. The whole chain is pure Python with no LLM and no I/O, so it is fast, cheap, and deterministic.
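As a rough illustration of the op-chain idea, here is a minimal sketch of a registry plus an in-order runner. Everything named here (`OPS`, `register_op`, `validate_ops`, `run_transform`) is hypothetical and not taken from the agentloom codebase; only a few of the proposed ops are shown, and `jmespath` is omitted to keep the sketch stdlib-only.

```python
import base64
import json
import re
from typing import Any, Callable, Optional

# Hypothetical registry: op name -> function(value, **params) -> new value.
OPS: dict[str, Callable[..., Any]] = {}


def register_op(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator that adds a function to the op registry under `name`."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        OPS[name] = fn
        return fn
    return decorator


@register_op("json_parse")
def json_parse(value: str) -> Any:
    return json.loads(value)


@register_op("regex_extract")
def regex_extract(value: str, pattern: str, group: int = 0) -> Optional[str]:
    match = re.search(pattern, value)
    return match.group(group) if match else None


@register_op("base64_decode")
def base64_decode(value: str) -> str:
    return base64.b64decode(value).decode("utf-8")


def validate_ops(ops: list[dict[str, Any]]) -> None:
    """Reject unknown op types before any op runs."""
    for op in ops:
        if op.get("type") not in OPS:
            raise ValueError(f"unknown transform op: {op.get('type')!r}")


def run_transform(source: Any, ops: list[dict[str, Any]]) -> Any:
    """Apply ops in order; each op receives the previous op's output."""
    validate_ops(ops)
    value = source
    for op in ops:
        params = {k: v for k, v in op.items() if k != "type"}
        value = OPS[op["type"]](value, **params)
    return value


# Usage: decode a base64 payload, then pull a numeric id out of it.
ops = [
    {"type": "base64_decode"},
    {"type": "regex_extract", "pattern": r"id=(\d+)", "group": 1},
]
encoded = base64.b64encode(b"order id=42 confirmed").decode("ascii")
result = run_transform(encoded, ops)  # "42"
```

Validating the whole chain before running it is one way to get the behavior named in test_transform_unknown_op_raises_at_validation; the real step may validate at YAML load time instead.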

2. moderate step:

- id: check_response
  type: moderate
  input: state.assistant_reply
  provider: openai              # or azure / aws
  categories: [hate, sexual, violence, self-harm]
  threshold: 0.5
  on_violation: fail            # or "warn" / "redact"
  output: state.moderation_result

Output shape:

from pydantic import BaseModel


class ModerationResult(BaseModel):
    flagged: bool
    categories: dict[str, float]   # category -> score
    violations: list[str]          # categories above threshold

If on_violation is fail and any category score exceeds threshold, the step fails and the workflow follows its normal error semantics. If on_violation is redact, the input is replaced with a redaction marker before being written to state.
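The threshold and on_violation handling could look roughly like the sketch below. It uses a stdlib dataclass in place of the pydantic model so that it runs without dependencies, and it takes provider scores as a plain dict instead of calling any moderation API. `REDACTION_MARKER`, `ModerationViolation`, `evaluate_scores`, and `apply_policy` are hypothetical names. The warn behavior (pass the text through unchanged) is an assumption, since the issue does not define it.

```python
from dataclasses import dataclass, field

# Hypothetical marker; the issue does not specify the redaction text.
REDACTION_MARKER = "[REDACTED]"


class ModerationViolation(Exception):
    """Hypothetical error raised when on_violation is 'fail'."""


@dataclass
class ModerationResult:
    flagged: bool
    categories: dict[str, float]                          # category -> score
    violations: list[str] = field(default_factory=list)   # categories above threshold


def evaluate_scores(
    scores: dict[str, float], categories: list[str], threshold: float
) -> ModerationResult:
    """Keep only the requested categories and flag those strictly above threshold."""
    selected = {c: scores.get(c, 0.0) for c in categories}
    violations = [c for c, score in selected.items() if score > threshold]
    return ModerationResult(bool(violations), selected, violations)


def apply_policy(text: str, result: ModerationResult, on_violation: str) -> str:
    """Return the text to write to state, or raise when the policy is 'fail'."""
    if not result.flagged:
        return text
    if on_violation == "fail":
        raise ModerationViolation(f"violations: {result.violations}")
    if on_violation == "redact":
        return REDACTION_MARKER
    if on_violation == "warn":
        return text  # assumption: warn records the result but keeps the text
    raise ValueError(f"unknown on_violation: {on_violation!r}")


# Usage with made-up provider scores.
scores = {"hate": 0.02, "sexual": 0.01, "violence": 0.91}
result = evaluate_scores(scores, ["hate", "sexual", "violence", "self-harm"], 0.5)
redacted = apply_policy("some reply", result, "redact")
```

Separating scoring from policy keeps the provider adapters small: each adapter only has to produce a category-to-score mapping.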

3. delay step:

- id: wait_for_settlement
  type: delay
  seconds: 30                   # or jitter: { min: 25, max: 35 }

Trivial wrapper over await anyio.sleep(seconds). Cancellable per the engine's normal cancellation semantics.
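A minimal sketch of how the seconds / jitter options might resolve to a single sleep duration. The names `resolve_delay` and `run_delay` are hypothetical. Treating seconds and jitter as mutually exclusive, and sampling jitter uniformly between min and max, are both assumptions. To stay stdlib-only the sketch swaps in asyncio.sleep; the proposal itself calls anyio.sleep.

```python
import asyncio
import random
from typing import Optional


def resolve_delay(
    seconds: Optional[float] = None,
    jitter: Optional[dict[str, float]] = None,
    rng: Optional[random.Random] = None,
) -> float:
    """Turn the step config into one non-negative duration in seconds."""
    if (seconds is None) == (jitter is None):
        raise ValueError("specify exactly one of 'seconds' or 'jitter'")
    if seconds is not None:
        if seconds < 0:
            raise ValueError("'seconds' must be non-negative")
        return float(seconds)
    low, high = jitter["min"], jitter["max"]
    if low < 0 or high < low:
        raise ValueError("jitter requires 0 <= min <= max")
    # Assumption: jitter is sampled uniformly from [min, max].
    return (rng or random).uniform(low, high)


async def run_delay(
    seconds: Optional[float] = None,
    jitter: Optional[dict[str, float]] = None,
) -> float:
    """Sleep for the resolved duration and return it, e.g. for tracing."""
    duration = resolve_delay(seconds, jitter)
    # Stand-in for anyio.sleep; cancelling the task interrupts the sleep.
    await asyncio.sleep(duration)
    return duration


fixed = resolve_delay(seconds=30)
jittered = resolve_delay(jitter={"min": 25, "max": 35}, rng=random.Random(0))
```

Returning the resolved duration gives the engine a concrete value to attach to the step's trace span, which matters most when jitter is in use.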

4. embed step (per #118):

- id: vectorize
  type: embed
  inputs: state.documents
  model: text-embedding-3-small
  output: state.vectors

Scope

  • src/agentloom/core/models.py — extend StepType enum; per-step config models.
  • src/agentloom/steps/transform.py — new TransformStep with the op registry.
  • src/agentloom/steps/moderate.py — new ModerateStep; provider adapters for OpenAI moderation, Azure Content Safety, AWS Comprehend.
  • src/agentloom/steps/delay.py — new DelayStep.
  • src/agentloom/steps/embed.py — new EmbedStep (after #118, add embeddings API across providers).
  • src/agentloom/steps/registry.py — register the new types.
  • examples/ — one example per new step.
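For orientation, the enum extension might look like the sketch below. The five existing member names come from this issue, but their string values and the `str` mixin are assumptions about core/models.py. The four new values mirror the `type:` fields in the YAML examples above, and `NEW_STEP_MODULES` is a hypothetical helper that only restates the file paths in the scope list.

```python
from enum import Enum


class StepType(str, Enum):
    # Existing members named in this issue (string values are assumed).
    LLM_CALL = "llm_call"
    TOOL = "tool"
    ROUTER = "router"
    SUBWORKFLOW = "subworkflow"
    APPROVAL_GATE = "approval_gate"
    # Proposed additions, matching the `type:` values used in the YAML.
    TRANSFORM = "transform"
    MODERATE = "moderate"
    DELAY = "delay"
    EMBED = "embed"


# Hypothetical lookup from each new step type to the module proposed for it.
NEW_STEP_MODULES = {
    StepType.TRANSFORM: "src/agentloom/steps/transform.py",
    StepType.MODERATE: "src/agentloom/steps/moderate.py",
    StepType.DELAY: "src/agentloom/steps/delay.py",
    StepType.EMBED: "src/agentloom/steps/embed.py",
}

# A YAML `type: transform` string resolves to the matching enum member.
parsed = StepType("transform")
```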

Regression tests

  • test_transform_jmespath_extracts_correctly
  • test_transform_op_chain_composes_in_order
  • test_transform_unknown_op_raises_at_validation
  • test_moderate_openai_flags_violation_above_threshold
  • test_moderate_redact_replaces_input
  • test_moderate_fail_aborts_workflow
  • test_delay_blocks_for_specified_duration
  • test_delay_cancellable_via_workflow_cancel (after cancellation API exists)
  • test_embed_step_writes_vectors_to_state (covered in #118, add embeddings API across providers, if it lands first)

Notes

  • All four are independent and can ship in any order. embed is blocked on #118 (add embeddings API across providers); the others have no prerequisites.
  • Moderation provider adapters: start with OpenAI (free, easy). Azure Content Safety and AWS Comprehend are follow-ups.
  • The transform op registry is intentionally extensible: users can register custom ops via a plugin mechanism that is out of scope for this issue, or via the existing tool decorator pattern.
  • delay may seem trivial but it's commonly requested for polling patterns; keeping it as a step makes the workflow's intent explicit and keeps timing visible in traces.

Metadata

Labels: core (Core engine, DAG, state), enhancement (New feature or request)
