Skip to content
Open
5 changes: 4 additions & 1 deletion cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/nextlevelbuilder/goclaw/internal/config"
"github.com/nextlevelbuilder/goclaw/internal/edition"
"github.com/nextlevelbuilder/goclaw/internal/gateway"
"github.com/nextlevelbuilder/goclaw/internal/heartbeat"
"github.com/nextlevelbuilder/goclaw/internal/gateway/methods"
"github.com/nextlevelbuilder/goclaw/internal/heartbeat"
httpapi "github.com/nextlevelbuilder/goclaw/internal/http"
mcpbridge "github.com/nextlevelbuilder/goclaw/internal/mcp"
"github.com/nextlevelbuilder/goclaw/internal/media"
Expand Down Expand Up @@ -1093,6 +1093,9 @@ func runGateway() {
var taskTicker *tasks.TaskTicker
if pgStores.Teams != nil {
taskTicker = tasks.NewTaskTicker(pgStores.Teams, pgStores.Agents, msgBus, cfg.Gateway.TaskRecoveryIntervalSec)
if d, ok := postTurn.(tasks.TaskDispatcher); ok {
taskTicker.SetDispatcher(d)
}
taskTicker.Start()
}

Expand Down
28 changes: 23 additions & 5 deletions cmd/gateway_announce_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (

// announceEntry holds one teammate completion result waiting to be announced.
type announceEntry struct {
MemberAgent string // agent key (e.g. "researcher")
MemberDisplayName string // display name (e.g. "Nhà Nghiên Cứu"), empty if not set
Content string
Media []agent.MediaResult
MemberAgent string // agent key (e.g. "researcher")
MemberDisplayName string // display name (e.g. "Nhà Nghiên Cứu"), empty if not set
Content string
Media []agent.MediaResult
DirectDelivered bool // true when member already posted result to channel directly
}

// announceQueueState tracks the per-session announce queue.
Expand Down Expand Up @@ -123,7 +124,20 @@ func processAnnounceLoop(
}
}

allDirect := true
for _, e := range entries {
if !e.DirectDelivered {
allDirect = false
break
}
}

content := buildMergedAnnounceContent(entries, snapshot, r.TeamWorkspace)
if allDirect {
content += "\n\n[The member already posted the result directly to the chat. Do NOT repeat or rephrase their work. " +
"You MUST check if a follow-up task is needed (e.g. code review gate after dev completion, deployment gate, QA) and create it via team_tasks. " +
"Only respond with NO_REPLY if you have already confirmed no follow-up is required.]"
}

req := agent.RunRequest{
SessionKey: r.LeadSessionKey,
Expand Down Expand Up @@ -193,7 +207,11 @@ func processAnnounceLoop(
}

slog.Info("teammate announce: batch processed",
"batch_size", len(entries), "session", r.LeadSessionKey)
"batch_size", len(entries),
"all_direct", allDirect,
"channel", r.OrigChannel,
"session", r.LeadSessionKey,
)

// Loop back — tryFinish at top will exit when queue is truly empty.
}
Expand Down
34 changes: 33 additions & 1 deletion cmd/gateway_consumer_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func handleSubagentAnnounce(
return
}
slog.Error("subagent announce: agent run failed", "error", outcome.Err)
sendLLMErrorAlert(outcome.Err, origCh, chatID)
deps.MsgBus.PublishOutbound(bus.OutboundMessage{
Channel: origCh,
ChatID: chatID,
Expand Down Expand Up @@ -325,6 +326,7 @@ func handleTeammateMessage(
var announceMedia []agent.MediaResult
if outcome.Err != nil {
slog.Error("teammate message: agent run failed", "error", outcome.Err)
sendLLMErrorAlert(outcome.Err, origCh, origChatID)
errMsg := outcome.Err.Error()
if len(errMsg) > 500 {
errMsg = errMsg[:500] + "..."
Expand Down Expand Up @@ -368,6 +370,35 @@ func handleTeammateMessage(
}
}

// Direct delivery: post the member's result to the origin chat via the
// member agent's own channel (so it appears under the correct bot identity).
// Falls back to origin channel if the member has no dedicated channel.
if origChatID != "" && outcome.Err == nil && !agent.IsSilentReply(announceContent) {
memberAgent := inMeta[tools.MetaToAgent]
memberChannel := ""
usingFallback := false
if deps.ChannelMgr != nil {
memberChannel = deps.ChannelMgr.ChannelForAgent(memberAgent)
}
if memberChannel == "" {
memberChannel = origCh
usingFallback = true
}
slog.Info("teammate direct delivery",
"member_agent", memberAgent,
"channel", memberChannel,
"chat_id", origChatID,
"fallback", usingFallback,
"content_len", len(announceContent),
)
deps.MsgBus.PublishOutbound(bus.OutboundMessage{
Channel: memberChannel,
ChatID: origChatID,
Content: announceContent,
Metadata: outMeta,
})
}

// Announce result (or failure) to lead agent via announce queue.
// Queue merges concurrent completions into a single batched announce.
if origChatID == "" {
Expand Down Expand Up @@ -424,12 +455,13 @@ func handleTeammateMessage(
announceContent = string([]rune(announceContent)[:50_000]) + "\n[truncated]"
}

// Enqueue result. If we become the processor, run the announce loop.
directDelivered := origChatID != "" && outcome.Err == nil && !agent.IsSilentReply(announceContent)
entry := announceEntry{
MemberAgent: inMeta[tools.MetaToAgent],
MemberDisplayName: inMeta[tools.MetaToAgentDisplay],
Content: announceContent,
Media: announceMedia,
DirectDelivered: directDelivered,
}
q, isProcessor := enqueueAnnounce(leadSessionKey, entry)
if !isProcessor {
Expand Down
7 changes: 7 additions & 0 deletions cmd/gateway_consumer_normal.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,11 @@ func processNormalMessage(
schedCtx = tools.WithRunKind(schedCtx, rk)
}

chatTeamID := ""
if deps.ChannelMgr != nil {
chatTeamID = deps.ChannelMgr.ResolveChatTeam(msg.Channel, msg.ChatID)
}

// Schedule through main lane (per-session concurrency controlled by maxConcurrent)
outCh := deps.Sched.ScheduleWithOpts(schedCtx, "main", agent.RunRequest{
SessionKey: sessionKey,
Expand All @@ -364,6 +369,7 @@ func processNormalMessage(
ToolAllow: msg.ToolAllow,
ExtraSystemPrompt: extraPrompt,
SkillFilter: skillFilter,
TeamID: chatTeamID,
}, scheduler.ScheduleOpts{
MaxConcurrent: maxConcurrent,
})
Expand Down Expand Up @@ -403,6 +409,7 @@ func processNormalMessage(
return
}
slog.Error("inbound: agent run failed", "error", outcome.Err, "channel", channel)
sendLLMErrorAlert(outcome.Err, channel, chatID)
deps.MsgBus.PublishOutbound(bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Expand Down
37 changes: 26 additions & 11 deletions internal/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,16 +222,15 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (result *RunResult,
Payload: map[string]any{"phase": "thinking", "iteration": rs.iteration},
})

// Build per-iteration tool list: policy, tenant exclusions, bootstrap, skill visibility,
// channel type, and final-iteration stripping.
var toolDefs []providers.ToolDefinition
var allowedTools map[string]bool
// Resolve per-user MCP tools (servers requiring user credentials).
// Must run before buildFilteredTools so tools are in the Registry for policy filtering.
if req.UserID != "" {
l.getUserMCPTools(iterCtx, req.UserID)
}
toolDefs, allowedTools, messages = l.buildFilteredTools(&req, hadBootstrap, rs.iteration, maxIter, messages)
var toolChoice string
toolDefs, allowedTools, messages, toolChoice = l.buildFilteredTools(&req, hadBootstrap, rs.iteration, maxIter, messages)

// Use per-request overrides if set (e.g. heartbeat uses cheaper provider/model).
model := l.model
Expand All @@ -244,9 +243,10 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (result *RunResult,
}

chatReq := providers.ChatRequest{
Messages: messages,
Tools: toolDefs,
Model: model,
Messages: messages,
Tools: toolDefs,
ToolChoice: toolChoice,
Model: model,
Options: map[string]any{
providers.OptMaxTokens: l.effectiveMaxTokens(),
providers.OptTemperature: config.DefaultTemperature,
Expand Down Expand Up @@ -493,6 +493,13 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (result *RunResult,
if l.maxToolCalls > 0 && rs.totalToolCalls > l.maxToolCalls {
slog.Warn("security.tool_budget_exceeded",
"agent", l.id, "total", rs.totalToolCalls, "limit", l.maxToolCalls)
for _, tc := range resp.ToolCalls {
messages = append(messages, providers.Message{
Role: "tool",
Content: "[Tool call skipped — tool budget exceeded]",
ToolCallID: tc.ID,
})
}
messages = append(messages, providers.Message{
Role: "user",
Content: fmt.Sprintf("[System] Tool call budget reached (%d/%d). Do NOT call any more tools. Summarize results so far and respond to the user.", rs.totalToolCalls, l.maxToolCalls),
Expand Down Expand Up @@ -678,20 +685,28 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (result *RunResult,
// 5. Process results sequentially: emit events, append messages, save to session
// Note: tool span start/end already emitted inside goroutines above.
var loopStuck bool
for _, r := range collected {
// Record tool execution time for adaptive thresholds.
var deferredWarnings []providers.Message
var breakIdx int = -1
for i, r := range collected {
toolTiming.Record(r.tc.Name, time.Since(r.spanStart).Milliseconds())

// Process tool result: loop detection, events, media, deliverables.
toolMsg, warningMsgs, action := l.processToolResult(ctx, rs, &req, emitRun, r.tc, r.registryName, r.result, hadBootstrap)
messages = append(messages, toolMsg)
rs.pendingMsgs = append(rs.pendingMsgs, toolMsg)
messages = append(messages, warningMsgs...)
deferredWarnings = append(deferredWarnings, warningMsgs...)
if action == toolResultBreak {
loopStuck = true
breakIdx = i
break
}
}
if loopStuck && breakIdx >= 0 {
for _, r := range collected[breakIdx+1:] {
toolMsg, _, _ := l.processToolResult(ctx, rs, &req, emitRun, r.tc, r.registryName, r.result, hadBootstrap)
messages = append(messages, toolMsg)
rs.pendingMsgs = append(rs.pendingMsgs, toolMsg)
}
}
messages = append(messages, deferredWarnings...)

// Check read-only streak after processing all parallel results.
if !loopStuck {
Expand Down
12 changes: 12 additions & 0 deletions internal/agent/loop_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,26 @@ func (l *Loop) injectContext(ctx context.Context, req *RunRequest) (contextSetup
}

// Team workspace: dispatched task overrides default workspace.
// Preserve the agent's own workspace so file tools can still allow access
// to project files (e.g. the code repo the agent normally works in).
if req.TeamWorkspace != "" {
if err := os.MkdirAll(req.TeamWorkspace, 0755); err != nil {
slog.Warn("failed to create team workspace directory", "workspace", req.TeamWorkspace, "error", err)
}
// Save the agent's BASE workspace (l.workspace), not the per-user
// derived workspace from context which may already have user-isolation
// layers applied (e.g. /workspaces/group_discord-pm_123...).
if l.workspace != "" {
ctx = tools.WithToolAgentWorkspace(ctx, l.workspace)
}
ctx = tools.WithToolTeamWorkspace(ctx, req.TeamWorkspace)
ctx = tools.WithToolWorkspace(ctx, req.TeamWorkspace)
}
if req.TeamID != "" {
ctx = tools.WithToolTeamID(ctx, req.TeamID)
if tid, err := uuid.Parse(req.TeamID); err == nil && tid != uuid.Nil {
ctx = store.WithMemoryTeamID(ctx, tid)
}
}
if req.LeaderAgentID != "" {
ctx = tools.WithLeaderAgentID(ctx, req.LeaderAgentID)
Expand Down Expand Up @@ -172,6 +183,7 @@ func (l *Loop) injectContext(ctx context.Context, req *RunRequest) (contextSetup
}
if req.TeamID == "" {
ctx = tools.WithToolTeamID(ctx, team.ID.String())
ctx = store.WithMemoryTeamID(ctx, team.ID)
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions internal/agent/loop_tool_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
// disabled tools, bootstrap mode, skill visibility, channel type, and iteration budget.
// Per-user MCP tools must be registered in the Registry before calling this function
// (via getUserMCPTools) so they are included in policy filtering and execution.
// Returns tool definitions for the provider, an allowed-tools map for execution validation,
// and the (potentially modified) messages slice when final-iteration stripping appends a hint.
func (l *Loop) buildFilteredTools(req *RunRequest, hadBootstrap bool, iteration, maxIter int, messages []providers.Message) ([]providers.ToolDefinition, map[string]bool, []providers.Message) {
// Returns tool definitions, an allowed-tools map, the (potentially modified) messages slice,
// and a tool_choice override ("" = auto, "none" = no tool calls allowed).
func (l *Loop) buildFilteredTools(req *RunRequest, hadBootstrap bool, iteration, maxIter int, messages []providers.Message) ([]providers.ToolDefinition, map[string]bool, []providers.Message, string) {
// Build provider request with policy-filtered tools.
var toolDefs []providers.ToolDefinition
var allowedTools map[string]bool
Expand Down Expand Up @@ -81,15 +81,16 @@ func (l *Loop) buildFilteredTools(req *RunRequest, hadBootstrap bool, iteration,
toolDefs = filtered
}

// Final iteration: strip all tools to force a text-only response.
// Without this the model may keep requesting tools and exit with "...".
// Final iteration: do NOT nil toolDefs — Anthropic rejects requests containing
// tool_use/tool_result history without a top-level "tools" parameter (HTTP 400).
// Use tool_choice="none" to enforce text-only response at the API level instead.
if iteration == maxIter {
toolDefs = nil
messages = append(messages, providers.Message{
Role: "user",
Content: "[System] Final iteration reached. Summarize all findings and respond to the user now. No more tool calls allowed.",
})
return toolDefs, allowedTools, messages, "none"
}

return toolDefs, allowedTools, messages
return toolDefs, allowedTools, messages, ""
}
28 changes: 21 additions & 7 deletions internal/channels/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@ type DMPolicy string

const (
DMPolicyPairing DMPolicy = "pairing" // Require pairing code
DMPolicyAllowlist DMPolicy = "allowlist" // Only whitelisted senders
DMPolicyOpen DMPolicy = "open" // Accept all
DMPolicyDisabled DMPolicy = "disabled" // Reject all DMs
DMPolicyAllowlist DMPolicy = "allowlist" // Only whitelisted senders
DMPolicyOpen DMPolicy = "open" // Accept all
DMPolicyDisabled DMPolicy = "disabled" // Reject all DMs
)

// GroupPolicy controls how group messages are handled.
type GroupPolicy string

const (
GroupPolicyOpen GroupPolicy = "open" // Accept all groups
GroupPolicyAllowlist GroupPolicy = "allowlist" // Only whitelisted groups
GroupPolicyDisabled GroupPolicy = "disabled" // No group messages
GroupPolicyAllowlist GroupPolicy = "allowlist" // Only whitelisted groups
GroupPolicyDisabled GroupPolicy = "disabled" // No group messages
)

// Channel type constants used across channel packages and gateway wiring.
Expand Down Expand Up @@ -149,6 +149,15 @@ type ReactionChannel interface {
ClearReaction(ctx context.Context, chatID string, messageID string) error
}

// TeamMappedChannel is optionally implemented by channels that map specific
// chat IDs to Goclaw team IDs. Used to route inbound messages to the correct
// team when an agent leads multiple teams.
type TeamMappedChannel interface {
Channel
// ResolveChatTeam returns the team ID for the given chat ID, or "" if not mapped.
ResolveChatTeam(chatID string) string
}

// BaseChannel provides shared functionality for all channel implementations.
// Channel implementations should embed this struct.
type BaseChannel struct {
Expand All @@ -157,9 +166,10 @@ type BaseChannel struct {
bus *bus.MessageBus
running bool
allowList []string
agentID string // for DB instances: routes to specific agent (empty = use resolveAgentRoute)
tenantID uuid.UUID // for DB instances: tenant scope (zero = master tenant fallback)
agentID string // for DB instances: routes to specific agent (empty = use resolveAgentRoute)
tenantID uuid.UUID // for DB instances: tenant scope (zero = master tenant fallback)
contactCollector *store.ContactCollector // optional: auto-collect contacts from channel messages
chatTeamMap map[string]string
}

// NewBaseChannel creates a new BaseChannel with the given parameters.
Expand Down Expand Up @@ -203,6 +213,10 @@ func (c *BaseChannel) SetTenantID(id uuid.UUID) { c.tenantID = id }
// SetContactCollector sets the contact collector for auto-collecting contacts from messages.
func (c *BaseChannel) SetContactCollector(cc *store.ContactCollector) { c.contactCollector = cc }

func (c *BaseChannel) SetChatTeamMap(m map[string]string) { c.chatTeamMap = m }

func (c *BaseChannel) ResolveChatTeam(chatID string) string { return c.chatTeamMap[chatID] }

// ContactCollector returns the contact collector (may be nil).
func (c *BaseChannel) ContactCollector() *store.ContactCollector { return c.contactCollector }

Expand Down
Loading