Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions a2a_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@
# Ref: protoWorkstacean/docs/extensions/cost-v1.md
COST_MIME = "application/vnd.protolabs.cost-v1+json"

# Confidence-v1: self-reported confidence score + explanation on the terminal
# artifact. Workstacean's confidence interceptor reads
# result.data.confidence (clamped to [0, 1]) and optional
# result.data.confidenceExplanation, records a ConfidenceSample, and
# publishes autonomous.confidence.{agent}.{skill}. Planner L0 reads
# avgConfidenceOnSuccess alongside cost for candidate ranking.
# Schema: {"confidence": float, "confidenceExplanation": str?, "success": bool}
# Ref: protoWorkstacean/docs/extensions/confidence-v1.md
CONFIDENCE_MIME = "application/vnd.protolabs.confidence-v1+json"

# ── Data types ────────────────────────────────────────────────────────────────


Expand Down Expand Up @@ -117,6 +127,12 @@ class TaskRecord:
# cost interceptor (protoWorkstacean#372) can record per-skill samples.
# Shape: {"input_tokens": int, "output_tokens": int, "total_tokens": int}
usage: dict = field(default_factory=lambda: {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0})
# Self-reported confidence for the confidence-v1 DataPart. Set by the
# producer when it parses a <confidence> tag out of the model's final
# output. Clamped to [0, 1] on write; None when the model didn't
# report one (the interceptor no-ops in that case).
confidence: float | None = None
confidence_explanation: str | None = None
# ── asyncio primitives (not serialised) ──
_cancel_event: asyncio.Event = field(default_factory=asyncio.Event, repr=False)
_update_event: asyncio.Event = field(default_factory=asyncio.Event, repr=False)
Expand Down Expand Up @@ -234,6 +250,32 @@ async def add_usage(self, task_id: str, input_tokens: int, output_tokens: int) -
record.usage["input_tokens"] + record.usage["output_tokens"]
)

async def set_confidence(
    self,
    task_id: str,
    confidence: float,
    explanation: str | None = None,
) -> None:
    """Store the agent's self-reported confidence on a task record.

    Invoked once by the producer after it parses a <confidence> tag
    from the model's final output. The stored value is later emitted
    on the terminal artifact under the confidence-v1 MIME so
    Workstacean's confidence interceptor can record per-skill samples.

    The score is defensively clamped to [0, 1] here even though the
    workstacean-side interceptor clamps as well — that way the emitted
    DataPart is always in-spec on the wire.
    """
    bounded = min(1.0, max(0.0, float(confidence)))
    async with self._lock:
        task = self._tasks.get(task_id)
        if task is None:
            # Unknown task id: silently no-op rather than raise.
            return
        task.confidence = bounded
        if isinstance(explanation, str) and explanation:
            # Whitespace-only explanations collapse to None.
            task.confidence_explanation = explanation.strip() or None

async def cancel_if_not_terminal(self, task_id: str) -> TaskRecord | None:
"""Atomically cancel a task iff it's not already terminal.

Expand Down Expand Up @@ -348,9 +390,37 @@ def _terminal_artifact_parts(record: TaskRecord) -> list[dict]:
"data": cost_data,
"metadata": {"mimeType": COST_MIME},
})
confidence_data = _confidence_payload(record)
if confidence_data is not None:
parts.append({
"kind": "data",
"data": confidence_data,
"metadata": {"mimeType": CONFIDENCE_MIME},
})
return parts


def _confidence_payload(record: TaskRecord) -> dict | None:
    """Assemble the confidence-v1 DataPart body for a terminal record.

    Returns None when the agent never self-reported a score this run
    (``record.confidence`` is None), so the caller emits no DataPart.

    ``success`` is True only for COMPLETED — CANCELED and FAILED both
    count as not-a-success for OutcomeAnalysis's purposes. Pairing a
    reported confidence with success=False on a FAILED run is exactly
    the "high-confidence failure" calibration signal the interceptor
    is after.
    """
    if record.confidence is None:
        return None
    data: dict = {
        "confidence": record.confidence,
        "success": record.state == COMPLETED,
    }
    explanation = record.confidence_explanation
    if explanation:
        # Key omitted entirely when unset — avoids empty-string noise.
        data["confidenceExplanation"] = explanation
    return data


def _cost_payload(record: TaskRecord) -> dict | None:
"""Build the cost-v1 payload for a terminal record, or None if no
cost-relevant data is available.
Expand Down Expand Up @@ -786,6 +856,22 @@ async def _run_task_background(
output_tokens=payload.get("output_tokens", 0),
)

elif event_type == "confidence":
# Self-reported confidence parsed from the model's final
# output. Stored on the record and emitted on the terminal
# artifact under the confidence-v1 MIME for Workstacean's
# confidence interceptor.
if isinstance(payload, dict) and "confidence" in payload:
try:
await _store.set_confidence(
task_id,
confidence=float(payload["confidence"]),
explanation=payload.get("explanation"),
)
except (TypeError, ValueError):
# Bad payload — skip rather than crash the run.
pass

elif event_type == "done":
record = await _store.update_state(
task_id,
Expand Down
55 changes: 52 additions & 3 deletions graph/output_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,28 @@
The user-facing answer. This is what lands in the A2A artifact /
Discord / Gradio chat. Be clean, scannable, markdown-formatted.
</output>
<confidence>0.85</confidence>
<confidence_explanation>
One short sentence on why this score — what made you sure or unsure.
</confidence_explanation>

Rules:
- Always emit both tags, in that order, exactly once.
- Never include literal `<scratch_pad>` or `<output>` markers inside the
user-facing content.
- Always emit `<scratch_pad>` and `<output>`, in that order, exactly once.
- Never include literal `<scratch_pad>` / `<output>` / `<confidence>` /
`<confidence_explanation>` markers inside the user-facing content.
- Keep tool-calling deliberation in `<scratch_pad>`. Keep only the
finished, customer-ready answer in `<output>`.
- If you must defer or ask for clarification, put the question inside
`<output>` too — the user never sees `<scratch_pad>`.

Confidence (required on terminal responses):
- `<confidence>` is a number in [0, 1] — your self-assessed confidence
that the `<output>` is correct and complete. Calibrate honestly: a
0.9 should mean you'd bet on it; a 0.5 means roughly a coin flip.
- `<confidence_explanation>` is one short sentence on what drove the
score — spec clarity, tool-result completeness, edge cases unchecked.
- Omit both tags when you're only calling tools (no final answer yet).
Include them once, on the turn that contains the final `<output>`.
""".strip()


Expand All @@ -59,6 +72,16 @@
_THINK_RE = re.compile(r"<think>[\s\S]*?</think>", re.IGNORECASE)
_ORPHAN_THINK_OPEN_RE = re.compile(r"<think>[\s\S]*$", re.IGNORECASE)
_ORPHAN_THINK_CLOSE_RE = re.compile(r"</think>\s*", re.IGNORECASE)
# Captures the numeric score inside a <confidence> tag. The group admits a
# leading minus and bare dots so out-of-spec values reach the caller, which
# clamps or rejects them (float("...") raising ValueError is handled there).
_CONFIDENCE_RE = re.compile(
    r"<confidence>\s*(-?[\d.]+)\s*</confidence>", re.IGNORECASE,
)
# Captures the free-text body of a <confidence_explanation> tag; [\s\S]
# spans newlines without needing re.DOTALL.
_CONFIDENCE_EXPLANATION_RE = re.compile(
    r"<confidence_explanation>([\s\S]*?)</confidence_explanation>", re.IGNORECASE,
)
# Matches a whole <confidence> or <confidence_explanation> element so
# _strip_reasoning can remove it from user-facing output.
# NOTE(review): the optional "_explanation" suffix is independent on the open
# and close tags, so a mismatched pair such as
# <confidence>...</confidence_explanation> is also stripped — presumably
# acceptable, since any confidence markup should vanish from the output.
_CONFIDENCE_ANY_RE = re.compile(
    r"<confidence(?:_explanation)?>[\s\S]*?</confidence(?:_explanation)?>",
    re.IGNORECASE,
)


def _strip_reasoning(text: str) -> str:
Expand All @@ -73,9 +96,35 @@ def _strip_reasoning(text: str) -> str:
text = _ORPHAN_THINK_CLOSE_RE.sub("", text)
text = _SCRATCH_RE.sub("", text)
text = _ORPHAN_SCRATCH_OPEN_RE.sub("", text)
text = _CONFIDENCE_ANY_RE.sub("", text)
return text


def extract_confidence(text: str) -> tuple[float | None, str | None]:
    """Parse ``(confidence, explanation)`` from a complete model response.

    Returns ``(None, None)`` when no `<confidence>` tag is present or its
    number does not parse — in both cases ``_chat_langgraph_stream`` emits
    no confidence event, and the workstacean interceptor no-ops on a
    missing score, which is the correct fallback for a malformed
    self-report. A parseable score is clamped to [0, 1].
    """
    score_match = _CONFIDENCE_RE.search(text)
    if score_match is None:
        return None, None
    try:
        score = float(score_match.group(1))
    except ValueError:
        # e.g. the regex matched a bare "." — treat as not reported.
        return None, None
    score = min(1.0, max(0.0, score))

    explanation: str | None = None
    expl_match = _CONFIDENCE_EXPLANATION_RE.search(text)
    if expl_match is not None:
        stripped = expl_match.group(1).strip()
        explanation = stripped if stripped else None
    return score, explanation


def extract_output(text: str) -> str:
"""Return the user-facing content from a complete model response.

Expand Down
63 changes: 62 additions & 1 deletion server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from pathlib import Path
from typing import Any

from graph.output_format import extract_output
from graph.output_format import extract_confidence, extract_output

# chat_ui pulls in gradio, which the server needs at runtime but which would
# otherwise block anyone from importing tiny helpers (e.g. _build_agent_card)
Expand Down Expand Up @@ -476,6 +476,17 @@ async def _chat_langgraph_stream(message: str, session_id: str, *, caller_trace:
"output_tokens": int(usage.get("output_tokens", 0) or 0),
})

# Self-reported confidence, if the model emitted <confidence>
# tags. Pulled from the raw text BEFORE extract_output strips
# reasoning markers. Emitted ahead of "done" so the a2a handler
# stamps it on the terminal artifact.
confidence, explanation = extract_confidence(accumulated_raw)
if confidence is not None:
yield ("confidence", {
"confidence": confidence,
"explanation": explanation,
})

yield ("done", extract_output(accumulated_raw))

except GeneratorExit:
Expand Down Expand Up @@ -819,6 +830,56 @@ def _build_agent_card(host: str) -> dict:
{
"uri": "https://proto-labs.ai/a2a/ext/cost-v1",
},
# confidence-v1: Quinn emits a self-reported confidence score
# on the terminal artifact when the model includes
# <confidence>/<confidence_explanation> tags (see
# graph/output_format.py). Workstacean's confidence
# interceptor records per-(agent, skill) samples so planner
# ranking can weight by avgConfidenceOnSuccess and
# OutcomeAnalysis can flag high-confidence-failure clusters.
# Ref: docs/extensions/confidence-v1.md in protoWorkstacean.
{
"uri": "https://proto-labs.ai/a2a/ext/confidence-v1",
},
# blast-v1: per-skill scope of effect so HITL policy +
# planner can apply stricter gates to higher-impact work.
# Read-side only (no response payload). Radii here align
# with what Quinn actually does in each skill handler —
# don't over-declare, the planner uses this for tiebreaking.
#
# Ref: docs/extensions/blast-v1.md in protoWorkstacean.
{
"uri": "https://proto-labs.ai/a2a/ext/blast-v1",
"params": {
"skills": {
"qa_report": {"radius": "self"},
"board_audit": {"radius": "project"},
"pr_review": {"radius": "repo"},
"bug_triage": {"radius": "project"},
},
},
},
# hitl-mode-v1: per-skill approval policy. Composes with
# blast-v1 so higher-blast work can be gated independently
# of goal-level config. All Quinn skills are safe enough to
# run without blocking gates today — bug_triage files a
# backlog feature (reversible), board_audit + qa_report
# are read-only, pr_review posts review comments but never
# merges. Notification mode surfaces the action to the
# originating surface without blocking.
#
# Ref: docs/extensions/hitl-mode-v1.md in protoWorkstacean.
{
"uri": "https://proto-labs.ai/a2a/ext/hitl-mode-v1",
"params": {
"skills": {
"qa_report": {"mode": "autonomous"},
"board_audit": {"mode": "notification"},
"pr_review": {"mode": "notification"},
"bug_triage": {"mode": "notification"},
},
},
},
],
},
"defaultInputModes": ["text/plain"],
Expand Down
79 changes: 79 additions & 0 deletions tests/test_a2a_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,85 @@ async def test_store_add_usage_ignores_zero_payloads(store):
assert fetched.usage == {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}


# ── confidence-v1 ─────────────────────────────────────────────────────────────


def test_terminal_artifact_emits_confidence_v1_when_set():
    """A record with a producer-set confidence must yield a confidence-v1
    DataPart on the terminal artifact carrying confidence + success +
    the optional explanation. Workstacean's confidence interceptor lifts
    this onto result.data for per-(agent, skill) ConfidenceSamples."""
    from a2a_handler import CONFIDENCE_MIME, _terminal_artifact_parts
    record = _make_record(state=COMPLETED, accumulated_text="hi")
    record.confidence = 0.82
    record.confidence_explanation = "Spec unambiguous; all tests pass."
    matches = [
        part for part in _terminal_artifact_parts(record)
        if part.get("metadata", {}).get("mimeType") == CONFIDENCE_MIME
    ]
    assert matches, "confidence-v1 DataPart missing"
    data = matches[0]["data"]
    assert data["confidence"] == 0.82
    assert data["confidenceExplanation"] == "Spec unambiguous; all tests pass."
    assert data["success"] is True


def test_terminal_artifact_confidence_success_is_false_on_failure():
    """Terminal FAILED (and CANCELED) states must report success=False so
    the interceptor can classify high-confidence failures — the whole
    point of calibration tracking."""
    from a2a_handler import CONFIDENCE_MIME, _terminal_artifact_parts
    record = _make_record(state=FAILED, accumulated_text="")
    record.confidence = 0.9
    conf_part = None
    for part in _terminal_artifact_parts(record):
        if part.get("metadata", {}).get("mimeType") == CONFIDENCE_MIME:
            conf_part = part
            break
    assert conf_part is not None
    assert conf_part["data"]["success"] is False
    # No explanation was set, so the key is omitted — no empty-string noise.
    assert "confidenceExplanation" not in conf_part["data"]


def test_terminal_artifact_omits_confidence_v1_when_not_reported():
    """No <confidence> tag in the model output → the record keeps its
    default confidence of None and no DataPart is emitted. The
    interceptor no-ops on absent confidence, so an empty payload would
    just be noise."""
    from a2a_handler import CONFIDENCE_MIME, _terminal_artifact_parts
    record = _make_record(state=COMPLETED, accumulated_text="hi")
    # record.confidence defaults to None — leave it untouched.
    mime_types = [
        part.get("metadata", {}).get("mimeType")
        for part in _terminal_artifact_parts(record)
    ]
    assert CONFIDENCE_MIME not in mime_types


@pytest.mark.asyncio
async def test_store_set_confidence_clamps_to_unit_interval(store):
    """Misbehaving models can report 1.5 or -0.2. The store clamps to
    [0, 1] on write — defence-in-depth, since the workstacean
    interceptor clamps too, keeping the on-the-wire DataPart in-spec."""
    record = _make_record()
    await store.create(record)
    await store.set_confidence("test-task-id", confidence=1.5, explanation="over")
    after_high = await store.get("test-task-id")
    assert after_high.confidence == 1.0
    assert after_high.confidence_explanation == "over"
    await store.set_confidence("test-task-id", confidence=-0.2)
    after_low = await store.get("test-task-id")
    assert after_low.confidence == 0.0


@pytest.mark.asyncio
async def test_store_set_confidence_ignores_missing_task(store):
    """Calling set_confidence with an unknown task id must silently
    no-op rather than raise."""
    await store.set_confidence("no-such-task", confidence=0.5)
    # Reaching this line without an exception is the assertion.


def test_task_to_response_has_kind_discriminator():
"""A2A spec: Task objects carry kind='task'. This is what
message/send / tasks/get / initial stream frame all return."""
Expand Down
Loading
Loading