From d0768086033144a073509900ea01f1cff6b1b423 Mon Sep 17 00:00:00 2001 From: kokorolx Date: Mon, 30 Mar 2026 04:04:13 +0000 Subject: [PATCH 1/8] fix(agent): keep tool defs on final iteration to prevent Anthropic HTTP 400 On the last iteration the loop set toolDefs = nil to force a text-only response, but the conversation history still contained tool_use/tool_result messages. Anthropic (and LiteLLM proxy) reject requests that include tool messages without a top-level tools parameter. Instead of stripping tools, keep them and set tool_choice="none" so the API enforces text-only at the protocol level. Added ToolChoice field to ChatRequest and wired it through both OpenAI-compat and Anthropic providers. Also removed temperature from thinking-enabled requests to avoid proxy 400 errors (Anthropic requires temperature=1 default with extended thinking). --- internal/agent/loop.go | 12 ++++++------ internal/agent/loop_tool_filter.go | 15 ++++++++------- internal/providers/anthropic_request.go | 8 ++++++++ internal/providers/openai.go | 13 +++++++++++-- internal/providers/types.go | 9 +++++---- 5 files changed, 38 insertions(+), 19 deletions(-) diff --git a/internal/agent/loop.go b/internal/agent/loop.go index 1b61823a4..4a7d86023 100644 --- a/internal/agent/loop.go +++ b/internal/agent/loop.go @@ -222,8 +222,6 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (result *RunResult, Payload: map[string]any{"phase": "thinking", "iteration": rs.iteration}, }) - // Build per-iteration tool list: policy, tenant exclusions, bootstrap, skill visibility, - // channel type, and final-iteration stripping. var toolDefs []providers.ToolDefinition var allowedTools map[string]bool // Resolve per-user MCP tools (servers requiring user credentials). @@ -231,7 +229,8 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (result *RunResult, if req.UserID != "" { l.getUserMCPTools(iterCtx, req.UserID) } - toolDefs, allowedTools, messages = l.buildFilteredTools(&req, hadBootstrap, rs.iteration, maxIter, messages) + var toolChoice string + toolDefs, allowedTools, messages, toolChoice = l.buildFilteredTools(&req, hadBootstrap, rs.iteration, maxIter, messages) // Use per-request overrides if set (e.g. heartbeat uses cheaper provider/model). model := l.model @@ -244,9 +243,10 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (result *RunResult, } chatReq := providers.ChatRequest{ - Messages: messages, - Tools: toolDefs, - Model: model, + Messages: messages, + Tools: toolDefs, + ToolChoice: toolChoice, + Model: model, Options: map[string]any{ providers.OptMaxTokens: l.effectiveMaxTokens(), providers.OptTemperature: config.DefaultTemperature, diff --git a/internal/agent/loop_tool_filter.go b/internal/agent/loop_tool_filter.go index cd0ac76cd..a4778e9a4 100644 --- a/internal/agent/loop_tool_filter.go +++ b/internal/agent/loop_tool_filter.go @@ -12,9 +12,9 @@ import ( // disabled tools, bootstrap mode, skill visibility, channel type, and iteration budget. // Per-user MCP tools must be registered in the Registry before calling this function // (via getUserMCPTools) so they are included in policy filtering and execution. -// Returns tool definitions for the provider, an allowed-tools map for execution validation, -// and the (potentially modified) messages slice when final-iteration stripping appends a hint. -func (l *Loop) buildFilteredTools(req *RunRequest, hadBootstrap bool, iteration, maxIter int, messages []providers.Message) ([]providers.ToolDefinition, map[string]bool, []providers.Message) { +// Returns tool definitions, an allowed-tools map, the (potentially modified) messages slice, +// and a tool_choice override ("" = auto, "none" = no tool calls allowed). +func (l *Loop) buildFilteredTools(req *RunRequest, hadBootstrap bool, iteration, maxIter int, messages []providers.Message) ([]providers.ToolDefinition, map[string]bool, []providers.Message, string) { // Build provider request with policy-filtered tools. var toolDefs []providers.ToolDefinition var allowedTools map[string]bool @@ -81,15 +81,16 @@ func (l *Loop) buildFilteredTools(req *RunRequest, hadBootstrap bool, iteration, toolDefs = filtered } - // Final iteration: strip all tools to force a text-only response. - // Without this the model may keep requesting tools and exit with "...". + // Final iteration: do NOT nil toolDefs — Anthropic rejects requests containing + // tool_use/tool_result history without a top-level "tools" parameter (HTTP 400). + // Use tool_choice="none" to enforce text-only response at the API level instead. if iteration == maxIter { - toolDefs = nil messages = append(messages, providers.Message{ Role: "user", Content: "[System] Final iteration reached. Summarize all findings and respond to the user now. No more tool calls allowed.", }) + return toolDefs, allowedTools, messages, "none" } - return toolDefs, allowedTools, messages + return toolDefs, allowedTools, messages, "" } diff --git a/internal/providers/anthropic_request.go b/internal/providers/anthropic_request.go index 14cb0d554..2016fe472 100644 --- a/internal/providers/anthropic_request.go +++ b/internal/providers/anthropic_request.go @@ -184,6 +184,14 @@ func (p *AnthropicProvider) buildRequestBody(model string, req ChatRequest, stre tools[len(tools)-1]["cache_control"] = map[string]any{"type": "ephemeral"} } body["tools"] = tools + switch req.ToolChoice { + case "none": + body["tool_choice"] = map[string]any{"type": "none"} + case "required": + body["tool_choice"] = map[string]any{"type": "any"} + default: + body["tool_choice"] = map[string]any{"type": "auto"} + } } // Merge options diff --git a/internal/providers/openai.go b/internal/providers/openai.go index ed29e6663..c5423c139 100644 --- a/internal/providers/openai.go +++ b/internal/providers/openai.go @@ -383,7 +383,12 @@ func (p *OpenAIProvider) buildRequestBody(model string, req ChatRequest, stream if len(req.Tools) > 0 { body["tools"] = CleanToolSchemas(p.name, req.Tools) - body["tool_choice"] = "auto" + switch req.ToolChoice { + case "none", "required": + body["tool_choice"] = req.ToolChoice + default: + body["tool_choice"] = "auto" + } } if stream { @@ -412,9 +417,13 @@ func (p *OpenAIProvider) buildRequestBody(model string, req ChatRequest, stream } } - // Inject reasoning_effort for o-series models (ignored by models that don't support it) + // Inject reasoning_effort for o-series models (ignored by models that don't support it). + // When thinking/reasoning is enabled, remove temperature — Anthropic requires + // temperature=1 (default) when extended thinking is active, and proxies like + // LiteLLM forward it verbatim causing 400 errors. if level, ok := req.Options[OptThinkingLevel].(string); ok && level != "" && level != "off" { body[OptReasoningEffort] = level + delete(body, "temperature") } // DashScope-specific passthrough keys diff --git a/internal/providers/types.go b/internal/providers/types.go index 598b15ebc..c85c0af6f 100644 --- a/internal/providers/types.go +++ b/internal/providers/types.go @@ -63,10 +63,11 @@ type ThinkingCapable interface { // ChatRequest contains the input for a Chat/ChatStream call. type ChatRequest struct { - Messages []Message `json:"messages"` - Tools []ToolDefinition `json:"tools,omitempty"` - Model string `json:"model,omitempty"` - Options map[string]any `json:"options,omitempty"` + Messages []Message `json:"messages"` + Tools []ToolDefinition `json:"tools,omitempty"` + ToolChoice string `json:"tool_choice,omitempty"` // "auto" (default), "none", or "required" + Model string `json:"model,omitempty"` + Options map[string]any `json:"options,omitempty"` } // ChatResponse is the result from an LLM call. From 6e433e76e7cf5bc2f124971ec090785a95d76d69 Mon Sep 17 00:00:00 2001 From: kokorolx Date: Mon, 30 Mar 2026 04:16:17 +0000 Subject: [PATCH 2/8] =?UTF-8?q?feat(team):=20direct=20delivery=20=E2=80=94?= =?UTF-8?q?=20team=20members=20post=20results=20to=20channel=20directly?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a team member (e.g. CTO) completes a task, their response is now posted directly to the origin Discord channel via their own bot identity, instead of being relayed through the lead agent (PM). Changes: - message tool: relax self-send guard for team task runs so members can use message() to post updates during execution - consumer: post member completion result directly via member agent channel (resolved via ChannelForAgent), falling back to origin channel - announce queue: when all entries are direct-delivered, instruct leader to respond NO_REPLY unless it has new information to add - manager: add ChannelForAgent() to resolve agent key → channel name This creates a natural conversation flow where each agent speaks for themselves, like a normal team chat. --- cmd/gateway_announce_queue.go | 20 ++++++++++++++++---- cmd/gateway_consumer_handlers.go | 23 ++++++++++++++++++++++- internal/channels/manager.go | 30 ++++++++++++++++++++++++++++++ internal/tools/message.go | 16 ++++++++-------- 4 files changed, 76 insertions(+), 13 deletions(-) diff --git a/cmd/gateway_announce_queue.go b/cmd/gateway_announce_queue.go index 5adc5caf5..fb96e1191 100644 --- a/cmd/gateway_announce_queue.go +++ b/cmd/gateway_announce_queue.go @@ -19,10 +19,11 @@ import ( // announceEntry holds one teammate completion result waiting to be announced. type announceEntry struct { - MemberAgent string // agent key (e.g. "researcher") - MemberDisplayName string // display name (e.g. "Nhà Nghiên Cứu"), empty if not set - Content string - Media []agent.MediaResult + MemberAgent string // agent key (e.g. "researcher") + MemberDisplayName string // display name (e.g. "Nhà Nghiên Cứu"), empty if not set + Content string + Media []agent.MediaResult + DirectDelivered bool // true when member already posted result to channel directly } // announceQueueState tracks the per-session announce queue. @@ -123,7 +124,18 @@ func processAnnounceLoop( } } + allDirect := true + for _, e := range entries { + if !e.DirectDelivered { + allDirect = false + break + } + } + content := buildMergedAnnounceContent(entries, snapshot, r.TeamWorkspace) + if allDirect { + content += "\n\n[The member already posted the result directly to the chat. Do NOT repeat or rephrase it. Respond with NO_REPLY unless you have NEW information to add (e.g. next steps, follow-up tasks).]" + } req := agent.RunRequest{ SessionKey: r.LeadSessionKey, diff --git a/cmd/gateway_consumer_handlers.go b/cmd/gateway_consumer_handlers.go index a0124b94f..017eb1916 100644 --- a/cmd/gateway_consumer_handlers.go +++ b/cmd/gateway_consumer_handlers.go @@ -368,6 +368,26 @@ func handleTeammateMessage( } } + // Direct delivery: post the member's result to the origin chat via the + // member agent's own channel (so it appears under the correct bot identity). + // Falls back to origin channel if the member has no dedicated channel. + if origChatID != "" && outcome.Err == nil && !agent.IsSilentReply(announceContent) { + memberAgent := inMeta[tools.MetaToAgent] + memberChannel := "" + if deps.ChannelMgr != nil { + memberChannel = deps.ChannelMgr.ChannelForAgent(memberAgent) + } + if memberChannel == "" { + memberChannel = origCh + } + deps.MsgBus.PublishOutbound(bus.OutboundMessage{ + Channel: memberChannel, + ChatID: origChatID, + Content: announceContent, + Metadata: outMeta, + }) + } + // Announce result (or failure) to lead agent via announce queue. // Queue merges concurrent completions into a single batched announce. if origChatID == "" { @@ -424,12 +444,13 @@ func handleTeammateMessage( announceContent = string([]rune(announceContent)[:50_000]) + "\n[truncated]" } - // Enqueue result. If we become the processor, run the announce loop. + directDelivered := origChatID != "" && outcome.Err == nil && !agent.IsSilentReply(announceContent) entry := announceEntry{ MemberAgent: inMeta[tools.MetaToAgent], MemberDisplayName: inMeta[tools.MetaToAgentDisplay], Content: announceContent, Media: announceMedia, + DirectDelivered: directDelivered, } q, isProcessor := enqueueAnnounce(leadSessionKey, entry) if !isProcessor { diff --git a/internal/channels/manager.go b/internal/channels/manager.go index dfc47b918..be84fe254 100644 --- a/internal/channels/manager.go +++ b/internal/channels/manager.go @@ -145,6 +145,23 @@ func (m *Manager) GetStatus() map[string]any { return status } +// ChannelForAgent returns the channel name bound to the given agent key. +// Returns empty string if no channel is configured for the agent. +func (m *Manager) ChannelForAgent(agentKey string) string { + if agentKey == "" { + return "" + } + type agentBound interface{ AgentID() string } + m.mu.RLock() + defer m.mu.RUnlock() + for name, ch := range m.channels { + if ab, ok := ch.(agentBound); ok && ab.AgentID() == agentKey { + return name + } + } + return "" +} + // GetEnabledChannels returns the names of all enabled channels. func (m *Manager) GetEnabledChannels() []string { m.mu.RLock() @@ -224,6 +241,19 @@ func (m *Manager) ListGroupMembers(ctx context.Context, channelName, chatID stri return gmp.ListGroupMembers(ctx, chatID) } +func (m *Manager) ResolveChatTeam(channelName, chatID string) string { + m.mu.RLock() + ch, ok := m.channels[channelName] + m.mu.RUnlock() + if !ok { + return "" + } + if tmc, ok := ch.(TeamMappedChannel); ok { + return tmc.ResolveChatTeam(chatID) + } + return "" +} + // UnregisterChannel removes a channel from the manager. func (m *Manager) UnregisterChannel(name string) { m.mu.Lock() diff --git a/internal/tools/message.go b/internal/tools/message.go index 6241d6105..6393d3b40 100644 --- a/internal/tools/message.go +++ b/internal/tools/message.go @@ -33,8 +33,8 @@ func NewMessageTool(workspace string, restrict bool) *MessageTool { return &MessageTool{workspace: workspace, restrict: restrict} } -func (t *MessageTool) SetChannelSender(s ChannelSender) { t.sender = s } -func (t *MessageTool) SetMessageBus(b *bus.MessageBus) { t.msgBus = b } +func (t *MessageTool) SetChannelSender(s ChannelSender) { t.sender = s } +func (t *MessageTool) SetMessageBus(b *bus.MessageBus) { t.msgBus = b } func (t *MessageTool) SetChannelTenantChecker(c ChannelTenantChecker) { t.tenantChecker = c } func (t *MessageTool) Name() string { return "message" } @@ -98,19 +98,19 @@ func (t *MessageTool) Execute(ctx context.Context, args map[string]any) *Result // Self-send guard: prevent agent from sending to its own chat via message tool. // Text self-sends are always blocked (response goes through normal outbound). // MEDIA self-sends are allowed ONLY when the file was NOT already queued for - // delivery (i.e. write_file was called with deliver=false). This prevents both - // duplicate delivery (deliver=true then message MEDIA:) and runaway retry loops - // (deliver=false then message MEDIA: blocked unconditionally). + // delivery (i.e. write_file was called with deliver=false). + // + // Exception: team task runs (delegation) are allowed to self-send because the + // member agent's response is NOT auto-delivered to the origin channel — it goes + // through the leader announce queue instead. ctxChannel := ToolChannelFromCtx(ctx) ctxChatID := ToolChatIDFromCtx(ctx) isSelfSend := ctxChannel != "" && ctxChatID != "" && channel == ctxChannel && target == ctxChatID - if isSelfSend { + if isSelfSend && TeamTaskIDFromCtx(ctx) == "" { isMediaSend := embeddedMediaPattern.MatchString(message) if !isMediaSend { return ErrorResult("You are already responding to this chat. Your response text will be delivered automatically. Do not use the message tool to send text to your own chat — just include the content in your response text. To deliver files, use write_file with deliver=true instead.") } - // MEDIA self-send: block if ALL referenced files are already queued for delivery. - // Extracts paths from both standalone "MEDIA:path" and embedded multi-line messages. if dm := DeliveredMediaFromCtx(ctx); dm != nil { mediaRefs := embeddedMediaPattern.FindAllString(message, -1) allDelivered := len(mediaRefs) > 0 From 99ee5088cd65b1152331d2409d7db03e8b9e2d56 Mon Sep 17 00:00:00 2001 From: kokorolx Date: Mon, 30 Mar 2026 08:18:26 +0000 Subject: [PATCH 3/8] feat(team): chat_team_map routing, memory team scoping, direct delivery logging, agent workspace preservation - Add chat_team_map support for Discord channels to route inbound messages to correct team based on channel snowflake ID - Scope memory search (FTS + vector) by team_id to prevent cross-project contamination when same PM agent serves multiple teams - Populate team_id on memory_documents and memory_chunks during write - Relax NO_REPLY instruction to allow PM to create follow-up tasks (e.g. CTO review gate) after direct delivery of member results - Add direct delivery logging with channel, chat_id, fallback info - Fix Discord connect race: register message handler after bot identity fetch - Preserve agent base workspace in team context for file tool access - Add TryMarkListed atomic helper for team task state transitions --- cmd/gateway_announce_queue.go | 10 ++- cmd/gateway_consumer_handlers.go | 9 +++ cmd/gateway_consumer_normal.go | 6 ++ internal/agent/loop_context.go | 12 ++++ internal/channels/channel.go | 28 ++++++-- internal/channels/discord/discord.go | 8 ++- internal/channels/discord/factory.go | 29 ++++---- internal/config/config_channels.go | 103 ++++++++++++++------------- internal/store/context.go | 16 ++++- internal/store/pg/memory_docs.go | 32 +++++---- internal/store/pg/memory_search.go | 67 ++++++++++------- internal/tools/context_keys.go | 30 ++++++++ internal/tools/filesystem.go | 33 +++++---- internal/tools/team_tasks_read.go | 7 +- internal/tools/team_tool_dispatch.go | 20 +++--- 15 files changed, 268 insertions(+), 142 deletions(-) diff --git a/cmd/gateway_announce_queue.go b/cmd/gateway_announce_queue.go index fb96e1191..c3e45c73d 100644 --- a/cmd/gateway_announce_queue.go +++ b/cmd/gateway_announce_queue.go @@ -134,7 +134,9 @@ func processAnnounceLoop( content := buildMergedAnnounceContent(entries, snapshot, r.TeamWorkspace) if allDirect { - content += "\n\n[The member already posted the result directly to the chat. Do NOT repeat or rephrase it. Respond with NO_REPLY unless you have NEW information to add (e.g. next steps, follow-up tasks).]" + content += "\n\n[The member already posted the result directly to the chat. Do NOT repeat or rephrase their work. " + + "However, you SHOULD still create follow-up tasks if your workflow requires it (e.g. code review gate, deployment gate, QA). " + + "If you have no follow-up tasks and no new information, respond with NO_REPLY.]" } req := agent.RunRequest{ @@ -205,7 +207,11 @@ func processAnnounceLoop( } slog.Info("teammate announce: batch processed", - "batch_size", len(entries), "session", r.LeadSessionKey) + "batch_size", len(entries), + "all_direct", allDirect, + "channel", r.OrigChannel, + "session", r.LeadSessionKey, + ) // Loop back — tryFinish at top will exit when queue is truly empty. } diff --git a/cmd/gateway_consumer_handlers.go b/cmd/gateway_consumer_handlers.go index 017eb1916..a3aa5773b 100644 --- a/cmd/gateway_consumer_handlers.go +++ b/cmd/gateway_consumer_handlers.go @@ -374,12 +374,21 @@ func handleTeammateMessage( if origChatID != "" && outcome.Err == nil && !agent.IsSilentReply(announceContent) { memberAgent := inMeta[tools.MetaToAgent] memberChannel := "" + usingFallback := false if deps.ChannelMgr != nil { memberChannel = deps.ChannelMgr.ChannelForAgent(memberAgent) } if memberChannel == "" { memberChannel = origCh + usingFallback = true } + slog.Info("teammate direct delivery", + "member_agent", memberAgent, + "channel", memberChannel, + "chat_id", origChatID, + "fallback", usingFallback, + "content_len", len(announceContent), + ) deps.MsgBus.PublishOutbound(bus.OutboundMessage{ Channel: memberChannel, ChatID: origChatID, diff --git a/cmd/gateway_consumer_normal.go b/cmd/gateway_consumer_normal.go index 6215b3024..b32a4b86c 100644 --- a/cmd/gateway_consumer_normal.go +++ b/cmd/gateway_consumer_normal.go @@ -344,6 +344,11 @@ func processNormalMessage( schedCtx = tools.WithRunKind(schedCtx, rk) } + chatTeamID := "" + if deps.ChannelMgr != nil { + chatTeamID = deps.ChannelMgr.ResolveChatTeam(msg.Channel, msg.ChatID) + } + // Schedule through main lane (per-session concurrency controlled by maxConcurrent) outCh := deps.Sched.ScheduleWithOpts(schedCtx, "main", agent.RunRequest{ SessionKey: sessionKey, @@ -364,6 +369,7 @@ func processNormalMessage( ToolAllow: msg.ToolAllow, ExtraSystemPrompt: extraPrompt, SkillFilter: skillFilter, + TeamID: chatTeamID, }, scheduler.ScheduleOpts{ MaxConcurrent: maxConcurrent, }) diff --git a/internal/agent/loop_context.go b/internal/agent/loop_context.go index 3ddcce6bd..398f87dcb 100644 --- a/internal/agent/loop_context.go +++ b/internal/agent/loop_context.go @@ -132,15 +132,26 @@ func (l *Loop) injectContext(ctx context.Context, req *RunRequest) (contextSetup } // Team workspace: dispatched task overrides default workspace. + // Preserve the agent's own workspace so file tools can still allow access + // to project files (e.g. the code repo the agent normally works in). if req.TeamWorkspace != "" { if err := os.MkdirAll(req.TeamWorkspace, 0755); err != nil { slog.Warn("failed to create team workspace directory", "workspace", req.TeamWorkspace, "error", err) } + // Save the agent's BASE workspace (l.workspace), not the per-user + // derived workspace from context which may already have user-isolation + // layers applied (e.g. /workspaces/group_discord-pm_123...). + if l.workspace != "" { + ctx = tools.WithToolAgentWorkspace(ctx, l.workspace) + } ctx = tools.WithToolTeamWorkspace(ctx, req.TeamWorkspace) ctx = tools.WithToolWorkspace(ctx, req.TeamWorkspace) } if req.TeamID != "" { ctx = tools.WithToolTeamID(ctx, req.TeamID) + if tid, err := uuid.Parse(req.TeamID); err == nil && tid != uuid.Nil { + ctx = store.WithMemoryTeamID(ctx, tid) + } } if req.LeaderAgentID != "" { ctx = tools.WithLeaderAgentID(ctx, req.LeaderAgentID) @@ -172,6 +183,7 @@ func (l *Loop) injectContext(ctx context.Context, req *RunRequest) (contextSetup } if req.TeamID == "" { ctx = tools.WithToolTeamID(ctx, team.ID.String()) + ctx = store.WithMemoryTeamID(ctx, team.ID) } } } diff --git a/internal/channels/channel.go b/internal/channels/channel.go index c4bf45703..3a19baef3 100644 --- a/internal/channels/channel.go +++ b/internal/channels/channel.go @@ -40,9 +40,9 @@ type DMPolicy string const ( DMPolicyPairing DMPolicy = "pairing" // Require pairing code - DMPolicyAllowlist DMPolicy = "allowlist" // Only whitelisted senders - DMPolicyOpen DMPolicy = "open" // Accept all - DMPolicyDisabled DMPolicy = "disabled" // Reject all DMs + DMPolicyAllowlist DMPolicy = "allowlist" // Only whitelisted senders + DMPolicyOpen DMPolicy = "open" // Accept all + DMPolicyDisabled DMPolicy = "disabled" // Reject all DMs ) // GroupPolicy controls how group messages are handled. @@ -50,8 +50,8 @@ type GroupPolicy string const ( GroupPolicyOpen GroupPolicy = "open" // Accept all groups - GroupPolicyAllowlist GroupPolicy = "allowlist" // Only whitelisted groups - GroupPolicyDisabled GroupPolicy = "disabled" // No group messages + GroupPolicyAllowlist GroupPolicy = "allowlist" // Only whitelisted groups + GroupPolicyDisabled GroupPolicy = "disabled" // No group messages ) // Channel type constants used across channel packages and gateway wiring. @@ -149,6 +149,15 @@ type ReactionChannel interface { ClearReaction(ctx context.Context, chatID string, messageID string) error } +// TeamMappedChannel is optionally implemented by channels that map specific +// chat IDs to Goclaw team IDs. Used to route inbound messages to the correct +// team when an agent leads multiple teams. +type TeamMappedChannel interface { + Channel + // ResolveChatTeam returns the team ID for the given chat ID, or "" if not mapped. + ResolveChatTeam(chatID string) string +} + // BaseChannel provides shared functionality for all channel implementations. // Channel implementations should embed this struct. type BaseChannel struct { @@ -157,9 +166,10 @@ type BaseChannel struct { bus *bus.MessageBus running bool allowList []string - agentID string // for DB instances: routes to specific agent (empty = use resolveAgentRoute) - tenantID uuid.UUID // for DB instances: tenant scope (zero = master tenant fallback) + agentID string // for DB instances: routes to specific agent (empty = use resolveAgentRoute) + tenantID uuid.UUID // for DB instances: tenant scope (zero = master tenant fallback) contactCollector *store.ContactCollector // optional: auto-collect contacts from channel messages + chatTeamMap map[string]string } // NewBaseChannel creates a new BaseChannel with the given parameters. @@ -203,6 +213,10 @@ func (c *BaseChannel) SetTenantID(id uuid.UUID) { c.tenantID = id } // SetContactCollector sets the contact collector for auto-collecting contacts from messages. func (c *BaseChannel) SetContactCollector(cc *store.ContactCollector) { c.contactCollector = cc } +func (c *BaseChannel) SetChatTeamMap(m map[string]string) { c.chatTeamMap = m } + +func (c *BaseChannel) ResolveChatTeam(chatID string) string { return c.chatTeamMap[chatID] } + // ContactCollector returns the contact collector (may be nil). func (c *BaseChannel) ContactCollector() *store.ContactCollector { return c.contactCollector } diff --git a/internal/channels/discord/discord.go b/internal/channels/discord/discord.go index b70989560..0ba818f16 100644 --- a/internal/channels/discord/discord.go +++ b/internal/channels/discord/discord.go @@ -83,13 +83,13 @@ func (c *Channel) Start(_ context.Context) error { c.groupHistory.StartFlusher() slog.Info("starting discord bot") - c.session.AddHandler(c.handleMessage) - if err := c.session.Open(); err != nil { return fmt.Errorf("open discord session: %w", err) } - // Fetch bot identity + // Fetch bot identity before registering message handler so that + // botUserID is always set when handleMessage fires (avoids mention + // check failing on events replayed immediately after connect). user, err := c.session.User("@me") if err != nil { c.session.Close() @@ -97,6 +97,8 @@ func (c *Channel) Start(_ context.Context) error { } c.botUserID = user.ID + c.session.AddHandler(c.handleMessage) + c.SetRunning(true) slog.Info("discord bot connected", "username", user.Username, "id", user.ID) diff --git a/internal/channels/discord/factory.go b/internal/channels/discord/factory.go index f2bf2a7db..7a017e8f4 100644 --- a/internal/channels/discord/factory.go +++ b/internal/channels/discord/factory.go @@ -17,18 +17,19 @@ type discordCreds struct { // discordInstanceConfig maps the non-secret config JSONB from the channel_instances table. type discordInstanceConfig struct { - DMPolicy string `json:"dm_policy,omitempty"` - GroupPolicy string `json:"group_policy,omitempty"` - AllowFrom []string `json:"allow_from,omitempty"` - RequireMention *bool `json:"require_mention,omitempty"` - HistoryLimit int `json:"history_limit,omitempty"` - BlockReply *bool `json:"block_reply,omitempty"` - MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` - STTProxyURL string `json:"stt_proxy_url,omitempty"` - STTAPIKey string `json:"stt_api_key,omitempty"` - STTTenantID string `json:"stt_tenant_id,omitempty"` - STTTimeoutSeconds int `json:"stt_timeout_seconds,omitempty"` - VoiceAgentID string `json:"voice_agent_id,omitempty"` + DMPolicy string `json:"dm_policy,omitempty"` + GroupPolicy string `json:"group_policy,omitempty"` + AllowFrom []string `json:"allow_from,omitempty"` + RequireMention *bool `json:"require_mention,omitempty"` + HistoryLimit int `json:"history_limit,omitempty"` + BlockReply *bool `json:"block_reply,omitempty"` + MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` + STTProxyURL string `json:"stt_proxy_url,omitempty"` + STTAPIKey string `json:"stt_api_key,omitempty"` + STTTenantID string `json:"stt_tenant_id,omitempty"` + STTTimeoutSeconds int `json:"stt_timeout_seconds,omitempty"` + VoiceAgentID string `json:"voice_agent_id,omitempty"` + ChatTeamMap map[string]string `json:"chat_team_map,omitempty"` } // Factory creates a Discord channel from DB instance data (no extra stores). @@ -82,6 +83,7 @@ func buildChannel(name string, creds json.RawMessage, cfg json.RawMessage, STTTenantID: ic.STTTenantID, STTTimeoutSeconds: ic.STTTimeoutSeconds, VoiceAgentID: ic.VoiceAgentID, + ChatTeamMap: ic.ChatTeamMap, } // DB instances default to "pairing" for groups (secure by default). @@ -95,5 +97,8 @@ func buildChannel(name string, creds json.RawMessage, cfg json.RawMessage, } ch.SetName(name) + if len(dcCfg.ChatTeamMap) > 0 { + ch.SetChatTeamMap(dcCfg.ChatTeamMap) + } return ch, nil } diff --git a/internal/config/config_channels.go b/internal/config/config_channels.go index db310a851..3f8895fd4 100644 --- a/internal/config/config_channels.go +++ b/internal/config/config_channels.go @@ -24,25 +24,25 @@ type ChannelsConfig struct { } type TelegramConfig struct { - Enabled bool `json:"enabled"` - Token string `json:"token"` - Proxy string `json:"proxy,omitempty"` - APIServer string `json:"api_server,omitempty"` // custom Telegram Bot API server URL (e.g. "http://localhost:8081") - AllowFrom FlexibleStringSlice `json:"allow_from"` - DMPolicy string `json:"dm_policy,omitempty"` // "pairing" (default), "allowlist", "open", "disabled" - GroupPolicy string `json:"group_policy,omitempty"` // "open" (default), "allowlist", "disabled" - RequireMention *bool `json:"require_mention,omitempty"` // require @bot mention in groups (default true) - MentionMode string `json:"mention_mode,omitempty"` // "strict" (default) = only respond when mentioned; "yield" = respond unless another bot is mentioned - HistoryLimit int `json:"history_limit,omitempty"` // max pending group messages for context (default 50, 0=disabled) - DMStream *bool `json:"dm_stream,omitempty"` // enable streaming for DMs (default false) — edits placeholder progressively - GroupStream *bool `json:"group_stream,omitempty"` // enable streaming for groups (default false) — sends new message, edits progressively - DraftTransport *bool `json:"draft_transport,omitempty"` // use sendMessageDraft for DM streaming (default true) — stealth preview, no notifications per edit - ReasoningStream *bool `json:"reasoning_stream,omitempty"` // show reasoning as separate message when provider emits thinking events (default true) - ReactionLevel string `json:"reaction_level,omitempty"` // "off" (default), "minimal", "full" — status emoji reactions - MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` // max media download size in bytes (default 20MB) - LinkPreview *bool `json:"link_preview,omitempty"` // enable URL previews in messages (default true) - BlockReply *bool `json:"block_reply,omitempty"` // override gateway block_reply (nil = inherit) - ForceIPv4 bool `json:"force_ipv4,omitempty"` // force IPv4 for all Telegram API requests (use when IPv6 routing is broken) + Enabled bool `json:"enabled"` + Token string `json:"token"` + Proxy string `json:"proxy,omitempty"` + APIServer string `json:"api_server,omitempty"` // custom Telegram Bot API server URL (e.g. "http://localhost:8081") + AllowFrom FlexibleStringSlice `json:"allow_from"` + DMPolicy string `json:"dm_policy,omitempty"` // "pairing" (default), "allowlist", "open", "disabled" + GroupPolicy string `json:"group_policy,omitempty"` // "open" (default), "allowlist", "disabled" + RequireMention *bool `json:"require_mention,omitempty"` // require @bot mention in groups (default true) + MentionMode string `json:"mention_mode,omitempty"` // "strict" (default) = only respond when mentioned; "yield" = respond unless another bot is mentioned + HistoryLimit int `json:"history_limit,omitempty"` // max pending group messages for context (default 50, 0=disabled) + DMStream *bool `json:"dm_stream,omitempty"` // enable streaming for DMs (default false) — edits placeholder progressively + GroupStream *bool `json:"group_stream,omitempty"` // enable streaming for groups (default false) — sends new message, edits progressively + DraftTransport *bool `json:"draft_transport,omitempty"` // use sendMessageDraft for DM streaming (default true) — stealth preview, no notifications per edit + ReasoningStream *bool `json:"reasoning_stream,omitempty"` // show reasoning as separate message when provider emits thinking events (default true) + ReactionLevel string `json:"reaction_level,omitempty"` // "off" (default), "minimal", "full" — status emoji reactions + MediaMaxBytes int64 `json:"media_max_bytes,omitempty"` // max media download size in bytes (default 20MB) + LinkPreview *bool `json:"link_preview,omitempty"` // enable URL previews in messages (default true) + BlockReply *bool `json:"block_reply,omitempty"` // override gateway block_reply (nil = inherit) + ForceIPv4 bool `json:"force_ipv4,omitempty"` // force IPv4 for all Telegram API requests (use when IPv6 routing is broken) // Optional STT (Speech-to-Text) pipeline for voice/audio inbound messages. // When stt_proxy_url is set, audio/voice messages are transcribed before being forwarded to the agent. @@ -109,6 +109,7 @@ type DiscordConfig struct { STTTenantID string `json:"stt_tenant_id,omitempty"` STTTimeoutSeconds int `json:"stt_timeout_seconds,omitempty"` VoiceAgentID string `json:"voice_agent_id,omitempty"` + ChatTeamMap map[string]string `json:"chat_team_map,omitempty"` } type SlackConfig struct { @@ -194,19 +195,19 @@ type FeishuConfig struct { // ProvidersConfig maps provider name to its config. type ProvidersConfig struct { - Anthropic ProviderConfig `json:"anthropic"` - OpenAI ProviderConfig `json:"openai"` - OpenRouter ProviderConfig `json:"openrouter"` - Groq ProviderConfig `json:"groq"` - Gemini ProviderConfig `json:"gemini"` - DeepSeek ProviderConfig `json:"deepseek"` - Mistral ProviderConfig `json:"mistral"` - XAI ProviderConfig `json:"xai"` - MiniMax ProviderConfig `json:"minimax"` - Cohere ProviderConfig `json:"cohere"` - Perplexity ProviderConfig `json:"perplexity"` - DashScope ProviderConfig `json:"dashscope"` - Bailian ProviderConfig `json:"bailian"` + Anthropic ProviderConfig `json:"anthropic"` + OpenAI ProviderConfig `json:"openai"` + OpenRouter ProviderConfig `json:"openrouter"` + Groq ProviderConfig `json:"groq"` + Gemini ProviderConfig `json:"gemini"` + DeepSeek ProviderConfig `json:"deepseek"` + Mistral ProviderConfig `json:"mistral"` + XAI ProviderConfig `json:"xai"` + MiniMax ProviderConfig `json:"minimax"` + Cohere ProviderConfig `json:"cohere"` + Perplexity ProviderConfig `json:"perplexity"` + DashScope ProviderConfig `json:"dashscope"` + Bailian ProviderConfig `json:"bailian"` Zai ProviderConfig `json:"zai"` ZaiCoding ProviderConfig `json:"zai_coding"` Ollama OllamaConfig `json:"ollama"` // local Ollama instance (no API key needed) @@ -336,16 +337,16 @@ type QuotaConfig struct { // GatewayConfig controls the gateway server. type GatewayConfig struct { - Host string `json:"host"` - Port int `json:"port"` - Token string `json:"token,omitempty"` // bearer token for WS/HTTP auth - OwnerIDs []string `json:"owner_ids,omitempty"` // sender IDs considered "owner" - AllowedOrigins []string `json:"allowed_origins,omitempty"` // WebSocket CORS whitelist (empty = allow all) - MaxMessageChars int `json:"max_message_chars,omitempty"` // max user message characters (default 32000) - RateLimitRPM int `json:"rate_limit_rpm,omitempty"` // rate limit: requests per minute per user (default 20, 0 = disabled) - InjectionAction string `json:"injection_action,omitempty"` // prompt injection action: "log", "warn" (default), "block", "off" - InboundDebounceMs int `json:"inbound_debounce_ms,omitempty"` // merge rapid messages from same sender (default 1000ms, -1 = disabled) - Quota *QuotaConfig `json:"quota,omitempty"` // per-user/group request quotas + Host string `json:"host"` + Port int `json:"port"` + Token string `json:"token,omitempty"` // bearer token for WS/HTTP auth + OwnerIDs []string `json:"owner_ids,omitempty"` // sender IDs considered "owner" + AllowedOrigins []string `json:"allowed_origins,omitempty"` // WebSocket CORS whitelist (empty = allow all) + MaxMessageChars int `json:"max_message_chars,omitempty"` // max user message characters (default 32000) + RateLimitRPM int `json:"rate_limit_rpm,omitempty"` // rate limit: requests per minute per user (default 20, 0 = disabled) + InjectionAction string `json:"injection_action,omitempty"` // prompt injection action: "log", "warn" (default), "block", "off" + InboundDebounceMs int `json:"inbound_debounce_ms,omitempty"` // merge rapid messages from same sender (default 1000ms, -1 = disabled) + Quota *QuotaConfig `json:"quota,omitempty"` // per-user/group request quotas BlockReply *bool `json:"block_reply,omitempty"` // deliver intermediate text during tool iterations (default false) ToolStatus *bool `json:"tool_status,omitempty"` // show tool name in streaming preview during tool execution (default true) TaskRecoveryIntervalSec int `json:"task_recovery_interval_sec,omitempty"` // team task recovery ticker interval in seconds (default 300 = 5min) @@ -401,9 +402,9 @@ type WebFetchPolicyConfig struct { // BrowserToolConfig controls the browser automation tool. type BrowserToolConfig struct { - Enabled bool `json:"enabled"` // enable the browser tool (default false) - Headless bool `json:"headless,omitempty"` // run Chrome in headless mode (ignored when RemoteURL is set) - RemoteURL string `json:"remote_url,omitempty"` // CDP endpoint for remote Chrome sidecar, e.g. "ws://chrome:9222" + Enabled bool `json:"enabled"` // enable the browser tool (default false) + Headless bool `json:"headless,omitempty"` // run Chrome in headless mode (ignored when RemoteURL is set) + RemoteURL string `json:"remote_url,omitempty"` // CDP endpoint for remote Chrome sidecar, e.g. "ws://chrome:9222" ActionTimeoutMs int `json:"action_timeout_ms,omitempty"` // per-action timeout in ms (default 30000) IdleTimeoutMs int `json:"idle_timeout_ms,omitempty"` // idle page auto-close in ms (default 600000, 0=disabled) MaxPages int `json:"max_pages,omitempty"` // max open pages per tenant (default 5) @@ -411,12 +412,12 @@ type BrowserToolConfig struct { // ToolPolicySpec defines a tool policy at any level (global, per-agent, per-provider). type ToolPolicySpec struct { - Profile string `json:"profile,omitempty"` - Allow []string `json:"allow,omitempty"` - Deny []string `json:"deny,omitempty"` - AlsoAllow []string `json:"alsoAllow,omitempty"` - ByProvider map[string]*ToolPolicySpec `json:"byProvider,omitempty"` - ToolCallPrefix string `json:"toolCallPrefix,omitempty"` // prefix to strip from model's tool call names before registry lookup + Profile string `json:"profile,omitempty"` + Allow []string `json:"allow,omitempty"` + Deny []string `json:"deny,omitempty"` + AlsoAllow []string `json:"alsoAllow,omitempty"` + ByProvider map[string]*ToolPolicySpec `json:"byProvider,omitempty"` + ToolCallPrefix string `json:"toolCallPrefix,omitempty"` // prefix to strip from model's tool call names before registry lookup } type WebToolsConfig struct { diff --git a/internal/store/context.go b/internal/store/context.go index afaa9fa18..76ed59fca 100644 --- a/internal/store/context.go +++ b/internal/store/context.go @@ -39,6 +39,9 @@ const ( TenantSlugKey contextKey = "goclaw_tenant_slug" // RoleKey is the context key for the caller's permission role (e.g. "admin", "operator", "viewer"). RoleKey contextKey = "goclaw_role" + // MemoryTeamIDKey scopes memory reads/writes to a specific team. + // When set, memory operations filter by team_id to prevent cross-project contamination. + MemoryTeamIDKey contextKey = "goclaw_memory_team_id" ) // WithShellDenyGroups returns a new context with shell deny group overrides. @@ -169,8 +172,17 @@ func IsSharedMemory(ctx context.Context) bool { return false } -// MemoryUserID returns the userID to use for memory operations. -// Returns "" (shared/global) when shared memory is active, otherwise the per-user ID. +func WithMemoryTeamID(ctx context.Context, teamID uuid.UUID) context.Context { + return context.WithValue(ctx, MemoryTeamIDKey, teamID) +} + +func MemoryTeamID(ctx context.Context) uuid.UUID { + if v, ok := ctx.Value(MemoryTeamIDKey).(uuid.UUID); ok { + return v + } + return uuid.Nil +} + func MemoryUserID(ctx context.Context) string { if IsSharedMemory(ctx) { return "" diff --git a/internal/store/pg/memory_docs.go b/internal/store/pg/memory_docs.go index 7394376b2..37c1f0d36 100644 --- a/internal/store/pg/memory_docs.go +++ b/internal/store/pg/memory_docs.go @@ -18,7 +18,7 @@ import ( type PGMemoryStore struct { db *sql.DB provider store.EmbeddingProvider - mu sync.RWMutex // protects cfg from concurrent read/write + mu sync.RWMutex // protects cfg from concurrent read/write cfg PGMemoryConfig } @@ -86,12 +86,17 @@ func (s *PGMemoryStore) PutDocument(ctx context.Context, agentID, userID, path, uid = &userID } + var teamID *uuid.UUID + if mtid := store.MemoryTeamID(ctx); mtid != uuid.Nil { + teamID = &mtid + } + _, err := s.db.ExecContext(ctx, - `INSERT INTO memory_documents (id, agent_id, user_id, path, content, hash, tenant_id, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + `INSERT INTO memory_documents (id, agent_id, user_id, path, content, hash, tenant_id, team_id, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (agent_id, COALESCE(user_id, ''), path) - DO UPDATE SET content = EXCLUDED.content, hash = EXCLUDED.hash, tenant_id = EXCLUDED.tenant_id, updated_at = EXCLUDED.updated_at`, - id, aid, uid, path, content, hash, tid, now, + DO UPDATE SET content = EXCLUDED.content, hash = EXCLUDED.hash, tenant_id = EXCLUDED.tenant_id, team_id = COALESCE(EXCLUDED.team_id, memory_documents.team_id), updated_at = EXCLUDED.updated_at`, + id, aid, uid, path, content, hash, tid, teamID, now, ) return err } @@ -319,6 +324,10 @@ func (s *PGMemoryStore) IndexDocument(ctx context.Context, agentID, userID, path // Insert chunks tid := tenantIDForInsert(ctx) + var chunkTeamID *uuid.UUID + if mtid := store.MemoryTeamID(ctx); mtid != uuid.Nil { + chunkTeamID = &mtid + } for i, tc := range chunks { hash := memory.ContentHash(tc.Text) chunkID := uuid.Must(uuid.NewV7()) @@ -330,19 +339,18 @@ func (s *PGMemoryStore) IndexDocument(ctx context.Context, agentID, userID, path } if embeddings != nil && i < len(embeddings) && embeddings[i] != nil { - // Insert with embedding via raw SQL (pgvector) s.db.ExecContext(ctx, - `INSERT INTO memory_chunks (id, agent_id, document_id, user_id, path, start_line, end_line, hash, text, embedding, tenant_id, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::vector, $11, $12)`, + `INSERT INTO memory_chunks (id, agent_id, document_id, user_id, path, start_line, end_line, hash, text, embedding, tenant_id, team_id, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::vector, $11, $12, $13)`, chunkID, aid, docID, uid, path, tc.StartLine, tc.EndLine, hash, tc.Text, - vectorToString(embeddings[i]), tid, now, + vectorToString(embeddings[i]), tid, chunkTeamID, now, ) } else { s.db.ExecContext(ctx, - `INSERT INTO memory_chunks (id, agent_id, document_id, user_id, path, start_line, end_line, hash, text, tenant_id, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + `INSERT INTO memory_chunks (id, agent_id, document_id, user_id, path, start_line, end_line, hash, text, tenant_id, team_id, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT DO NOTHING`, - chunkID, aid, docID, uid, path, tc.StartLine, tc.EndLine, hash, tc.Text, tid, now, + chunkID, aid, docID, uid, path, tc.StartLine, tc.EndLine, hash, tc.Text, tid, chunkTeamID, now, ) } } diff --git a/internal/store/pg/memory_search.go b/internal/store/pg/memory_search.go index 80abfc897..e4d611369 100644 --- a/internal/store/pg/memory_search.go +++ b/internal/store/pg/memory_search.go @@ -4,9 +4,19 @@ import ( "context" "fmt" + "github.com/google/uuid" + "github.com/nextlevelbuilder/goclaw/internal/store" ) +func memoryTeamFilter(ctx context.Context, nextParam int) (string, []any) { + tid := store.MemoryTeamID(ctx) + if tid == uuid.Nil { + return "", nil + } + return fmt.Sprintf(" AND (team_id = $%d OR team_id IS NULL)", nextParam), []any{tid} +} + // Search performs hybrid search (FTS + vector) over memory_chunks. // Merges global (user_id IS NULL) + per-user chunks, with user boost. func (s *PGMemoryStore) Search(ctx context.Context, query string, agentID, userID string, opts store.MemorySearchOptions) ([]store.MemorySearchResult, error) { @@ -83,35 +93,39 @@ func (s *PGMemoryStore) ftsSearch(ctx context.Context, query string, agentID any if userID != "" { // fixed params: $1=query, $2=agentID, $3=query, $4=userID - // tenant clause appended at $5 (if filtered), then LIMIT at $5 or $6 - tc, tcArgs, _, err := scopeClause(ctx, 5) + teamFilter, teamArgs := memoryTeamFilter(ctx, 5) + nextParam := 5 + len(teamArgs) + tc, tcArgs, _, err := scopeClause(ctx, nextParam) if err != nil { return nil, err } - limitN := 5 + len(tcArgs) + limitN := nextParam + len(tcArgs) q = fmt.Sprintf(`SELECT path, start_line, end_line, text, user_id, ts_rank(tsv, plainto_tsquery('simple', $1)) AS score FROM memory_chunks WHERE agent_id = $2 AND tsv @@ plainto_tsquery('simple', $3) - AND (user_id IS NULL OR user_id = $4)%s - ORDER BY score DESC LIMIT $%d`, tc, limitN) - args = append([]any{query, agentID, query, userID}, tcArgs...) + AND (user_id IS NULL OR user_id = $4)%s%s + ORDER BY score DESC LIMIT $%d`, teamFilter, tc, limitN) + args = append([]any{query, agentID, query, userID}, teamArgs...) + args = append(args, tcArgs...) args = append(args, limit) } else { // fixed params: $1=query, $2=agentID, $3=query - // tenant clause at $4 (if filtered), then LIMIT at $4 or $5 - tc, tcArgs, _, err := scopeClause(ctx, 4) + teamFilter, teamArgs := memoryTeamFilter(ctx, 4) + nextParam := 4 + len(teamArgs) + tc, tcArgs, _, err := scopeClause(ctx, nextParam) if err != nil { return nil, err } - limitN := 4 + len(tcArgs) + limitN := nextParam + len(tcArgs) q = fmt.Sprintf(`SELECT path, start_line, end_line, text, user_id, ts_rank(tsv, plainto_tsquery('simple', $1)) AS score FROM memory_chunks WHERE agent_id = $2 AND tsv @@ plainto_tsquery('simple', $3) - AND user_id IS NULL%s - ORDER BY score DESC LIMIT $%d`, tc, limitN) - args = append([]any{query, agentID, query}, tcArgs...) + AND user_id IS NULL%s%s + ORDER BY score DESC LIMIT $%d`, teamFilter, tc, limitN) + args = append([]any{query, agentID, query}, teamArgs...) + args = append(args, tcArgs...) args = append(args, limit) } @@ -138,37 +152,41 @@ func (s *PGMemoryStore) vectorSearch(ctx context.Context, embedding []float32, a if userID != "" { // fixed params: $1=vec, $2=agentID, $3=userID - // tenant clause at $4, then ORDER vec at $4+len(tcArgs), LIMIT after - tc, tcArgs, _, err := scopeClause(ctx, 4) + teamFilter, teamArgs := memoryTeamFilter(ctx, 4) + nextParam := 4 + len(teamArgs) + tc, tcArgs, _, err := scopeClause(ctx, nextParam) if err != nil { return nil, err } - orderN := 4 + len(tcArgs) + orderN := nextParam + len(tcArgs) limitN := orderN + 1 q = fmt.Sprintf(`SELECT path, start_line, end_line, text, user_id, 1 - (embedding <=> $1::vector) AS score FROM memory_chunks WHERE agent_id = $2 AND embedding IS NOT NULL - AND (user_id IS NULL OR user_id = $3)%s - ORDER BY embedding <=> $%d::vector LIMIT $%d`, tc, orderN, limitN) - args = append([]any{vecStr, agentID, userID}, tcArgs...) + AND (user_id IS NULL OR user_id = $3)%s%s + ORDER BY embedding <=> $%d::vector LIMIT $%d`, teamFilter, tc, orderN, limitN) + args = append([]any{vecStr, agentID, userID}, teamArgs...) + args = append(args, tcArgs...) args = append(args, vecStr, limit) } else { // fixed params: $1=vec, $2=agentID - // tenant clause at $3, then ORDER vec at $3+len(tcArgs), LIMIT after - tc, tcArgs, _, err := scopeClause(ctx, 3) + teamFilter, teamArgs := memoryTeamFilter(ctx, 3) + nextParam := 3 + len(teamArgs) + tc, tcArgs, _, err := scopeClause(ctx, nextParam) if err != nil { return nil, err } - orderN := 3 + len(tcArgs) + orderN := nextParam + len(tcArgs) limitN := orderN + 1 q = fmt.Sprintf(`SELECT path, start_line, end_line, text, user_id, 1 - (embedding <=> $1::vector) AS score FROM memory_chunks WHERE agent_id = $2 AND embedding IS NOT NULL - AND user_id IS NULL%s - ORDER BY embedding <=> $%d::vector LIMIT $%d`, tc, orderN, limitN) - args = append([]any{vecStr, agentID}, tcArgs...) + AND user_id IS NULL%s%s + ORDER BY embedding <=> $%d::vector LIMIT $%d`, teamFilter, tc, orderN, limitN) + args = append([]any{vecStr, agentID}, teamArgs...) + args = append(args, tcArgs...) args = append(args, vecStr, limit) } @@ -250,4 +268,3 @@ func hybridMerge(fts, vec []scoredChunk, textWeight, vectorWeight float64, curre return results } - diff --git a/internal/tools/context_keys.go b/internal/tools/context_keys.go index 5ca900f70..b233453bb 100644 --- a/internal/tools/context_keys.go +++ b/internal/tools/context_keys.go @@ -377,6 +377,23 @@ func LeaderAgentIDFromCtx(ctx context.Context) string { return "" } +// --- Agent's own workspace (preserved when team workspace overrides) --- + +const ctxAgentWorkspace toolContextKey = "tool_agent_workspace" + +// WithToolAgentWorkspace stores the agent's own workspace path before a team +// workspace override replaces it. This lets file tools allow read access to the +// agent's project workspace even when the primary workspace is the team shared dir. +func WithToolAgentWorkspace(ctx context.Context, ws string) context.Context { + return context.WithValue(ctx, ctxAgentWorkspace, ws) +} + +// ToolAgentWorkspaceFromCtx returns the agent's own workspace path, or empty string. +func ToolAgentWorkspaceFromCtx(ctx context.Context) string { + v, _ := ctx.Value(ctxAgentWorkspace).(string) + return v +} + // --- Workspace scope propagation (delegation origin) --- const ( @@ -460,6 +477,19 @@ func (p *PendingTeamDispatch) HasListed() bool { return p.listed } +// TryMarkListed atomically checks and sets the listed flag. +// Returns true if this call was the first to mark it (caller should acquire the lock). +// Returns false if already listed (caller should skip lock acquisition). +func (p *PendingTeamDispatch) TryMarkListed() bool { + p.mu.Lock() + defer p.mu.Unlock() + if p.listed { + return false + } + p.listed = true + return true +} + // SetTeamLock stores the acquired team create lock so it can be released post-turn. func (p *PendingTeamDispatch) SetTeamLock(m *sync.Mutex) { p.mu.Lock() diff --git a/internal/tools/filesystem.go b/internal/tools/filesystem.go index 7505a2a9f..9b9dc4265 100644 --- a/internal/tools/filesystem.go +++ b/internal/tools/filesystem.go @@ -22,14 +22,14 @@ var virtualSystemFiles = map[string]string{ // ReadFileTool reads file contents, optionally through a sandbox container. type ReadFileTool struct { - workspace string - restrict bool - allowedPrefixes []string // extra allowed path prefixes (e.g. skills dirs) - deniedPrefixes []string // path prefixes to deny access to (e.g. .goclaw) - sandboxMgr sandbox.Manager // nil = direct host access - contextFileIntc *ContextFileInterceptor // nil = no virtual FS routing - memIntc *MemoryInterceptor // nil = no memory routing - permStore store.ConfigPermissionStore // nil = no group read restriction + workspace string + restrict bool + allowedPrefixes []string // extra allowed path prefixes (e.g. skills dirs) + deniedPrefixes []string // path prefixes to deny access to (e.g. .goclaw) + sandboxMgr sandbox.Manager // nil = direct host access + contextFileIntc *ContextFileInterceptor // nil = no virtual FS routing + memIntc *MemoryInterceptor // nil = no memory routing + permStore store.ConfigPermissionStore // nil = no group read restriction } // SetContextFileInterceptor enables virtual FS routing for context files. @@ -292,16 +292,22 @@ func (t *ReadFileTool) paginateOutput(content string, args map[string]any) *Resu return SilentResult(output) } -// allowedWithTeamWorkspace returns the allowed prefixes with team workspace appended -// if present in context. Thread-safe: creates a new slice per request. +// allowedWithTeamWorkspace returns the allowed prefixes with team workspace and +// agent's own workspace appended if present in context. func allowedWithTeamWorkspace(ctx context.Context, base []string) []string { teamWs := ToolTeamWorkspaceFromCtx(ctx) - if teamWs == "" { + agentWs := ToolAgentWorkspaceFromCtx(ctx) + if teamWs == "" && agentWs == "" { return base } - out := make([]string, len(base)+1) + out := make([]string, len(base), len(base)+2) copy(out, base) - out[len(base)] = teamWs + if teamWs != "" { + out = append(out, teamWs) + } + if agentWs != "" { + out = append(out, agentWs) + } return out } @@ -517,4 +523,3 @@ func resolveThroughExistingAncestors(target string) (string, error) { } return filepath.Clean(target), nil } - diff --git a/internal/tools/team_tasks_read.go b/internal/tools/team_tasks_read.go index cad3f7741..11665fede 100644 --- a/internal/tools/team_tasks_read.go +++ b/internal/tools/team_tasks_read.go @@ -208,12 +208,10 @@ func (t *TeamTasksTool) executeList(ctx context.Context, args map[string]any) *R listChatID = "" } - // Acquire team create lock to serialize list→create flows across concurrent goroutines. - if ptd := PendingTeamDispatchFromCtx(ctx); ptd != nil && !ptd.HasListed() { + if ptd := PendingTeamDispatchFromCtx(ctx); ptd != nil && ptd.TryMarkListed() { lock := getTeamCreateLock(team.ID.String(), chatID) lock.Lock() ptd.SetTeamLock(lock) - ptd.MarkListed() } tasks, err := t.manager.Store().ListTasks(ctx, team.ID, "priority", statusFilter, filterUserID, "", listChatID, 0, offset) @@ -382,11 +380,10 @@ func (t *TeamTasksTool) executeSearch(ctx context.Context, args map[string]any) // Acquire team create lock so search also satisfies the list-before-create gate. chatID := ToolChatIDFromCtx(ctx) - if ptd := PendingTeamDispatchFromCtx(ctx); ptd != nil && !ptd.HasListed() { + if ptd := PendingTeamDispatchFromCtx(ctx); ptd != nil && ptd.TryMarkListed() { lock := getTeamCreateLock(team.ID.String(), chatID) lock.Lock() ptd.SetTeamLock(lock) - ptd.MarkListed() } tasks, err := t.manager.Store().SearchTasks(ctx, team.ID, query, searchPageSize, filterUserID) diff --git a/internal/tools/team_tool_dispatch.go b/internal/tools/team_tool_dispatch.go index 51e6f0beb..78d881675 100644 --- a/internal/tools/team_tool_dispatch.go +++ b/internal/tools/team_tool_dispatch.go @@ -153,15 +153,15 @@ func (m *TeamToolManager) dispatchTaskToAgent(ctx context.Context, task *store.T } meta := map[string]string{ - MetaOriginChannel: originChannel, - MetaOriginPeerKind: originPeerKind, - MetaOriginChatID: originChatID, - MetaOriginUserID: originUserID, - MetaFromAgent: fromAgent, - MetaToAgent: ag.AgentKey, - MetaToAgentDisplay: ag.DisplayName, - MetaTeamTaskID: task.ID.String(), - MetaTeamID: teamID.String(), + MetaOriginChannel: originChannel, + MetaOriginPeerKind: originPeerKind, + MetaOriginChatID: originChatID, + MetaOriginUserID: originUserID, + MetaFromAgent: fromAgent, + MetaToAgent: ag.AgentKey, + MetaToAgentDisplay: ag.DisplayName, + MetaTeamTaskID: task.ID.String(), + MetaTeamID: teamID.String(), } // Resolve local key from context; fallback to task metadata for deferred dispatches. localKey := ToolLocalKeyFromCtx(ctx) @@ -217,6 +217,8 @@ func (m *TeamToolManager) dispatchTaskToAgent(ctx context.Context, task *store.T "task_id", task.ID, "agent_key", ag.AgentKey, "team_id", teamID, + "origin_channel", originChannel, + "origin_chat_id", originChatID, ) } From efb8b5f440d7e1161bc0bf7f89db05c264418911 Mon Sep 17 00:00:00 2001 From: kokorolx Date: Mon, 30 Mar 2026 17:24:05 +0000 Subject: [PATCH 4/8] fix(tools): read_image fallback to context images when path access denied When team workspace is active, uploaded images are in per-user .uploads/ dir which falls outside the team workspace boundary. Instead of failing with access denied, fall back to context images already loaded in memory by enrichInputMedia. --- internal/tools/read_image.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/tools/read_image.go b/internal/tools/read_image.go index b9caca1bb..9dc39016d 100644 --- a/internal/tools/read_image.go +++ b/internal/tools/read_image.go @@ -81,14 +81,17 @@ func (t *ReadImageTool) Execute(ctx context.Context, args map[string]any) *Resul prompt = "Describe this image in detail." } - // If path is provided, load image from workspace file + // If path is provided, load image from workspace file. + // On failure, fall back to context images (already loaded by enrichInputMedia). images := MediaImagesFromCtx(ctx) if imgPath, _ := args["path"].(string); imgPath != "" { fileImages, err := t.loadImageFromPath(ctx, imgPath) if err != nil { - return ErrorResult(err.Error()) + slog.Warn("read_image: path load failed, falling back to context images", + "path", imgPath, "error", err, "context_images", len(images)) + } else { + images = fileImages } - images = fileImages } if len(images) == 0 { From 4c52459dc4b9b47d64c8b1477e89ea1c2f1632f7 Mon Sep 17 00:00:00 2001 From: kokorolx Date: Thu, 2 Apr 2026 00:56:25 +0000 Subject: [PATCH 5/8] =?UTF-8?q?fix(agent):=20fix=20HTTP=20400=20=E2=80=94?= =?UTF-8?q?=20defer=20warning=20msgs=20and=20synthesize=20tool=5Fresults?= =?UTF-8?q?=20on=20budget=20exceed?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Parallel tool batch: collect deferredWarnings, append all tool_result messages first, then warnings — prevents user-role messages from interleaving with tool_result blocks (Anthropic requires tool_results immediately after tool_use) - When toolResultBreak fires mid-batch, finish appending remaining tool_result messages before breaking to keep assistant/tool_result pairs intact - On tool budget exceeded: synthesize tool_result for each pending tool_use before injecting the budget-exceeded user message - gateway_announce_queue: strengthen follow-up task prompt (MUST check, not SHOULD) - gateway_consumer_handlers/normal: send LLM error alerts on agent run failure - shell_deny_groups: fix 'host' deny pattern — match only at command position to avoid blocking 'hostname', 'hostnamectl' etc - claude_cli_deny_patterns: same host pattern fix --- cmd/gateway_announce_queue.go | 4 +- cmd/gateway_consumer_handlers.go | 2 + cmd/gateway_consumer_normal.go | 1 + internal/agent/loop.go | 25 ++++++++++--- .../providers/claude_cli_deny_patterns.go | 2 +- internal/tools/shell_deny_groups.go | 18 ++++----- internal/tools/shell_deny_test.go | 37 ++++++++++++++++--- 7 files changed, 67 insertions(+), 22 deletions(-) diff --git a/cmd/gateway_announce_queue.go b/cmd/gateway_announce_queue.go index c3e45c73d..3a2377f1e 100644 --- a/cmd/gateway_announce_queue.go +++ b/cmd/gateway_announce_queue.go @@ -135,8 +135,8 @@ func processAnnounceLoop( content := buildMergedAnnounceContent(entries, snapshot, r.TeamWorkspace) if allDirect { content += "\n\n[The member already posted the result directly to the chat. Do NOT repeat or rephrase their work. " + - "However, you SHOULD still create follow-up tasks if your workflow requires it (e.g. code review gate, deployment gate, QA). " + - "If you have no follow-up tasks and no new information, respond with NO_REPLY.]" + "You MUST check if a follow-up task is needed (e.g. code review gate after dev completion, deployment gate, QA) and create it via team_tasks. " + + "Only respond with NO_REPLY if you have already confirmed no follow-up is required.]" } req := agent.RunRequest{ diff --git a/cmd/gateway_consumer_handlers.go b/cmd/gateway_consumer_handlers.go index a3aa5773b..5ef436295 100644 --- a/cmd/gateway_consumer_handlers.go +++ b/cmd/gateway_consumer_handlers.go @@ -136,6 +136,7 @@ func handleSubagentAnnounce( return } slog.Error("subagent announce: agent run failed", "error", outcome.Err) + sendLLMErrorAlert(outcome.Err, origCh, chatID) deps.MsgBus.PublishOutbound(bus.OutboundMessage{ Channel: origCh, ChatID: chatID, @@ -325,6 +326,7 @@ func handleTeammateMessage( var announceMedia []agent.MediaResult if outcome.Err != nil { slog.Error("teammate message: agent run failed", "error", outcome.Err) + sendLLMErrorAlert(outcome.Err, origCh, origChatID) errMsg := outcome.Err.Error() if len(errMsg) > 500 { errMsg = errMsg[:500] + "..." diff --git a/cmd/gateway_consumer_normal.go b/cmd/gateway_consumer_normal.go index b32a4b86c..2d14510ea 100644 --- a/cmd/gateway_consumer_normal.go +++ b/cmd/gateway_consumer_normal.go @@ -409,6 +409,7 @@ func processNormalMessage( return } slog.Error("inbound: agent run failed", "error", outcome.Err, "channel", channel) + sendLLMErrorAlert(outcome.Err, channel, chatID) deps.MsgBus.PublishOutbound(bus.OutboundMessage{ Channel: channel, ChatID: chatID, diff --git a/internal/agent/loop.go b/internal/agent/loop.go index 4a7d86023..811f387a3 100644 --- a/internal/agent/loop.go +++ b/internal/agent/loop.go @@ -493,6 +493,13 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (result *RunResult, if l.maxToolCalls > 0 && rs.totalToolCalls > l.maxToolCalls { slog.Warn("security.tool_budget_exceeded", "agent", l.id, "total", rs.totalToolCalls, "limit", l.maxToolCalls) + for _, tc := range resp.ToolCalls { + messages = append(messages, providers.Message{ + Role: "tool", + Content: "[Tool call skipped — tool budget exceeded]", + ToolCallID: tc.ID, + }) + } messages = append(messages, providers.Message{ Role: "user", Content: fmt.Sprintf("[System] Tool call budget reached (%d/%d). Do NOT call any more tools. Summarize results so far and respond to the user.", rs.totalToolCalls, l.maxToolCalls), @@ -678,20 +685,28 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (result *RunResult, // 5. Process results sequentially: emit events, append messages, save to session // Note: tool span start/end already emitted inside goroutines above. var loopStuck bool - for _, r := range collected { - // Record tool execution time for adaptive thresholds. + var deferredWarnings []providers.Message + var breakIdx int = -1 + for i, r := range collected { toolTiming.Record(r.tc.Name, time.Since(r.spanStart).Milliseconds()) - - // Process tool result: loop detection, events, media, deliverables. toolMsg, warningMsgs, action := l.processToolResult(ctx, rs, &req, emitRun, r.tc, r.registryName, r.result, hadBootstrap) messages = append(messages, toolMsg) rs.pendingMsgs = append(rs.pendingMsgs, toolMsg) - messages = append(messages, warningMsgs...) + deferredWarnings = append(deferredWarnings, warningMsgs...) if action == toolResultBreak { loopStuck = true + breakIdx = i break } } + if loopStuck && breakIdx >= 0 { + for _, r := range collected[breakIdx+1:] { + toolMsg, _, _ := l.processToolResult(ctx, rs, &req, emitRun, r.tc, r.registryName, r.result, hadBootstrap) + messages = append(messages, toolMsg) + rs.pendingMsgs = append(rs.pendingMsgs, toolMsg) + } + } + messages = append(messages, deferredWarnings...) // Check read-only streak after processing all parallel results. if !loopStuck { diff --git a/internal/providers/claude_cli_deny_patterns.go b/internal/providers/claude_cli_deny_patterns.go index 8eed2566e..c51da5f39 100644 --- a/internal/providers/claude_cli_deny_patterns.go +++ b/internal/providers/claude_cli_deny_patterns.go @@ -20,7 +20,7 @@ var ShellDenyPatterns = []string{ `\bcurl\b.*(-d\b|-F\b|--data|--upload|--form|-T\b|-X\s*P(UT|OST|ATCH))`, `\bwget\b.*-O\s*-\s*\|\s*(ba)?sh\b`, `\bwget\b.*--post-(data|file)`, - `\b(nslookup|dig|host)\b`, + `\b(nslookup|dig)\b|(^|[;|&]\s*)host\s`, `/dev/tcp/`, // Reverse shells diff --git a/internal/tools/shell_deny_groups.go b/internal/tools/shell_deny_groups.go index 56fe42823..99c056676 100644 --- a/internal/tools/shell_deny_groups.go +++ b/internal/tools/shell_deny_groups.go @@ -29,7 +29,7 @@ var DenyGroupRegistry = map[string]*DenyGroup{ regexp.MustCompile(`\bdd\s+if=`), regexp.MustCompile(`>\s*/dev/sd[a-z]\b`), regexp.MustCompile(`\b(shutdown|reboot|poweroff|halt)\b`), - regexp.MustCompile(`\b(init|telinit)\s+[06]\b`), // SysV shutdown/reboot + regexp.MustCompile(`\b(init|telinit)\s+[06]\b`), // SysV shutdown/reboot regexp.MustCompile(`\bsystemctl\s+(suspend|hibernate)\b`), // power management regexp.MustCompile(`:\(\)\s*\{.*\};\s*:`), // fork bomb }, @@ -39,15 +39,15 @@ var DenyGroupRegistry = map[string]*DenyGroup{ Description: "Data Exfiltration", Default: true, Patterns: []*regexp.Regexp{ - regexp.MustCompile(`\bcurl\b.*\|\s*(ba)?sh\b`), // curl | sh + regexp.MustCompile(`\bcurl\b.*\|\s*(ba)?sh\b`), // curl | sh regexp.MustCompile(`\bcurl\b.*(-d\b|-F\b|--data|--upload|--form|-T\b|(-X|--request)\s*P(UT|OST|ATCH))`), // curl POST/PUT - regexp.MustCompile(`\bwget\b.*-O\s*-\s*\|\s*(ba)?sh\b`), // wget | sh - regexp.MustCompile(`\bwget\b.*(--post-(data|file)|--method=P(UT|OST|ATCH)|--body-data)`), // wget POST - regexp.MustCompile(`\b(nslookup|dig|host)\b`), // DNS exfiltration - regexp.MustCompile(`/dev/tcp/`), // bash tcp redirect - regexp.MustCompile(`\b(curl|wget)\b.*\blocalhost\b`), // curl/wget to localhost - regexp.MustCompile(`\b(curl|wget)\b.*\b127\.0\.0\.1\b`), // curl/wget to 127.0.0.1 - regexp.MustCompile(`\b(curl|wget)\b.*\b0\.0\.0\.0\b`), // curl/wget to 0.0.0.0 + regexp.MustCompile(`\bwget\b.*-O\s*-\s*\|\s*(ba)?sh\b`), // wget | sh + regexp.MustCompile(`\bwget\b.*(--post-(data|file)|--method=P(UT|OST|ATCH)|--body-data)`), // wget POST + regexp.MustCompile(`\b(nslookup|dig)\b|(^|[;|&]\s*)host\s`), // DNS exfiltration (host only at command position) + regexp.MustCompile(`/dev/tcp/`), // bash tcp redirect + regexp.MustCompile(`\b(curl|wget)\b.*\blocalhost\b`), // curl/wget to localhost + regexp.MustCompile(`\b(curl|wget)\b.*\b127\.0\.0\.1\b`), // curl/wget to 127.0.0.1 + regexp.MustCompile(`\b(curl|wget)\b.*\b0\.0\.0\.0\b`), // curl/wget to 0.0.0.0 }, }, "reverse_shell": { diff --git a/internal/tools/shell_deny_test.go b/internal/tools/shell_deny_test.go index eb3e18a39..d85a88fce 100644 --- a/internal/tools/shell_deny_test.go +++ b/internal/tools/shell_deny_test.go @@ -19,10 +19,10 @@ func TestBase64DecodeShellDeny(t *testing.T) { } allowed := []string{ - "base64 -w0 file.txt", // encode, no pipe to shell - "base64 -d file.txt", // decode without pipe to shell - "echo hello | base64", // encode - "base64 --decode file.txt", // decode without pipe to shell + "base64 -w0 file.txt", // encode, no pipe to shell + "base64 -d file.txt", // decode without pipe to shell + "echo hello | base64", // encode + "base64 --decode file.txt", // decode without pipe to shell } for _, cmd := range denied { @@ -98,7 +98,7 @@ func TestDestructiveOpsGaps(t *testing.T) { ) mustAllow(t, patterns, - "halting the process", // "halt" inside word + "halting the process", // "halt" inside word "initialize", // "init" inside word "initial setup", // "init" inside word "init_db", // no space+digit after init @@ -170,6 +170,33 @@ func TestExecute_RejectsNULByte(t *testing.T) { } } +func TestDataExfiltrationHostPattern(t *testing.T) { + patterns := DenyGroupRegistry["data_exfiltration"].Patterns + + mustDeny(t, patterns, + "nslookup example.com", + "dig example.com", + "dig +short example.com", + "host example.com", + "host -t MX example.com", + "; host example.com", + "&& host example.com", + "| host example.com", + ) + + mustAllow(t, patterns, + "mysql --host localhost -u root", + "node -e \"const host = process.env.DB_HOST\"", + "psql -h localhost", + "curl --host-header example.com http://1.2.3.4", + "hostname", + "hostnamectl", + "DB_HOST=localhost node migrate.js", + "echo $DB_HOST", + "export PGHOST=localhost", + ) +} + func TestLimitedBuffer(t *testing.T) { t.Run("under limit", func(t *testing.T) { lb := &limitedBuffer{max: 100} From 37f392ec8535eb1a4a6828bcf13acbe38b3ee02e Mon Sep 17 00:00:00 2001 From: kokorolx Date: Thu, 2 Apr 2026 05:36:01 +0000 Subject: [PATCH 6/8] feat(ticker): auto-dispatch orphaned pending tasks with owner_agent_id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tasks assigned an owner but never dispatched (e.g. PM session timeout mid-turn) accumulate as pending with owner_agent_id set. Without this fix they sit until 2h stale threshold — or forever if PM doesn't retry. - Add TaskDispatcher interface (DispatchTaskToAgent + CachedGetAgentByID + Store) - Add SetDispatcher() on TaskTicker; wire TeamToolManager at startup via type assertion - Add Step 6 in recoverAll: ListOrphanedPendingTasks (pending + owner + updated_at > 10m) → dispatch directly to assigned agent, bypassing PM - Add ListOrphanedPendingTasks to TaskRecoveryStore interface + PG + SQLite impls --- cmd/gateway.go | 5 +- internal/store/pg/teams_tasks_progress.go | 20 ++++++ .../store/sqlitestore/teams_tasks_activity.go | 18 +++++ internal/store/team_store.go | 24 +++---- internal/tasks/task_ticker.go | 67 +++++++++++++++---- 5 files changed, 107 insertions(+), 27 deletions(-) diff --git a/cmd/gateway.go b/cmd/gateway.go index 985a179b0..6f16643ff 100644 --- a/cmd/gateway.go +++ b/cmd/gateway.go @@ -28,8 +28,8 @@ import ( "github.com/nextlevelbuilder/goclaw/internal/config" "github.com/nextlevelbuilder/goclaw/internal/edition" "github.com/nextlevelbuilder/goclaw/internal/gateway" - "github.com/nextlevelbuilder/goclaw/internal/heartbeat" "github.com/nextlevelbuilder/goclaw/internal/gateway/methods" + "github.com/nextlevelbuilder/goclaw/internal/heartbeat" httpapi "github.com/nextlevelbuilder/goclaw/internal/http" mcpbridge "github.com/nextlevelbuilder/goclaw/internal/mcp" "github.com/nextlevelbuilder/goclaw/internal/media" @@ -1093,6 +1093,9 @@ func runGateway() { var taskTicker *tasks.TaskTicker if pgStores.Teams != nil { taskTicker = tasks.NewTaskTicker(pgStores.Teams, pgStores.Agents, msgBus, cfg.Gateway.TaskRecoveryIntervalSec) + if d, ok := postTurn.(tasks.TaskDispatcher); ok { + taskTicker.SetDispatcher(d) + } taskTicker.Start() } diff --git a/internal/store/pg/teams_tasks_progress.go b/internal/store/pg/teams_tasks_progress.go index fcd172e33..82e7d5a16 100644 --- a/internal/store/pg/teams_tasks_progress.go +++ b/internal/store/pg/teams_tasks_progress.go @@ -216,6 +216,26 @@ func (s *PGTeamStore) FixOrphanedBlockedTasks(ctx context.Context) ([]store.Reco return scanRecoveredTaskInfoRows(rows) } +func (s *PGTeamStore) ListOrphanedPendingTasks(ctx context.Context, olderThan time.Time) ([]store.TeamTaskData, error) { + tid := tenantIDForInsert(ctx) + rows, err := s.db.QueryContext(ctx, + `SELECT `+taskSelectCols+` + `+taskJoinClause+` + WHERE t.status = $1 + AND t.owner_agent_id IS NOT NULL + AND t.updated_at < $2 + AND t.tenant_id = $3 + ORDER BY t.priority DESC, t.created_at + LIMIT 50`, + store.TeamTaskStatusPending, olderThan, tid, + ) + if err != nil { + return nil, err + } + defer rows.Close() + return scanTaskRowsJoined(rows) +} + func scanRecoveredTaskInfoRows(rows interface { Next() bool Scan(...any) error diff --git a/internal/store/sqlitestore/teams_tasks_activity.go b/internal/store/sqlitestore/teams_tasks_activity.go index e5ac69a5f..cb670564f 100644 --- a/internal/store/sqlitestore/teams_tasks_activity.go +++ b/internal/store/sqlitestore/teams_tasks_activity.go @@ -473,6 +473,24 @@ func (s *SQLiteTeamStore) FixOrphanedBlockedTasks(ctx context.Context) ([]store. return infos, nil } +func (s *SQLiteTeamStore) ListOrphanedPendingTasks(ctx context.Context, olderThan time.Time) ([]store.TeamTaskData, error) { + rows, err := s.db.QueryContext(ctx, + `SELECT `+taskSelectCols+` + `+taskJoinClause+` + WHERE t.status = ? + AND t.owner_agent_id IS NOT NULL + AND t.updated_at < ? + ORDER BY t.priority DESC, t.created_at + LIMIT 50`, + store.TeamTaskStatusPending, olderThan.UTC().Format(time.RFC3339), + ) + if err != nil { + return nil, err + } + defer rows.Close() + return scanTaskRows(rows) +} + // queryRecoveredTasks fetches recently-updated tasks matching the given status for recovery reporting. func (s *SQLiteTeamStore) queryRecoveredTasks(ctx context.Context, status string) ([]store.RecoveredTaskInfo, error) { rows, err := s.db.QueryContext(ctx, diff --git a/internal/store/team_store.go b/internal/store/team_store.go index 0ce698095..18d4f9094 100644 --- a/internal/store/team_store.go +++ b/internal/store/team_store.go @@ -50,13 +50,12 @@ const ( // Team task list filter constants (for ListTasks statusFilter parameter). const ( - TeamTaskFilterActive = "active" // pending + in_progress + blocked + TeamTaskFilterActive = "active" // pending + in_progress + blocked TeamTaskFilterInReview = "in_review" // only in_review tasks TeamTaskFilterCompleted = "completed" // only completed tasks TeamTaskFilterAll = "all" // all statuses (default when "" passed) ) - // TeamData represents an agent team. type TeamData struct { BaseModel @@ -106,17 +105,17 @@ type TeamTaskData struct { Channel string `json:"channel,omitempty"` // V2 fields - TaskType string `json:"task_type"` - TaskNumber int `json:"task_number,omitempty"` - Identifier string `json:"identifier,omitempty"` + TaskType string `json:"task_type"` + TaskNumber int `json:"task_number,omitempty"` + Identifier string `json:"identifier,omitempty"` CreatedByAgentID *uuid.UUID `json:"created_by_agent_id,omitempty"` - AssigneeUserID string `json:"assignee_user_id,omitempty"` - ParentID *uuid.UUID `json:"parent_id,omitempty"` - ChatID string `json:"chat_id,omitempty"` - LockedAt *time.Time `json:"locked_at,omitempty"` - LockExpiresAt *time.Time `json:"lock_expires_at,omitempty"` - ProgressPercent int `json:"progress_percent,omitempty"` - ProgressStep string `json:"progress_step,omitempty"` + AssigneeUserID string `json:"assignee_user_id,omitempty"` + ParentID *uuid.UUID `json:"parent_id,omitempty"` + ChatID string `json:"chat_id,omitempty"` + LockedAt *time.Time `json:"locked_at,omitempty"` + LockExpiresAt *time.Time `json:"lock_expires_at,omitempty"` + ProgressPercent int `json:"progress_percent,omitempty"` + ProgressStep string `json:"progress_step,omitempty"` // Follow-up reminder fields FollowupAt *time.Time `json:"followup_at,omitempty"` @@ -256,6 +255,7 @@ type TaskRecoveryStore interface { MarkAllStaleTasks(ctx context.Context, olderThan time.Time) ([]RecoveredTaskInfo, error) MarkInReviewStaleTasks(ctx context.Context, olderThan time.Time) ([]RecoveredTaskInfo, error) FixOrphanedBlockedTasks(ctx context.Context) ([]RecoveredTaskInfo, error) + ListOrphanedPendingTasks(ctx context.Context, olderThan time.Time) ([]TeamTaskData, error) } // TaskFollowupStore manages follow-up reminder scheduling. diff --git a/internal/tasks/task_ticker.go b/internal/tasks/task_ticker.go index 13ccf9a34..fa64791a6 100644 --- a/internal/tasks/task_ticker.go +++ b/internal/tasks/task_ticker.go @@ -18,26 +18,32 @@ import ( ) const ( - defaultRecoveryInterval = 5 * time.Minute - defaultStaleThreshold = 2 * time.Hour - defaultInReviewThreshold = 4 * time.Hour - followupCooldown = 5 * time.Minute - defaultFollowupInterval = 30 * time.Minute + defaultRecoveryInterval = 5 * time.Minute + defaultStaleThreshold = 2 * time.Hour + defaultInReviewThreshold = 4 * time.Hour + followupCooldown = 5 * time.Minute + defaultFollowupInterval = 30 * time.Minute + orphanedPendingThreshold = 10 * time.Minute ) -// TaskTicker periodically recovers stale tasks and re-dispatches pending work. -// All recovery/stale/followup queries are batched across v2 active teams (single SQL each). +type TaskDispatcher interface { + DispatchTaskToAgent(ctx context.Context, task *store.TeamTaskData, team *store.TeamData, agentID uuid.UUID) + CachedGetAgentByID(ctx context.Context, id uuid.UUID) (*store.AgentData, error) + Store() store.TeamStore +} + type TaskTicker struct { - teams store.TeamStore - agents store.AgentStore - msgBus *bus.MessageBus - interval time.Duration + teams store.TeamStore + agents store.AgentStore + msgBus *bus.MessageBus + interval time.Duration + dispatcher TaskDispatcher stopCh chan struct{} wg sync.WaitGroup mu sync.Mutex - lastFollowupSent map[uuid.UUID]time.Time // taskID → last followup sent time + lastFollowupSent map[uuid.UUID]time.Time } func NewTaskTicker(teams store.TeamStore, agents store.AgentStore, msgBus *bus.MessageBus, intervalSec int) *TaskTicker { @@ -55,7 +61,10 @@ func NewTaskTicker(teams store.TeamStore, agents store.AgentStore, msgBus *bus.M } } -// Start launches the background recovery loop. +func (t *TaskTicker) SetDispatcher(d TaskDispatcher) { + t.dispatcher = d +} + func (t *TaskTicker) Start() { t.wg.Add(1) go t.loop() @@ -166,7 +175,12 @@ func (t *TaskTicker) recoverAll(forceRecover bool) { "They are now pending and will be dispatched if assigned.") } - // Step 6: Prune old cooldown entries to prevent memory leak. + // Step 6: Auto-dispatch orphaned pending tasks (have owner but were never dispatched). + if t.dispatcher != nil { + t.dispatchOrphanedPendingTasks(recoverCtx) + } + + // Step 7: Prune old cooldown entries to prevent memory leak. t.pruneCooldowns() } @@ -383,6 +397,31 @@ func followupInterval(team store.TeamData) time.Duration { return defaultFollowupInterval } +func (t *TaskTicker) dispatchOrphanedPendingTasks(ctx context.Context) { + threshold := time.Now().Add(-orphanedPendingThreshold) + tasks, err := t.teams.ListOrphanedPendingTasks(ctx, threshold) + if err != nil { + slog.Warn("task_ticker: list orphaned pending tasks", "error", err) + return + } + if len(tasks) == 0 { + return + } + slog.Info("task_ticker: dispatching orphaned pending tasks", "count", len(tasks)) + for i := range tasks { + task := &tasks[i] + if task.OwnerAgentID == nil { + continue + } + team, err := t.teams.GetTeam(ctx, task.TeamID) + if err != nil { + slog.Warn("task_ticker: orphan dispatch — get team failed", "task_id", task.ID, "error", err) + continue + } + t.dispatcher.DispatchTaskToAgent(ctx, task, team, *task.OwnerAgentID) + } +} + func (t *TaskTicker) pruneCooldowns() { t.mu.Lock() defer t.mu.Unlock() From 65cf6392bdfacac7140fa56056e5ae805fcca860 Mon Sep 17 00:00:00 2001 From: kokorolx Date: Thu, 2 Apr 2026 05:50:41 +0000 Subject: [PATCH 7/8] fix(ticker): nil team panic + cross-tenant context for orphan dispatch - Guard GetTeam nil result to prevent nil pointer dereference - Use WithCrossTenant context so GetTeam bypasses tenant filter - Remove tenant_id WHERE clause from ListOrphanedPendingTasks (cross-tenant query) --- internal/store/pg/teams_tasks_progress.go | 4 +--- internal/tasks/task_ticker.go | 9 +++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/internal/store/pg/teams_tasks_progress.go b/internal/store/pg/teams_tasks_progress.go index 82e7d5a16..df419c300 100644 --- a/internal/store/pg/teams_tasks_progress.go +++ b/internal/store/pg/teams_tasks_progress.go @@ -217,17 +217,15 @@ func (s *PGTeamStore) FixOrphanedBlockedTasks(ctx context.Context) ([]store.Reco } func (s *PGTeamStore) ListOrphanedPendingTasks(ctx context.Context, olderThan time.Time) ([]store.TeamTaskData, error) { - tid := tenantIDForInsert(ctx) rows, err := s.db.QueryContext(ctx, `SELECT `+taskSelectCols+` `+taskJoinClause+` WHERE t.status = $1 AND t.owner_agent_id IS NOT NULL AND t.updated_at < $2 - AND t.tenant_id = $3 ORDER BY t.priority DESC, t.created_at LIMIT 50`, - store.TeamTaskStatusPending, olderThan, tid, + store.TeamTaskStatusPending, olderThan, ) if err != nil { return nil, err diff --git a/internal/tasks/task_ticker.go b/internal/tasks/task_ticker.go index fa64791a6..45046517c 100644 --- a/internal/tasks/task_ticker.go +++ b/internal/tasks/task_ticker.go @@ -399,7 +399,8 @@ func followupInterval(team store.TeamData) time.Duration { func (t *TaskTicker) dispatchOrphanedPendingTasks(ctx context.Context) { threshold := time.Now().Add(-orphanedPendingThreshold) - tasks, err := t.teams.ListOrphanedPendingTasks(ctx, threshold) + xCtx := store.WithCrossTenant(ctx) + tasks, err := t.teams.ListOrphanedPendingTasks(xCtx, threshold) if err != nil { slog.Warn("task_ticker: list orphaned pending tasks", "error", err) return @@ -413,12 +414,12 @@ func (t *TaskTicker) dispatchOrphanedPendingTasks(ctx context.Context) { if task.OwnerAgentID == nil { continue } - team, err := t.teams.GetTeam(ctx, task.TeamID) - if err != nil { + team, err := t.teams.GetTeam(xCtx, task.TeamID) + if err != nil || team == nil { slog.Warn("task_ticker: orphan dispatch — get team failed", "task_id", task.ID, "error", err) continue } - t.dispatcher.DispatchTaskToAgent(ctx, task, team, *task.OwnerAgentID) + t.dispatcher.DispatchTaskToAgent(xCtx, task, team, *task.OwnerAgentID) } } From 5eca7e9404a2f1ab16e675b171a4c9943a4c8e8a Mon Sep 17 00:00:00 2001 From: kokorolx Date: Thu, 2 Apr 2026 07:45:12 +0000 Subject: [PATCH 8/8] fix(browser): use LookPath to find chromium binary explicitly Prevents 'can't find browser binary' on systems where Chrome/Chromium is installed in non-standard paths (e.g. /snap/bin/chromium) --- pkg/browser/browser.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/browser/browser.go b/pkg/browser/browser.go index c6ee41461..8e0523ad6 100644 --- a/pkg/browser/browser.go +++ b/pkg/browser/browser.go @@ -13,14 +13,14 @@ import ( // Manager handles the Chrome browser lifecycle and page management. type Manager struct { - mu sync.Mutex - browser *rod.Browser - refs *RefStore - pages map[string]*rod.Page // targetID → page - console map[string][]ConsoleMessage // targetID → console messages - tenantCtxs map[string]*rod.Browser // tenantID → incognito browser context - pageTenants map[string]string // targetID → tenantID (for filtering) - pageLastUsed map[string]time.Time // targetID → last access time + mu sync.Mutex + browser *rod.Browser + refs *RefStore + pages map[string]*rod.Page // targetID → page + console map[string][]ConsoleMessage // targetID → console messages + tenantCtxs map[string]*rod.Browser // tenantID → incognito browser context + pageTenants map[string]string // targetID → tenantID (for filtering) + pageLastUsed map[string]time.Time // targetID → last access time headless bool remoteURL string // CDP endpoint for remote Chrome (sidecar); skips local launcher actionTimeout time.Duration // per-action context timeout (default 30s) @@ -132,6 +132,9 @@ func (m *Manager) Start(ctx context.Context) error { Set("disable-gpu"). Set("no-first-run"). Set("no-default-browser-check") + if p, ok := launcher.LookPath(); ok { + l = l.Bin(p) + } u, err := l.Launch() if err != nil {