Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions a2a_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,14 @@ async def _deliver_webhook(record: TaskRecord, push_config: PushNotificationConf
try:
resp = await client.post(push_config.url, json=payload, headers=headers)
if resp.status_code < 500:
logger.debug("[a2a] webhook delivered → %s (%s)", push_config.url, resp.status_code)
# Upgraded from DEBUG → INFO (quinn#61). The absence of
# this line was the main reason we couldn't tell whether
# webhook delivery was firing at all — the whole path ran
# under the default WARNING threshold.
logger.info(
"[a2a] webhook delivered task=%s state=%s → %s (%s)",
record.id, record.state, push_config.url, resp.status_code,
)
Comment on lines +663 to +665
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid logging full webhook URLs at INFO (secret leakage risk).

These logs include full callback URLs. If operators use query tokens or userinfo in URLs, secrets get persisted in logs.

Proposed fix (sanitize URL before logging)
+from urllib.parse import urlsplit, urlunsplit
+
+def _log_safe_url(raw_url: str) -> str:
+    try:
+        parsed = urlsplit(raw_url)
+        host = parsed.hostname or ""
+        port = f":{parsed.port}" if parsed.port else ""
+        netloc = f"{host}{port}"
+        # Drop query/fragment and any userinfo
+        return urlunsplit((parsed.scheme, netloc, parsed.path, "", ""))
+    except Exception:
+        return "<invalid-url>"
+
@@
-                    logger.info(
-                        "[a2a] webhook delivered task=%s state=%s → %s (%s)",
-                        record.id, record.state, push_config.url, resp.status_code,
-                    )
+                    logger.info(
+                        "[a2a] webhook delivered task=%s state=%s → %s (%s)",
+                        record.id, record.state, _log_safe_url(push_config.url), resp.status_code,
+                    )
@@
-            logger.info(
-                "[a2a] push config registered (jsonrpc) task=%s state=%s → %s",
-                task_id, record.state, cfg.url,
-            )
+            logger.info(
+                "[a2a] push config registered (jsonrpc) task=%s state=%s → %s",
+                task_id, record.state, _log_safe_url(cfg.url),
+            )

Also applies to: 1215-1217

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@a2a_handler.py` around lines 663 - 665, The log statement in a2a_handler that
prints push_config.url (used with record.id, record.state, resp.status_code)
exposes full webhook URLs and may leak secrets; replace direct URL logging with
a sanitized form before calling processLogger.info: implement or reuse a helper
(e.g., sanitize_url or mask_sensitive_url) that strips userinfo and query
parameters (or replaces them with placeholders) and use that sanitized value in
the "[a2a] webhook delivered..." log; apply the same fix to the other identical
logging site referenced (the second occurrence around the push/response
logging).

return
logger.warning("[a2a] webhook 5xx (attempt %d): %s", attempt + 1, resp.status_code)
except httpx.RequestError as exc:
Expand Down Expand Up @@ -1114,12 +1121,13 @@ def _rpc_result(result):

if method == "message/send":
record = await _submit_task(text, context_id, push_config, caller_trace)
return _rpc_result({
"id": record.id,
"contextId": record.context_id,
"status": {"state": SUBMITTED,
"timestamp": record.created_at},
})
# Use _task_to_response so the `kind: "task"` discriminator
# and every other Task field land consistently with the
# SSE / REST / tasks/get paths. Building the dict inline
# omitted `kind`, which `@a2a-js/sdk` uses to route the
# result into its Task handler (spec-compliance with the
# a2a-streaming guide).
return _rpc_result(_task_to_response(record))

# streaming path — SSE frames wrapped in JSON-RPC envelopes
return StreamingResponse(
Expand Down Expand Up @@ -1199,8 +1207,15 @@ def _rpc_result(result):
)
async with _store._lock:
record.push_config = cfg
# Fire immediately if the task already reached terminal state
# before the caller got around to registering — otherwise the
# webhook would never be delivered (quinn#61).
if record.state in _TERMINAL:
await _push(record)
logger.info(
"[a2a] push config registered (jsonrpc) task=%s state=%s → %s",
task_id, record.state, cfg.url,
)
return _rpc_result({
"taskId": task_id,
"pushNotificationConfig": {"url": cfg.url, "id": cfg.id},
Expand Down
11 changes: 11 additions & 0 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@
# otherwise block anyone from importing tiny helpers (e.g. _build_agent_card)
# out of this module for tests. Keep the import inside _main().

# Root-level log config. Python's default is WARNING, which silently filtered
# every `logger.info(...)` call — including "push config registered" and
# "webhook delivered" lines from a2a_handler, making every diagnosis of the
# A2A/webhook path a guess-and-check session (quinn#61). INFO is the right
# default for a production agent; LOG_LEVEL env var lets operators tune up
# or down without a code change.
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", "INFO").upper(),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
Comment on lines +33 to +35
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify current usage in repo
rg -n 'LOG_LEVEL|basicConfig\(' server.py

# Verify stdlib behavior for valid/invalid level names
python - <<'PY'
import logging

for value in ["INFO", "DEBUG", "INF0", "NOT_A_LEVEL"]:
    try:
        logging.basicConfig(level=value, force=True)
        print(f"{value}: OK")
    except Exception as e:
        print(f"{value}: ERROR -> {type(e).__name__}: {e}")
PY

Repository: protoLabsAI/quinn

Length of output: 351


Validate LOG_LEVEL before passing it to basicConfig.

At line 34, an invalid env value (e.g., LOG_LEVEL=INF0) raises ValueError and prevents server startup at import time. The logging.basicConfig() function does not gracefully handle invalid level names.

Proposed fix
+_raw_log_level = os.environ.get("LOG_LEVEL", "INFO")
+_resolved_log_level = getattr(logging, _raw_log_level.upper(), None)
+if not isinstance(_resolved_log_level, int):
+    _resolved_log_level = logging.INFO
+
 logging.basicConfig(
-    level=os.environ.get("LOG_LEVEL", "INFO").upper(),
+    level=_resolved_log_level,
     format="%(asctime)s %(levelname)s %(name)s %(message)s",
 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server.py` around lines 33 - 35, The LOG_LEVEL environment value should be
validated before calling logging.basicConfig to avoid a ValueError at import;
read os.environ.get("LOG_LEVEL", "INFO"), upper-case it, use
logging.getLevelName() to check whether it maps to a numeric level (valid), and
if not fall back to "INFO" (and optionally log a warning) and then pass the
validated string to logging.basicConfig (referencing logging.basicConfig and the
LOG_LEVEL env lookup).

)

# Module-level logger — used to surface uncaught exceptions in the LangGraph
# stream/invoke code with a full traceback. Without this the A2A handler only
# sees str(e), which drops the frame location and makes every runtime bug a
Expand Down
35 changes: 35 additions & 0 deletions tests/test_a2a_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,36 @@ async def test_webhook_delivery_success():
assert payload["artifact"]["artifactId"] == "test-task-id"


@pytest.mark.asyncio
async def test_webhook_success_is_logged_at_info_level(caplog):
"""Quinn #61 root cause: successful webhook delivery used to log at
DEBUG, which is below the default WARNING threshold the server runs
at. There was no way to tell from docker logs whether delivery was
actually firing. Upgrade locked in — every successful POST must
produce an INFO line carrying the task id, final state, and URL."""
import logging
record = _make_record(state=COMPLETED)
cfg = PushNotificationConfig(url="https://example.com/hook")

with patch("a2a_handler.httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=None)
mock_response = MagicMock()
mock_response.status_code = 200
mock_client.post = AsyncMock(return_value=mock_response)
mock_client_cls.return_value = mock_client

with caplog.at_level(logging.INFO, logger="a2a_handler"):
await _deliver_webhook(record, cfg)

info_records = [r for r in caplog.records if r.levelno == logging.INFO]
assert any("webhook delivered" in r.getMessage() for r in info_records), (
"successful delivery must log at INFO, not DEBUG — otherwise the "
"default WARNING log level silently hides whether delivery is working"
)
Comment on lines +686 to +690
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Lock the full log contract, not just the prefix.

This test currently passes even if task/state/URL details regress. Please assert those required fields explicitly.

Proposed test hardening
-    info_records = [r for r in caplog.records if r.levelno == logging.INFO]
-    assert any("webhook delivered" in r.getMessage() for r in info_records), (
+    info_messages = [r.getMessage() for r in caplog.records if r.levelno == logging.INFO]
+    assert any(
+        "webhook delivered" in m
+        and "task=test-task-id" in m
+        and f"state={COMPLETED}" in m
+        and "https://example.com/hook" in m
+        for m in info_messages
+    ), (
         "successful delivery must log at INFO, not DEBUG — otherwise the "
         "default WARNING log level silently hides whether delivery is working"
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_a2a_handler.py` around lines 686 - 690, The test currently only
checks the "webhook delivered" prefix, which lets regressions in task/state/URL
slip through; update the assertion that examines info_records (from
caplog.records at logging.INFO) to assert that at least one record's
r.getMessage() contains the full required fields: the task identifier, the state
value, and the target URL (in addition to "webhook delivered"). Locate the check
that builds info_records and replace the broad any("webhook delivered" in
r.getMessage()) with an explicit check that parses or searches r.getMessage()
for the task id, state, and URL substrings (or compares against the exact
expected message) so the test fails if any of those fields are missing.



@pytest.mark.asyncio
async def test_webhook_delivery_no_token():
record = _make_record(state=FAILED, error_message="oops")
Expand Down Expand Up @@ -1003,8 +1033,13 @@ async def test_message_send_returns_submitted():

assert resp.status_code == 200
data = resp.json()
# Spec-compliance: the result must be a full Task object with the
# `kind` discriminator — @a2a-js/sdk routes by kind. Inline-dict
# builds have omitted it in the past (quinn#61 investigation).
assert data["result"]["kind"] == "task"
assert data["result"]["status"]["state"] == SUBMITTED
assert "id" in data["result"]
assert "contextId" in data["result"]


@pytest.mark.asyncio
Expand Down
Loading