Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/deepscientist/artifact/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from .schemas import ARTIFACT_DIRS, guidance_for_kind, validate_artifact_payload

QUEST_COMPLETION_DECISION_TYPE = "quest_completion_approval"
_NOOP_HEARTBEAT_MARKERS = frozenset({"__noop__", "__mailbox_poll__"})
_COMPLETION_APPROVAL_TERMS = (
"同意完成",
"确认完成",
Expand Down Expand Up @@ -13024,6 +13025,52 @@ def interact(
suppress_resolved = (kind == "progress") if suppress_if_unchanged is None else bool(suppress_if_unchanged)
dedupe_key_resolved = str(dedupe_key or self._normalize_interaction_message(full_message)).strip() or None
pending_user_message_count = int(self.quest_service.snapshot(self._quest_id(quest_root)).get("pending_user_message_count") or 0)
normalized_message = self._normalize_interaction_message(full_message)
if (
kind == "progress"
and pending_user_message_count == 0
and normalized_message in _NOOP_HEARTBEAT_MARKERS
and self._has_unresolved_blocking_interaction(quest_root)
):
interaction_state = self._read_interaction_state(quest_root)
waiting_requests = [
dict(item)
for item in (interaction_state.get("open_requests") or [])
if str(item.get("status") or "") == "waiting"
]
return {
"status": "suppressed_blocking_pending",
"artifact_id": None,
"interaction_id": None,
"expects_reply": False,
"reply_mode": "threaded",
"surface_actions": [],
"connector_hints": connector_hints_resolved,
"normalized_attachments": attachments_resolved,
"attachment_issues": attachment_issues,
"delivered": False,
"delivery_results": [],
"response_phase": response_phase,
"delivery_targets": [],
"delivery_policy": self._delivery_policy(self._connectors_config()),
"preferred_connector": self._preferred_connector(self._connectors_config()),
"recent_inbound_messages": [],
"delivery_batch": None,
"recent_interaction_records": self.quest_service.latest_artifact_interaction_records(quest_root, limit=10),
"agent_instruction": self.quest_service.localized_copy(
quest_root=quest_root,
zh="当前已有一个等待用户回复的阻塞 interaction,再发心跳只会把它压到下方。请安静等待,不要再 record __noop__ progress。",
en="A blocking interaction is already waiting on the user. Recording another __noop__ heartbeat would only push it further down the feed. Stay quiet until the user replies.",
),
"queued_message_count_before_delivery": 0,
"queued_message_count_after_delivery": 0,
"open_request_count": len(waiting_requests),
"active_request": waiting_requests[-1] if waiting_requests else None,
"default_reply_interaction_id": interaction_state.get("default_reply_interaction_id"),
"guidance": "Noop heartbeat suppressed because a blocking interaction is still waiting for the user.",
"suppressed_reason": "blocking_pending",
"dedupe_key": dedupe_key_resolved,
}
if (
kind == "progress"
and suppress_resolved
Expand Down Expand Up @@ -13954,6 +14001,25 @@ def _interaction_decision_type(item: dict[str, Any]) -> str:
reply_schema = item.get("reply_schema") if isinstance(item.get("reply_schema"), dict) else {}
return str(reply_schema.get("decision_type") or "").strip()

def _has_unresolved_blocking_interaction(self, quest_root: Path) -> bool:
# An "unresolved blocking interaction" is any open_request that has
# not yet been answered (status='waiting'). Entries in open_requests
# are only created for blocking decision_request / approval calls in
# _update_interaction_state, so a waiting entry is by construction
# waiting on an explicit user reply — no further reply_mode check is
# needed (and indeed the reply_mode field is not copied into the
# open_requests record). Used to suppress noop progress heartbeats so
# the actual blocking artifact stays near the top of the user's feed
# instead of being pushed down by ack pings.
state = self._read_interaction_state(quest_root)
for item in state.get("open_requests") or []:
if not isinstance(item, dict):
continue
if str(item.get("status") or "").lower() != "waiting":
continue
return True
return False

def _latest_completion_request(self, quest_root: Path) -> dict[str, Any] | None:
state = self._read_interaction_state(quest_root)
candidates: list[dict[str, Any]] = []
Expand Down
72 changes: 72 additions & 0 deletions src/ui/src/lib/__tests__/findReplyTargetId.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { describe, expect, it } from 'vitest'

import { findReplyTargetId } from '@/lib/acp'
import type { FeedItem } from '@/types'

type ArtifactItem = Extract<FeedItem, { type: 'artifact' }>

function artifact(overrides: Partial<ArtifactItem>): ArtifactItem {
return {
id: 'art-default',
type: 'artifact',
kind: 'progress',
content: '',
...overrides,
}
}

describe('findReplyTargetId', () => {
it('prefers an unresolved blocking decision over more recent threaded heartbeats', () => {
// Reproduces quest 014: a blocking quest_completion_approval is followed
// by several `__noop__` progress heartbeats. Without the two-pass scan the
// most recent threaded heartbeat would steal the reply target and "同意"
// would never reach the actual approval request.
const feed: FeedItem[] = [
artifact({
id: 'decision-b5351a4f',
kind: 'decision',
interactionId: 'decision-b5351a4f',
replyMode: 'blocking',
expectsReply: true,
}),
artifact({ id: 'progress-1', interactionId: 'progress-1', replyMode: 'threaded' }),
artifact({ id: 'progress-2', interactionId: 'progress-2', replyMode: 'threaded' }),
artifact({ id: 'progress-3', interactionId: 'progress-3', replyMode: 'threaded' }),
]
expect(findReplyTargetId(feed)).toBe('decision-b5351a4f')
})

it('falls back to the latest threaded interaction when no blocking one is open', () => {
const feed: FeedItem[] = [
artifact({ id: 'progress-old', interactionId: 'progress-old', replyMode: 'threaded' }),
artifact({ id: 'progress-new', interactionId: 'progress-new', replyMode: 'threaded' }),
]
expect(findReplyTargetId(feed)).toBe('progress-new')
})

it('matches expectsReply even without an explicit blocking replyMode', () => {
const feed: FeedItem[] = [
artifact({
id: 'question',
interactionId: 'question',
expectsReply: true,
}),
artifact({ id: 'progress-after', interactionId: 'progress-after', replyMode: 'threaded' }),
]
expect(findReplyTargetId(feed)).toBe('question')
})

it('returns the interaction id when present, otherwise falls back to the item id', () => {
const feed: FeedItem[] = [
artifact({ id: 'art-id', interactionId: undefined, replyMode: 'blocking', expectsReply: true }),
]
expect(findReplyTargetId(feed)).toBe('art-id')
})

it('returns null when no artifacts are present', () => {
const feed: FeedItem[] = [
{ id: 'm', type: 'message', role: 'user', content: 'hi' },
]
expect(findReplyTargetId(feed)).toBeNull()
})
})
11 changes: 10 additions & 1 deletion src/ui/src/lib/acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -815,13 +815,22 @@ function collectSealedAssistantRunIds(updates: Array<Record<string, unknown>>) {
return Array.from(sealed)
}

function findReplyTargetId(feed: FeedItem[]) {
export function findReplyTargetId(feed: FeedItem[]) {
// Prefer the most recent unresolved blocking interaction (decision_request,
// approval, blocking question) so that a free-text reply like "同意" lands on
// the artifact actually waiting for a user verdict — not on a `progress`
// heartbeat that happens to be more recent in the feed. Only when no such
// blocking interaction exists do we fall back to the latest threaded one.
for (let index = feed.length - 1; index >= 0; index -= 1) {
const item = feed[index]
if (item.type !== 'artifact') continue
if (item.replyMode === 'blocking' || item.expectsReply) {
return item.interactionId || item.id
}
}
for (let index = feed.length - 1; index >= 0; index -= 1) {
const item = feed[index]
if (item.type !== 'artifact') continue
if (item.replyMode === 'threaded' && item.interactionId) {
return item.interactionId
}
Expand Down
84 changes: 84 additions & 0 deletions tests/test_memory_and_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -5132,6 +5132,90 @@ def test_duplicate_progress_is_suppressed_when_message_is_unchanged(temp_home: P
assert len(outbound) == 1


def test_noop_progress_is_suppressed_when_blocking_interaction_is_pending(temp_home: Path) -> None:
# Reproduces quest 014's behavior: while a blocking
# quest_completion_approval is waiting on the user, every mailbox-poll
# cycle should NOT record another `__noop__` progress artifact (which
# would push the actual approval request further down the feed and
# produce a stream of cosmetic git commits / connector notifications).
ensure_home_layout(temp_home)
ConfigManager(temp_home).ensure_files()
quest_service = QuestService(temp_home, skill_installer=SkillInstaller(repo_root(), temp_home))
quest = quest_service.create(
"noop suppression while blocking quest",
startup_contract={"decision_policy": "autonomous"},
)
quest_root = Path(quest["quest_root"])
artifact = ArtifactService(temp_home)

request = artifact.interact(
quest_root,
kind="decision_request",
message="Quest looks complete. May I close it?",
deliver_to_bound_conversations=False,
include_recent_inbound_messages=False,
reply_mode="blocking",
reply_schema={"decision_type": "quest_completion_approval"},
)
assert request["status"] == "ok"
assert request["reply_mode"] == "blocking"
progress_dir = quest_root / "artifacts" / "progress"
progress_count_before = len(list(progress_dir.glob("*.json"))) if progress_dir.exists() else 0

suppressed = artifact.interact(
quest_root,
kind="progress",
message="__noop__",
deliver_to_bound_conversations=False,
include_recent_inbound_messages=True,
suppress_if_unchanged=True,
dedupe_key="mailbox-poll-only-test",
)
assert suppressed["status"] == "suppressed_blocking_pending"
assert suppressed["suppressed_reason"] == "blocking_pending"
assert suppressed["artifact_id"] is None
assert suppressed["open_request_count"] == 1
progress_count_after = len(list(progress_dir.glob("*.json"))) if progress_dir.exists() else 0
assert progress_count_after == progress_count_before


def test_substantive_progress_passes_through_when_blocking_interaction_is_pending(temp_home: Path) -> None:
# Sanity guard: only `__noop__` / `__mailbox_poll__` heartbeats are
# suppressed during a blocking wait — a substantive progress message
# (e.g. "training crashed at step 1200") must still record so the user
# learns the situation has changed.
ensure_home_layout(temp_home)
ConfigManager(temp_home).ensure_files()
quest_service = QuestService(temp_home, skill_installer=SkillInstaller(repo_root(), temp_home))
quest = quest_service.create(
"substantive progress while blocking quest",
startup_contract={"decision_policy": "autonomous"},
)
quest_root = Path(quest["quest_root"])
artifact = ArtifactService(temp_home)

request = artifact.interact(
quest_root,
kind="decision_request",
message="Quest looks complete. May I close it?",
deliver_to_bound_conversations=False,
include_recent_inbound_messages=False,
reply_mode="blocking",
reply_schema={"decision_type": "quest_completion_approval"},
)
assert request["status"] == "ok"

real_update = artifact.interact(
quest_root,
kind="progress",
message="Training crashed at step 1200; restarting from checkpoint.",
deliver_to_bound_conversations=False,
include_recent_inbound_messages=False,
)
assert real_update["status"] == "ok"
assert real_update["artifact_id"]


def test_interact_preserves_full_message_for_delivery_and_records_summary_preview(temp_home: Path) -> None:
ensure_home_layout(temp_home)
ConfigManager(temp_home).ensure_files()
Expand Down