From 581719ebaf355b534a46ca72dfcff577225fbbfb Mon Sep 17 00:00:00 2001 From: Bruno Bornsztein Date: Mon, 2 Feb 2026 21:31:25 -0600 Subject: [PATCH 1/2] feat(relay): add agent-to-agent messaging system MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement Agent Relay-style messaging for agent-to-agent communication: - Add relay_messages table for persistent message storage - Create internal/relay package with Relay manager, agent registry, and message routing - Support direct messages, broadcast (*), and case-insensitive agent names - Integrate with executor for agent registration on task start - Add idle detection for message delivery timing - Add CLI commands: ty relay send/read/list - Display relay messages in task detail logs with 📨 icon - Include comprehensive tests for relay functionality Messages are routed by agent name (derived from task title) and can be sent between running agents. Pending messages are delivered when agents are idle. Co-Authored-By: Claude Opus 4.5 --- cmd/task/main.go | 238 +++++++++++++++++++++++++++++++++ internal/db/relay.go | 128 ++++++++++++++++++ internal/db/sqlite.go | 14 ++ internal/executor/executor.go | 29 +++++ internal/executor/relay.go | 193 +++++++++++++++++++++++++++ internal/relay/relay.go | 239 ++++++++++++++++++++++++++++++++++ internal/relay/relay_test.go | 199 ++++++++++++++++++++++++++++ internal/relay/store.go | 136 +++++++++++++++++++ internal/ui/detail.go | 2 + 9 files changed, 1178 insertions(+) create mode 100644 internal/db/relay.go create mode 100644 internal/executor/relay.go create mode 100644 internal/relay/relay.go create mode 100644 internal/relay/relay_test.go create mode 100644 internal/relay/store.go diff --git a/cmd/task/main.go b/cmd/task/main.go index b65426d3..688cb641 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -4,6 +4,8 @@ package main import ( "bufio" "context" + "crypto/rand" + "encoding/hex" "encoding/json" "fmt" "io" @@ -60,6 +62,13 @@ func getDaemonSessionName() string { return fmt.Sprintf("task-daemon-%s", getSessionID()) } +// generateShortID generates a short random ID for relay messages. +func generateShortID() string { + b := make([]byte, 4) + rand.Read(b) + return hex.EncodeToString(b) +} + func main() { var dangerous bool @@ -1895,6 +1904,235 @@ Examples: rootCmd.AddCommand(eventsCmd) + // Relay subcommand - agent-to-agent messaging + relayCmd := &cobra.Command{ + Use: "relay", + Short: "Agent-to-agent messaging", + Long: `Send and receive messages between running agent sessions. + +Messages are routed by agent name (typically the task title). Agents must be +actively running to send or receive messages. + +Examples: + ty relay send Bob "Can you review my changes?" + ty relay send * "Broadcast to all agents" + ty relay list # List connected agents + ty relay read abc123 # Read message by ID`, + } + + // Relay send subcommand + relaySendCmd := &cobra.Command{ + Use: "send ", + Short: "Send a message to an agent", + Long: `Send a message to another running agent. + +The argument can be: + - An agent name (case-insensitive) + - * for broadcast to all agents + +Examples: + ty relay send Bob "Hello!" + ty relay send bob "Can you review?" # case-insensitive + ty relay send * "Everyone check this"`, + Args: cobra.MinimumNArgs(2), + Run: func(cmd *cobra.Command, args []string) { + to := args[0] + message := strings.Join(args[1:], " ") + fromTask, _ := cmd.Flags().GetInt64("from") + + // Open database + dbPath := db.DefaultPath() + database, err := db.Open(dbPath) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + defer database.Close() + + // Create relay store and send message + from := "cli" + if fromTask > 0 { + task, err := database.GetTask(fromTask) + if err == nil && task != nil { + from = task.Title + } + } + + msg := &db.RelayMessage{ + ID: generateShortID(), + From: from, + To: to, + Content: message, + TaskID: fromTask, + Status: "pending", + CreatedAt: db.LocalTime{Time: time.Now()}, + } + + if err := database.SaveRelayMessage(msg); err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + + fmt.Println(successStyle.Render(fmt.Sprintf("Message sent (id: %s)", msg.ID))) + }, + } + relaySendCmd.Flags().Int64("from", 0, "Sender task ID (for attribution)") + relayCmd.AddCommand(relaySendCmd) + + // Relay read subcommand + relayReadCmd := &cobra.Command{ + Use: "read [message-id]", + Short: "Read relay messages", + Long: `Read a specific message by ID or list recent messages for an agent. + +Examples: + ty relay read abc123 # Read specific message + ty relay read --agent Bob # List messages for Bob`, + Run: func(cmd *cobra.Command, args []string) { + agentName, _ := cmd.Flags().GetString("agent") + limit, _ := cmd.Flags().GetInt("limit") + + // Open database + dbPath := db.DefaultPath() + database, err := db.Open(dbPath) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + defer database.Close() + + // If message ID provided, show that specific message + if len(args) > 0 { + msg, err := database.GetRelayMessage(args[0]) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + if msg == nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Message not found")) + os.Exit(1) + } + + fmt.Printf("%s → %s\n", boldStyle.Render(msg.From), msg.To) + fmt.Printf("%s\n", dimStyle.Render(msg.CreatedAt.Format("2006-01-02 15:04:05"))) + fmt.Printf("\n%s\n", msg.Content) + + // Mark as read + database.MarkRelayMessageRead(msg.ID) + return + } + + // List messages for agent + if agentName == "" { + fmt.Fprintln(os.Stderr, errorStyle.Render("Provide a message ID or --agent flag")) + os.Exit(1) + } + + messages, err := database.GetRelayMessagesForAgent(agentName, limit) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + + if len(messages) == 0 { + fmt.Println(dimStyle.Render("No messages")) + return + } + + for _, msg := range messages { + status := msg.Status + if status == "pending" { + status = boldStyle.Render("• pending") + } + preview := msg.Content + if len(preview) > 60 { + preview = preview[:60] + "..." + } + fmt.Printf("%s %s → %s: %s\n", + dimStyle.Render(msg.ID), + msg.From, + msg.To, + preview, + ) + } + }, + } + relayReadCmd.Flags().String("agent", "", "Filter messages for this agent") + relayReadCmd.Flags().Int("limit", 20, "Maximum messages to show") + relayCmd.AddCommand(relayReadCmd) + + // Relay list subcommand - list connected agents + relayListCmd := &cobra.Command{ + Use: "list", + Short: "List connected agents", + Long: `Show all agents currently registered with the relay. + +Agents are registered when their tasks start executing and unregistered +when tasks complete.`, + Run: func(cmd *cobra.Command, args []string) { + // Open database + dbPath := db.DefaultPath() + database, err := db.Open(dbPath) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + defer database.Close() + + // Get processing tasks as a proxy for connected agents + tasks, err := database.ListTasks(db.ListTasksOptions{Status: db.StatusProcessing, Limit: 100}) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + + // Also include blocked tasks (they're still "connected") + blockedTasks, _ := database.ListTasks(db.ListTasksOptions{Status: db.StatusBlocked, Limit: 100}) + tasks = append(tasks, blockedTasks...) + + if len(tasks) == 0 { + fmt.Println(dimStyle.Render("No agents connected")) + return + } + + fmt.Println(boldStyle.Render("Connected Agents")) + fmt.Println() + for _, task := range tasks { + name := task.Title + if name == "" { + name = fmt.Sprintf("task-%d", task.ID) + } + // Clean name for relay addressing + name = strings.Map(func(r rune) rune { + if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' { + return r + } + if r == ' ' { + return '-' + } + return -1 + }, name) + if len(name) > 32 { + name = name[:32] + } + + status := task.Status + if status == db.StatusProcessing { + status = "active" + } + fmt.Printf(" %s #%d %s (%s)\n", + boldStyle.Render(name), + task.ID, + task.Project, + dimStyle.Render(status), + ) + } + }, + } + relayCmd.AddCommand(relayListCmd) + + rootCmd.AddCommand(relayCmd) + // Projects subcommand - manage projects projectsCmd := &cobra.Command{ Use: "projects", diff --git a/internal/db/relay.go b/internal/db/relay.go new file mode 100644 index 00000000..dce39bee --- /dev/null +++ b/internal/db/relay.go @@ -0,0 +1,128 @@ +package db + +import ( + "database/sql" + "fmt" + "strings" + "time" +) + +// RelayMessage represents a stored relay message. +type RelayMessage struct { + ID string + From string + To string + Content string + TaskID int64 + Status string // pending, delivered, read + CreatedAt LocalTime + DeliveredAt *LocalTime + ReadAt *LocalTime +} + +// SaveRelayMessage stores a relay message. +func (db *DB) SaveRelayMessage(msg *RelayMessage) error { + _, err := db.Exec(` + INSERT INTO relay_messages (id, from_agent, to_agent, content, task_id, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, msg.ID, msg.From, msg.To, msg.Content, msg.TaskID, msg.Status, msg.CreatedAt.Time) + if err != nil { + return fmt.Errorf("insert relay message: %w", err) + } + return nil +} + +// GetRelayMessage retrieves a message by ID. +func (db *DB) GetRelayMessage(id string) (*RelayMessage, error) { + msg := &RelayMessage{} + err := db.QueryRow(` + SELECT id, from_agent, to_agent, content, COALESCE(task_id, 0), status, + created_at, delivered_at, read_at + FROM relay_messages WHERE id = ? + `, id).Scan( + &msg.ID, &msg.From, &msg.To, &msg.Content, &msg.TaskID, &msg.Status, + &msg.CreatedAt, &msg.DeliveredAt, &msg.ReadAt, + ) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("query relay message: %w", err) + } + return msg, nil +} + +// GetRelayMessagesForAgent retrieves messages for an agent (direct or broadcast). +func (db *DB) GetRelayMessagesForAgent(agentName string, limit int) ([]*RelayMessage, error) { + normalized := strings.ToLower(strings.TrimSpace(agentName)) + rows, err := db.Query(` + SELECT id, from_agent, to_agent, content, COALESCE(task_id, 0), status, + created_at, delivered_at, read_at + FROM relay_messages + WHERE LOWER(to_agent) = ? OR to_agent = '*' + ORDER BY created_at DESC + LIMIT ? + `, normalized, limit) + if err != nil { + return nil, fmt.Errorf("query relay messages: %w", err) + } + defer rows.Close() + + var messages []*RelayMessage + for rows.Next() { + msg := &RelayMessage{} + if err := rows.Scan( + &msg.ID, &msg.From, &msg.To, &msg.Content, &msg.TaskID, &msg.Status, + &msg.CreatedAt, &msg.DeliveredAt, &msg.ReadAt, + ); err != nil { + return nil, fmt.Errorf("scan relay message: %w", err) + } + messages = append(messages, msg) + } + return messages, nil +} + +// GetPendingRelayMessages retrieves pending messages for an agent. +func (db *DB) GetPendingRelayMessages(agentName string) ([]*RelayMessage, error) { + normalized := strings.ToLower(strings.TrimSpace(agentName)) + rows, err := db.Query(` + SELECT id, from_agent, to_agent, content, COALESCE(task_id, 0), status, + created_at, delivered_at, read_at + FROM relay_messages + WHERE (LOWER(to_agent) = ? OR to_agent = '*') AND status = 'pending' + ORDER BY created_at ASC + `, normalized) + if err != nil { + return nil, fmt.Errorf("query pending relay messages: %w", err) + } + defer rows.Close() + + var messages []*RelayMessage + for rows.Next() { + msg := &RelayMessage{} + if err := rows.Scan( + &msg.ID, &msg.From, &msg.To, &msg.Content, &msg.TaskID, &msg.Status, + &msg.CreatedAt, &msg.DeliveredAt, &msg.ReadAt, + ); err != nil { + return nil, fmt.Errorf("scan relay message: %w", err) + } + messages = append(messages, msg) + } + return messages, nil +} + +// MarkRelayMessageDelivered marks a message as delivered. +func (db *DB) MarkRelayMessageDelivered(id string) error { + _, err := db.Exec(` + UPDATE relay_messages SET status = 'delivered', delivered_at = ? WHERE id = ? + `, time.Now(), id) + return err +} + +// MarkRelayMessageRead marks a message as read. +func (db *DB) MarkRelayMessageRead(id string) error { + _, err := db.Exec(` + UPDATE relay_messages SET status = 'read', read_at = ? WHERE id = ? + `, time.Now(), id) + return err +} diff --git a/internal/db/sqlite.go b/internal/db/sqlite.go index 07bd75d4..bd95ed28 100644 --- a/internal/db/sqlite.go +++ b/internal/db/sqlite.go @@ -180,6 +180,20 @@ func (db *DB) migrate() error { `CREATE INDEX IF NOT EXISTS idx_event_log_event_type ON event_log(event_type)`, `CREATE INDEX IF NOT EXISTS idx_event_log_created_at ON event_log(created_at)`, + // Relay messages for agent-to-agent communication + `CREATE TABLE IF NOT EXISTS relay_messages ( + id TEXT PRIMARY KEY, + from_agent TEXT NOT NULL, + to_agent TEXT NOT NULL, + content TEXT NOT NULL, + task_id INTEGER, + status TEXT DEFAULT 'pending', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + delivered_at DATETIME, + read_at DATETIME + )`, + `CREATE INDEX IF NOT EXISTS idx_relay_messages_to ON relay_messages(to_agent)`, + `CREATE INDEX IF NOT EXISTS idx_relay_messages_status ON relay_messages(status)`, } for _, m := range migrations { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index d107b0b9..36320946 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -67,6 +67,9 @@ type Executor struct { executorSlug string executorName string + + // Relay manager for agent-to-agent messaging + relay *RelayManager } // DefaultSuspendIdleTimeout is the default time a blocked task must be idle before being suspended. @@ -161,6 +164,9 @@ func New(database *db.DB, cfg *config.Config) *Executor { e.executorFactory.Register(NewOpenCodeExecutor(e)) e.executorFactory.Register(NewPiExecutor(e)) + // Initialize relay manager for agent-to-agent messaging + e.relay = NewRelayManager(e) + return e } @@ -198,6 +204,9 @@ func NewWithLogging(database *db.DB, cfg *config.Config, w io.Writer) *Executor e.executorFactory.Register(NewOpenCodeExecutor(e)) e.executorFactory.Register(NewPiExecutor(e)) + // Initialize relay manager for agent-to-agent messaging + e.relay = NewRelayManager(e) + return e } @@ -217,6 +226,11 @@ func (e *Executor) ExecutorSlug() string { return e.executorSlug } +// Relay returns the relay manager for agent-to-agent messaging. +func (e *Executor) Relay() *RelayManager { + return e.relay +} + // Start begins the background worker. func (e *Executor) Start(ctx context.Context) { e.mu.Lock() @@ -695,6 +709,11 @@ func (e *Executor) worker(ctx context.Context) { case <-ticker.C: e.processNextTask(ctx) + // Deliver pending relay messages to idle agents + if e.relay != nil { + e.relay.DeliverPendingMessages(ctx) + } + tickCount++ // Periodically check for merged branches @@ -862,6 +881,11 @@ func (e *Executor) executeTask(ctx context.Context, task *db.Task) { // Log start and trigger hook startMsg := fmt.Sprintf("Starting task #%d: %s", task.ID, task.Title) e.logLine(task.ID, "system", startMsg) + + // Register task as relay agent + if e.relay != nil { + e.relay.RegisterAgent(task) + } e.hooks.OnStatusChange(task, db.StatusProcessing, startMsg) // Setup worktree for isolated execution (symlinks claude config from project) @@ -3013,6 +3037,11 @@ func (e *Executor) logLine(taskID int64, lineType, content string) { // Store in database e.db.AppendTaskLog(taskID, lineType, content) + // Record activity for relay idle detection (except for relay messages themselves) + if e.relay != nil && lineType != "relay" { + e.relay.RecordActivity(taskID) + } + // Broadcast to subscribers logEntry := &db.TaskLog{ TaskID: taskID, diff --git a/internal/executor/relay.go b/internal/executor/relay.go new file mode 100644 index 00000000..5ed9a236 --- /dev/null +++ b/internal/executor/relay.go @@ -0,0 +1,193 @@ +package executor + +import ( + "context" + "fmt" + "os/exec" + "strings" + "sync" + "time" + + "github.com/bborn/workflow/internal/db" + "github.com/bborn/workflow/internal/relay" +) + +// RelayManager handles agent-to-agent messaging for the executor. +type RelayManager struct { + mu sync.RWMutex + relay *relay.Relay + executor *Executor + store *relay.DBStore + + // Track last activity per task for idle detection + lastActivity map[int64]time.Time +} + +// NewRelayManager creates a relay manager for the executor. +func NewRelayManager(e *Executor) *RelayManager { + store := relay.NewDBStore(e.db) + return &RelayManager{ + relay: relay.New(store), + executor: e, + store: store, + lastActivity: make(map[int64]time.Time), + } +} + +// RegisterAgent registers a task as an agent. +// Uses task title as the agent name. +func (rm *RelayManager) RegisterAgent(task *db.Task) { + name := rm.agentName(task) + rm.relay.Register(name, task.ID) + rm.executor.logger.Info("Registered relay agent", "name", name, "task", task.ID) +} + +// UnregisterAgent removes a task from the agent registry. +func (rm *RelayManager) UnregisterAgent(taskID int64) { + task, _ := rm.executor.db.GetTask(taskID) + if task != nil { + rm.relay.Unregister(rm.agentName(task)) + } +} + +// agentName derives agent name from task. +// Uses task title, cleaned up for relay addressing. +func (rm *RelayManager) agentName(task *db.Task) string { + // Use task ID as fallback, title otherwise + name := task.Title + if name == "" { + name = fmt.Sprintf("task-%d", task.ID) + } + // Clean up for use as agent name (remove special chars except hyphen) + name = strings.Map(func(r rune) rune { + if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' { + return r + } + if r == ' ' { + return '-' + } + return -1 + }, name) + // Limit length + if len(name) > 32 { + name = name[:32] + } + return name +} + +// Send sends a message from one agent to another. +func (rm *RelayManager) Send(fromTaskID int64, to, content string) (string, error) { + task, err := rm.executor.db.GetTask(fromTaskID) + if err != nil || task == nil { + return "", fmt.Errorf("sender task not found") + } + + from := rm.agentName(task) + msgID, err := rm.relay.Send(from, to, content, fromTaskID) + if err != nil { + return "", err + } + + rm.executor.logLine(fromTaskID, "relay", fmt.Sprintf("Sent to %s: %s", to, truncate(content, 100))) + return msgID, nil +} + +// SendFromCLI sends a message from the CLI (not from a task). +func (rm *RelayManager) SendFromCLI(from, to, content string) (string, error) { + return rm.relay.Send(from, to, content, 0) +} + +// RecordActivity records that a task had output activity. +func (rm *RelayManager) RecordActivity(taskID int64) { + rm.mu.Lock() + rm.lastActivity[taskID] = time.Now() + rm.mu.Unlock() +} + +// IsIdle checks if a task has been idle for the given duration. +func (rm *RelayManager) IsIdle(taskID int64, idleDuration time.Duration) bool { + rm.mu.RLock() + lastActive, ok := rm.lastActivity[taskID] + rm.mu.RUnlock() + + if !ok { + return true // No activity recorded means idle + } + return time.Since(lastActive) >= idleDuration +} + +// DeliverPendingMessages checks for pending messages and delivers them to idle agents. +func (rm *RelayManager) DeliverPendingMessages(ctx context.Context) { + agents := rm.relay.ListAgents() + + for _, agent := range agents { + // Check if agent's task is idle (no output for 1.5 seconds) + if !rm.IsIdle(agent.TaskID, 1500*time.Millisecond) { + continue + } + + // Get pending messages + messages := rm.relay.GetPendingMessages(agent.Name) + if len(messages) == 0 { + continue + } + + // Deliver each message + for _, msg := range messages { + if err := rm.injectMessage(ctx, agent.TaskID, msg); err != nil { + rm.executor.logger.Error("Failed to inject relay message", "error", err, "task", agent.TaskID) + continue + } + rm.relay.MarkDelivered(msg.ID) + rm.executor.logLine(agent.TaskID, "relay", fmt.Sprintf("Received from %s: %s", msg.From, truncate(msg.Content, 100))) + } + } +} + +// injectMessage injects a relay message into the task's tmux pane. +func (rm *RelayManager) injectMessage(ctx context.Context, taskID int64, msg *relay.Message) error { + task, err := rm.executor.db.GetTask(taskID) + if err != nil || task == nil { + return fmt.Errorf("task not found") + } + + // Get the Claude pane ID + paneID := task.ClaudePaneID + if paneID == "" { + return fmt.Errorf("no pane ID for task %d", taskID) + } + + // Format the message for injection + formatted := msg.FormatForInjection() + + // Use tmux send-keys to inject the message + cmd := exec.CommandContext(ctx, "tmux", "send-keys", "-t", paneID, formatted, "") + return cmd.Run() +} + +// GetMessage retrieves a message by ID. +func (rm *RelayManager) GetMessage(id string) *relay.Message { + return rm.relay.GetMessage(id) +} + +// ListAgents returns all registered agents. +func (rm *RelayManager) ListAgents() []*relay.Agent { + return rm.relay.ListAgents() +} + +// GetAgentByName returns an agent by name. +func (rm *RelayManager) GetAgentByName(name string) *relay.Agent { + return rm.relay.GetAgent(name) +} + +// GetMessagesForAgent retrieves messages for an agent. +func (rm *RelayManager) GetMessagesForAgent(agentName string, limit int) ([]*relay.Message, error) { + return rm.store.GetMessagesForAgent(agentName, limit) +} + +func truncate(s string, max int) string { + if len(s) <= max { + return s + } + return s[:max] + "..." +} diff --git a/internal/relay/relay.go b/internal/relay/relay.go new file mode 100644 index 00000000..83ad4a67 --- /dev/null +++ b/internal/relay/relay.go @@ -0,0 +1,239 @@ +// Package relay provides simple agent-to-agent messaging. +package relay + +import ( + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "github.com/google/uuid" +) + +// Message represents a relay message. +type Message struct { + ID string `json:"id"` + From string `json:"from"` + To string `json:"to"` // agent name, #channel, or * for broadcast + Content string `json:"content"` + TaskID int64 `json:"task_id"` // sender's task + Status string `json:"status"` // pending, delivered, read + CreatedAt time.Time `json:"created_at"` + ReadAt *time.Time `json:"read_at,omitempty"` +} + +// Agent represents a connected agent. +type Agent struct { + Name string `json:"name"` + TaskID int64 `json:"task_id"` + Status string `json:"status"` // active, idle + LastSeen time.Time `json:"last_seen"` +} + +// Relay manages agent messaging. +type Relay struct { + mu sync.RWMutex + agents map[string]*Agent // normalized name -> agent + messages []*Message // in-memory queue (also persisted to DB) + store MessageStore // persistence layer +} + +// MessageStore is the interface for message persistence. +type MessageStore interface { + SaveMessage(msg *Message) error + GetMessage(id string) (*Message, error) + GetMessagesForAgent(agentName string, limit int) ([]*Message, error) + MarkDelivered(id string) error + MarkRead(id string) error +} + +// New creates a new Relay. +func New(store MessageStore) *Relay { + return &Relay{ + agents: make(map[string]*Agent), + store: store, + } +} + +// Register registers an agent. +func (r *Relay) Register(name string, taskID int64) *Agent { + r.mu.Lock() + defer r.mu.Unlock() + + key := normalize(name) + agent := &Agent{ + Name: name, + TaskID: taskID, + Status: "active", + LastSeen: time.Now(), + } + r.agents[key] = agent + return agent +} + +// Unregister removes an agent. +func (r *Relay) Unregister(name string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.agents, normalize(name)) +} + +// UpdateStatus updates an agent's status. +func (r *Relay) UpdateStatus(name, status string) { + r.mu.Lock() + defer r.mu.Unlock() + + if agent, ok := r.agents[normalize(name)]; ok { + agent.Status = status + agent.LastSeen = time.Now() + } +} + +// GetAgent returns an agent by name. +func (r *Relay) GetAgent(name string) *Agent { + r.mu.RLock() + defer r.mu.RUnlock() + return r.agents[normalize(name)] +} + +// ListAgents returns all registered agents. +func (r *Relay) ListAgents() []*Agent { + r.mu.RLock() + defer r.mu.RUnlock() + + agents := make([]*Agent, 0, len(r.agents)) + for _, a := range r.agents { + agents = append(agents, a) + } + return agents +} + +// Send sends a message. Returns the message ID. +func (r *Relay) Send(from, to, content string, fromTaskID int64) (string, error) { + msg := &Message{ + ID: uuid.New().String()[:8], // short ID for easy reference + From: from, + To: to, + Content: content, + TaskID: fromTaskID, + Status: "pending", + CreatedAt: time.Now(), + } + + if r.store != nil { + if err := r.store.SaveMessage(msg); err != nil { + return "", fmt.Errorf("save message: %w", err) + } + } + + r.mu.Lock() + r.messages = append(r.messages, msg) + r.mu.Unlock() + + return msg.ID, nil +} + +// GetPendingMessages returns pending messages for an agent. +func (r *Relay) GetPendingMessages(agentName string) []*Message { + r.mu.RLock() + defer r.mu.RUnlock() + + key := normalize(agentName) + var pending []*Message + + for _, msg := range r.messages { + if msg.Status != "pending" { + continue + } + // Match direct, broadcast, or channel + if msg.To == "*" || normalize(msg.To) == key { + pending = append(pending, msg) + } + } + return pending +} + +// MarkDelivered marks a message as delivered. +func (r *Relay) MarkDelivered(msgID string) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, msg := range r.messages { + if msg.ID == msgID { + msg.Status = "delivered" + if r.store != nil { + r.store.MarkDelivered(msgID) + } + break + } + } +} + +// GetMessage retrieves a message by ID. +func (r *Relay) GetMessage(id string) *Message { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, msg := range r.messages { + if msg.ID == id { + return msg + } + } + + // Try store if not in memory + if r.store != nil { + msg, _ := r.store.GetMessage(id) + return msg + } + return nil +} + +// ToJSON converts a message to JSON for logging. +func (m *Message) ToJSON() string { + b, _ := json.Marshal(m) + return string(b) +} + +// FromJSON parses a message from JSON. +func FromJSON(data string) (*Message, error) { + var m Message + if err := json.Unmarshal([]byte(data), &m); err != nil { + return nil, err + } + return &m, nil +} + +// FormatForInjection formats a message for terminal injection. +func (m *Message) FormatForInjection() string { + content := m.Content + truncated := "" + if len(content) > 500 { + content = content[:500] + "..." + truncated = fmt.Sprintf(" (truncated, full: ty relay read %s)", m.ID) + } + return fmt.Sprintf("\n[RELAY from %s%s]\n%s\n[/RELAY]\n", m.From, truncated, content) +} + +// ParseRelayCommand parses "->relay:target message" format. +// Returns (target, message, ok). +func ParseRelayCommand(input string) (string, string, bool) { + input = strings.TrimSpace(input) + if !strings.HasPrefix(input, "->relay:") { + return "", "", false + } + + rest := strings.TrimPrefix(input, "->relay:") + parts := strings.SplitN(rest, " ", 2) + if len(parts) < 2 { + return "", "", false + } + + target := strings.TrimSpace(parts[0]) + message := strings.TrimSpace(parts[1]) + return target, message, target != "" && message != "" +} + +func normalize(name string) string { + return strings.ToLower(strings.TrimSpace(name)) +} diff --git a/internal/relay/relay_test.go b/internal/relay/relay_test.go new file mode 100644 index 00000000..77937667 --- /dev/null +++ b/internal/relay/relay_test.go @@ -0,0 +1,199 @@ +package relay + +import ( + "testing" +) + +func TestRelay_RegisterAgent(t *testing.T) { + r := New(nil) + + // Register an agent + agent := r.Register("TestAgent", 123) + + if agent.Name != "TestAgent" { + t.Errorf("Expected name 'TestAgent', got '%s'", agent.Name) + } + if agent.TaskID != 123 { + t.Errorf("Expected taskID 123, got %d", agent.TaskID) + } + if agent.Status != "active" { + t.Errorf("Expected status 'active', got '%s'", agent.Status) + } + + // Retrieve by name (case-insensitive) + found := r.GetAgent("testagent") + if found == nil { + t.Error("Expected to find agent with lowercase name") + } + if found != nil && found.Name != "TestAgent" { + t.Errorf("Expected original name 'TestAgent', got '%s'", found.Name) + } +} + +func TestRelay_ListAgents(t *testing.T) { + r := New(nil) + + r.Register("Agent1", 1) + r.Register("Agent2", 2) + + agents := r.ListAgents() + if len(agents) != 2 { + t.Errorf("Expected 2 agents, got %d", len(agents)) + } +} + +func TestRelay_Unregister(t *testing.T) { + r := New(nil) + + r.Register("TestAgent", 123) + r.Unregister("TestAgent") + + found := r.GetAgent("TestAgent") + if found != nil { + t.Error("Expected agent to be unregistered") + } +} + +func TestRelay_SendAndReceive(t *testing.T) { + r := New(nil) + + // Register an agent + r.Register("Bob", 1) + + // Send a message to Bob + msgID, err := r.Send("Alice", "Bob", "Hello Bob!", 0) + if err != nil { + t.Fatalf("Failed to send message: %v", err) + } + if msgID == "" { + t.Error("Expected non-empty message ID") + } + + // Check pending messages for Bob + pending := r.GetPendingMessages("Bob") + if len(pending) != 1 { + t.Errorf("Expected 1 pending message, got %d", len(pending)) + } + + if len(pending) > 0 { + if pending[0].From != "Alice" { + t.Errorf("Expected from 'Alice', got '%s'", pending[0].From) + } + if pending[0].Content != "Hello Bob!" { + t.Errorf("Expected content 'Hello Bob!', got '%s'", pending[0].Content) + } + } + + // Mark as delivered + r.MarkDelivered(msgID) + + // Should no longer be pending + pending = r.GetPendingMessages("Bob") + if len(pending) != 0 { + t.Errorf("Expected 0 pending messages after delivery, got %d", len(pending)) + } +} + +func TestRelay_BroadcastMessage(t *testing.T) { + r := New(nil) + + // Register multiple agents + r.Register("Agent1", 1) + r.Register("Agent2", 2) + + // Send broadcast + _, err := r.Send("Coordinator", "*", "Attention all agents!", 0) + if err != nil { + t.Fatalf("Failed to send broadcast: %v", err) + } + + // Both agents should receive the message + pending1 := r.GetPendingMessages("Agent1") + pending2 := r.GetPendingMessages("Agent2") + + if len(pending1) != 1 { + t.Errorf("Agent1 expected 1 pending message, got %d", len(pending1)) + } + if len(pending2) != 1 { + t.Errorf("Agent2 expected 1 pending message, got %d", len(pending2)) + } +} + +func TestRelay_CaseInsensitiveNames(t *testing.T) { + r := New(nil) + + r.Register("TestAgent", 1) + + // Should find with different cases + if r.GetAgent("testagent") == nil { + t.Error("Should find agent with lowercase") + } + if r.GetAgent("TESTAGENT") == nil { + t.Error("Should find agent with uppercase") + } + if r.GetAgent("TestAgent") == nil { + t.Error("Should find agent with exact case") + } +} + +func TestParseRelayCommand(t *testing.T) { + tests := []struct { + input string + wantTarget string + wantMessage string + wantOk bool + }{ + {"->relay:Bob Hello!", "Bob", "Hello!", true}, + {"->relay:Alice Can you help?", "Alice", "Can you help?", true}, + {"->relay:* Broadcast message", "*", "Broadcast message", true}, + {"->relay:Agent-1 Test", "Agent-1", "Test", true}, + {"not a relay command", "", "", false}, + {"->relay:", "", "", false}, + {"->relay:Bob", "", "", false}, // No message + } + + for _, tt := range tests { + target, message, ok := ParseRelayCommand(tt.input) + if ok != tt.wantOk { + t.Errorf("ParseRelayCommand(%q): ok = %v, want %v", tt.input, ok, tt.wantOk) + } + if target != tt.wantTarget { + t.Errorf("ParseRelayCommand(%q): target = %q, want %q", tt.input, target, tt.wantTarget) + } + if message != tt.wantMessage { + t.Errorf("ParseRelayCommand(%q): message = %q, want %q", tt.input, message, tt.wantMessage) + } + } +} + +func TestMessage_FormatForInjection(t *testing.T) { + msg := &Message{ + ID: "abc123", + From: "Alice", + Content: "Hello!", + } + + formatted := msg.FormatForInjection() + if formatted == "" { + t.Error("Expected non-empty formatted message") + } + if !contains(formatted, "[RELAY from Alice]") { + t.Error("Expected formatted message to contain sender") + } + if !contains(formatted, "Hello!") { + t.Error("Expected formatted message to contain content") + } +} + +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr)) +} + +func containsHelper(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} diff --git a/internal/relay/store.go b/internal/relay/store.go new file mode 100644 index 00000000..966a6e0a --- /dev/null +++ b/internal/relay/store.go @@ -0,0 +1,136 @@ +package relay + +import ( + "time" + + "github.com/bborn/workflow/internal/db" +) + +// DBStore adapts db.DB to MessageStore interface. +type DBStore struct { + db *db.DB +} + +// NewDBStore creates a new database-backed message store. +func NewDBStore(database *db.DB) *DBStore { + return &DBStore{db: database} +} + +// SaveMessage persists a message to the database. +func (s *DBStore) SaveMessage(msg *Message) error { + dbMsg := &db.RelayMessage{ + ID: msg.ID, + From: msg.From, + To: msg.To, + Content: msg.Content, + TaskID: msg.TaskID, + Status: msg.Status, + CreatedAt: db.LocalTime{Time: msg.CreatedAt}, + } + return s.db.SaveRelayMessage(dbMsg) +} + +// GetMessage retrieves a message by ID. +func (s *DBStore) GetMessage(id string) (*Message, error) { + dbMsg, err := s.db.GetRelayMessage(id) + if err != nil || dbMsg == nil { + return nil, err + } + return dbToMessage(dbMsg), nil +} + +// GetMessagesForAgent retrieves messages for an agent. +func (s *DBStore) GetMessagesForAgent(agentName string, limit int) ([]*Message, error) { + dbMsgs, err := s.db.GetRelayMessagesForAgent(agentName, limit) + if err != nil { + return nil, err + } + + messages := make([]*Message, len(dbMsgs)) + for i, m := range dbMsgs { + messages[i] = dbToMessage(m) + } + return messages, nil +} + +// MarkDelivered marks a message as delivered. +func (s *DBStore) MarkDelivered(id string) error { + return s.db.MarkRelayMessageDelivered(id) +} + +// MarkRead marks a message as read. +func (s *DBStore) MarkRead(id string) error { + return s.db.MarkRelayMessageRead(id) +} + +func dbToMessage(m *db.RelayMessage) *Message { + msg := &Message{ + ID: m.ID, + From: m.From, + To: m.To, + Content: m.Content, + TaskID: m.TaskID, + Status: m.Status, + CreatedAt: m.CreatedAt.Time, + } + if m.ReadAt != nil { + t := m.ReadAt.Time + msg.ReadAt = &t + } + return msg +} + +// LoadPendingMessages loads pending messages from the database into the relay. +func (r *Relay) LoadPendingMessages(database *db.DB) error { + // Get all agents and load their pending messages + r.mu.Lock() + agents := make([]string, 0, len(r.agents)) + for _, a := range r.agents { + agents = append(agents, a.Name) + } + r.mu.Unlock() + + for _, name := range agents { + msgs, err := database.GetPendingRelayMessages(name) + if err != nil { + return err + } + r.mu.Lock() + for _, m := range msgs { + r.messages = append(r.messages, &Message{ + ID: m.ID, + From: m.From, + To: m.To, + Content: m.Content, + TaskID: m.TaskID, + Status: m.Status, + CreatedAt: m.CreatedAt.Time, + }) + } + r.mu.Unlock() + } + return nil +} + +// GetAgentByTaskID finds an agent by their task ID. +func (r *Relay) GetAgentByTaskID(taskID int64) *Agent { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, a := range r.agents { + if a.TaskID == taskID { + return a + } + } + return nil +} + +// Heartbeat updates an agent's last seen time. +func (r *Relay) Heartbeat(name string) { + r.mu.Lock() + defer r.mu.Unlock() + + if agent, ok := r.agents[normalize(name)]; ok { + agent.LastSeen = time.Now() + } +} diff --git a/internal/ui/detail.go b/internal/ui/detail.go index 6b58c8e4..38e1aa1d 100644 --- a/internal/ui/detail.go +++ b/internal/ui/detail.go @@ -2393,6 +2393,8 @@ func (m *DetailModel) renderContent() string { icon = "👤" case "output": icon = "📤" + case "relay": + icon = "📨" } var line string From cd5fcccddfd77497190ec09eba380f0f18ff6d5a Mon Sep 17 00:00:00 2001 From: Bruno Bornsztein Date: Thu, 5 Feb 2026 07:45:02 -0600 Subject: [PATCH 2/2] fix: address PR review comments and CI lint errors - Fix ineffectual assignment lint error (statusIcon variable) - Fix silent error handling for blockedTasks query - Add shared CleanAgentName function to avoid code duplication - Move GetAgentByTaskID and Heartbeat methods to relay.go - Add agent unregistration when tasks finish (fixes memory leak) - Track only "output" lines for idle detection - Replace custom contains() with strings.Contains in tests Co-Authored-By: Claude Opus 4.5 --- cmd/task/main.go | 37 +++++++------------ internal/executor/executor.go | 10 ++++- internal/executor/relay.go | 43 ++++++++-------------- internal/relay/relay.go | 49 +++++++++++++++++++++++++ internal/relay/relay_test.go | 18 ++------- internal/relay/store.go | 69 +++-------------------------------- 6 files changed, 96 insertions(+), 130 deletions(-) diff --git a/cmd/task/main.go b/cmd/task/main.go index 688cb641..631b979d 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -24,6 +24,7 @@ import ( "github.com/bborn/workflow/internal/db" "github.com/bborn/workflow/internal/executor" "github.com/bborn/workflow/internal/github" + "github.com/bborn/workflow/internal/relay" "github.com/bborn/workflow/internal/ui" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" @@ -2040,16 +2041,17 @@ Examples: } for _, msg := range messages { - status := msg.Status - if status == "pending" { - status = boldStyle.Render("• pending") + statusIcon := "" + if msg.Status == "pending" { + statusIcon = " •" } preview := msg.Content if len(preview) > 60 { preview = preview[:60] + "..." } - fmt.Printf("%s %s → %s: %s\n", + fmt.Printf("%s%s %s → %s: %s\n", dimStyle.Render(msg.ID), + boldStyle.Render(statusIcon), msg.From, msg.To, preview, @@ -2087,8 +2089,12 @@ when tasks complete.`, } // Also include blocked tasks (they're still "connected") - blockedTasks, _ := database.ListTasks(db.ListTasksOptions{Status: db.StatusBlocked, Limit: 100}) - tasks = append(tasks, blockedTasks...) + blockedTasks, err := database.ListTasks(db.ListTasksOptions{Status: db.StatusBlocked, Limit: 100}) + if err != nil { + fmt.Fprintln(os.Stderr, dimStyle.Render("Warning: failed to list blocked tasks")) + } else { + tasks = append(tasks, blockedTasks...) + } if len(tasks) == 0 { fmt.Println(dimStyle.Render("No agents connected")) @@ -2098,23 +2104,8 @@ when tasks complete.`, fmt.Println(boldStyle.Render("Connected Agents")) fmt.Println() for _, task := range tasks { - name := task.Title - if name == "" { - name = fmt.Sprintf("task-%d", task.ID) - } - // Clean name for relay addressing - name = strings.Map(func(r rune) rune { - if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' { - return r - } - if r == ' ' { - return '-' - } - return -1 - }, name) - if len(name) > 32 { - name = name[:32] - } + // Use shared function to get agent name + name := relay.CleanAgentName(task.Title, fmt.Sprintf("task-%d", task.ID)) status := task.Status if status == db.StatusProcessing { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 36320946..bf316b3e 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -865,6 +865,12 @@ func (e *Executor) executeTask(ctx context.Context, task *db.Task) { delete(e.runningTasks, task.ID) delete(e.cancelFuncs, task.ID) e.mu.Unlock() + + // Unregister relay agent when task finishes + if e.relay != nil { + e.relay.UnregisterAgent(task.ID) + e.relay.ClearActivity(task.ID) + } }() e.logger.Info("Processing task", "id", task.ID, "title", task.Title) @@ -3037,8 +3043,8 @@ func (e *Executor) logLine(taskID int64, lineType, content string) { // Store in database e.db.AppendTaskLog(taskID, lineType, content) - // Record activity for relay idle detection (except for relay messages themselves) - if e.relay != nil && lineType != "relay" { + // Record activity for relay idle detection only for actual output lines + if e.relay != nil && lineType == "output" { e.relay.RecordActivity(taskID) } diff --git a/internal/executor/relay.go b/internal/executor/relay.go index 5ed9a236..0572a6ea 100644 --- a/internal/executor/relay.go +++ b/internal/executor/relay.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os/exec" - "strings" "sync" "time" @@ -37,7 +36,7 @@ func NewRelayManager(e *Executor) *RelayManager { // RegisterAgent registers a task as an agent. // Uses task title as the agent name. func (rm *RelayManager) RegisterAgent(task *db.Task) { - name := rm.agentName(task) + name := agentName(task) rm.relay.Register(name, task.ID) rm.executor.logger.Info("Registered relay agent", "name", name, "task", task.ID) } @@ -46,33 +45,16 @@ func (rm *RelayManager) RegisterAgent(task *db.Task) { func (rm *RelayManager) UnregisterAgent(taskID int64) { task, _ := rm.executor.db.GetTask(taskID) if task != nil { - rm.relay.Unregister(rm.agentName(task)) + name := agentName(task) + rm.relay.Unregister(name) + rm.executor.logger.Info("Unregistered relay agent", "name", name, "task", task.ID) } } -// agentName derives agent name from task. -// Uses task title, cleaned up for relay addressing. -func (rm *RelayManager) agentName(task *db.Task) string { - // Use task ID as fallback, title otherwise - name := task.Title - if name == "" { - name = fmt.Sprintf("task-%d", task.ID) - } - // Clean up for use as agent name (remove special chars except hyphen) - name = strings.Map(func(r rune) rune { - if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' { - return r - } - if r == ' ' { - return '-' - } - return -1 - }, name) - // Limit length - if len(name) > 32 { - name = name[:32] - } - return name +// agentName derives agent name from task using the shared relay.CleanAgentName function. +func agentName(task *db.Task) string { + fallback := fmt.Sprintf("task-%d", task.ID) + return relay.CleanAgentName(task.Title, fallback) } // Send sends a message from one agent to another. @@ -82,7 +64,7 @@ func (rm *RelayManager) Send(fromTaskID int64, to, content string) (string, erro return "", fmt.Errorf("sender task not found") } - from := rm.agentName(task) + from := agentName(task) msgID, err := rm.relay.Send(from, to, content, fromTaskID) if err != nil { return "", err @@ -104,6 +86,13 @@ func (rm *RelayManager) RecordActivity(taskID int64) { rm.mu.Unlock() } +// ClearActivity removes activity tracking for a task (for cleanup). +func (rm *RelayManager) ClearActivity(taskID int64) { + rm.mu.Lock() + delete(rm.lastActivity, taskID) + rm.mu.Unlock() +} + // IsIdle checks if a task has been idle for the given duration. func (rm *RelayManager) IsIdle(taskID int64, idleDuration time.Duration) bool { rm.mu.RLock() diff --git a/internal/relay/relay.go b/internal/relay/relay.go index 83ad4a67..6d367dd6 100644 --- a/internal/relay/relay.go +++ b/internal/relay/relay.go @@ -237,3 +237,52 @@ func ParseRelayCommand(input string) (string, string, bool) { func normalize(name string) string { return strings.ToLower(strings.TrimSpace(name)) } + +// CleanAgentName cleans a raw name (like task title) for use as an agent name. +// It removes special characters, replaces spaces with hyphens, limits length, +// and normalizes to lowercase. +func CleanAgentName(rawName string, fallback string) string { + name := rawName + if name == "" { + name = fallback + } + // Clean up for use as agent name (keep alphanumeric, hyphen, underscore) + name = strings.Map(func(r rune) rune { + if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' { + return r + } + if r == ' ' { + return '-' + } + return -1 + }, name) + // Limit length + if len(name) > 32 { + name = name[:32] + } + // Normalize to lowercase for consistent matching + return strings.ToLower(strings.TrimSpace(name)) +} + +// GetAgentByTaskID finds an agent by their task ID. +func (r *Relay) GetAgentByTaskID(taskID int64) *Agent { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, a := range r.agents { + if a.TaskID == taskID { + return a + } + } + return nil +} + +// Heartbeat updates an agent's last seen time. +func (r *Relay) Heartbeat(name string) { + r.mu.Lock() + defer r.mu.Unlock() + + if agent, ok := r.agents[normalize(name)]; ok { + agent.LastSeen = time.Now() + } +} diff --git a/internal/relay/relay_test.go b/internal/relay/relay_test.go index 77937667..a4e34660 100644 --- a/internal/relay/relay_test.go +++ b/internal/relay/relay_test.go @@ -1,6 +1,7 @@ package relay import ( + "strings" "testing" ) @@ -177,23 +178,10 @@ func TestMessage_FormatForInjection(t *testing.T) { if formatted == "" { t.Error("Expected non-empty formatted message") } - if !contains(formatted, "[RELAY from Alice]") { + if !strings.Contains(formatted, "[RELAY from Alice]") { t.Error("Expected formatted message to contain sender") } - if !contains(formatted, "Hello!") { + if !strings.Contains(formatted, "Hello!") { t.Error("Expected formatted message to contain content") } } - -func contains(s, substr string) bool { - return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr)) -} - -func containsHelper(s, substr string) bool { - for i := 0; i <= len(s)-len(substr); i++ { - if s[i:i+len(substr)] == substr { - return true - } - } - return false -} diff --git a/internal/relay/store.go b/internal/relay/store.go index 966a6e0a..05642d96 100644 --- a/internal/relay/store.go +++ b/internal/relay/store.go @@ -1,8 +1,6 @@ package relay import ( - "time" - "github.com/bborn/workflow/internal/db" ) @@ -19,12 +17,12 @@ func NewDBStore(database *db.DB) *DBStore { // SaveMessage persists a message to the database. func (s *DBStore) SaveMessage(msg *Message) error { dbMsg := &db.RelayMessage{ - ID: msg.ID, - From: msg.From, - To: msg.To, - Content: msg.Content, - TaskID: msg.TaskID, - Status: msg.Status, + ID: msg.ID, + From: msg.From, + To: msg.To, + Content: msg.Content, + TaskID: msg.TaskID, + Status: msg.Status, CreatedAt: db.LocalTime{Time: msg.CreatedAt}, } return s.db.SaveRelayMessage(dbMsg) @@ -79,58 +77,3 @@ func dbToMessage(m *db.RelayMessage) *Message { } return msg } - -// LoadPendingMessages loads pending messages from the database into the relay. -func (r *Relay) LoadPendingMessages(database *db.DB) error { - // Get all agents and load their pending messages - r.mu.Lock() - agents := make([]string, 0, len(r.agents)) - for _, a := range r.agents { - agents = append(agents, a.Name) - } - r.mu.Unlock() - - for _, name := range agents { - msgs, err := database.GetPendingRelayMessages(name) - if err != nil { - return err - } - r.mu.Lock() - for _, m := range msgs { - r.messages = append(r.messages, &Message{ - ID: m.ID, - From: m.From, - To: m.To, - Content: m.Content, - TaskID: m.TaskID, - Status: m.Status, - CreatedAt: m.CreatedAt.Time, - }) - } - r.mu.Unlock() - } - return nil -} - -// GetAgentByTaskID finds an agent by their task ID. -func (r *Relay) GetAgentByTaskID(taskID int64) *Agent { - r.mu.RLock() - defer r.mu.RUnlock() - - for _, a := range r.agents { - if a.TaskID == taskID { - return a - } - } - return nil -} - -// Heartbeat updates an agent's last seen time. -func (r *Relay) Heartbeat(name string) { - r.mu.Lock() - defer r.mu.Unlock() - - if agent, ok := r.agents[normalize(name)]; ok { - agent.LastSeen = time.Now() - } -}