diff --git a/cmd/task/main.go b/cmd/task/main.go index b65426d3..631b979d 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -4,6 +4,8 @@ package main import ( "bufio" "context" + "crypto/rand" + "encoding/hex" "encoding/json" "fmt" "io" @@ -22,6 +24,7 @@ import ( "github.com/bborn/workflow/internal/db" "github.com/bborn/workflow/internal/executor" "github.com/bborn/workflow/internal/github" + "github.com/bborn/workflow/internal/relay" "github.com/bborn/workflow/internal/ui" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" @@ -60,6 +63,13 @@ func getDaemonSessionName() string { return fmt.Sprintf("task-daemon-%s", getSessionID()) } +// generateShortID generates a short random ID for relay messages. +func generateShortID() string { + b := make([]byte, 4) + rand.Read(b) + return hex.EncodeToString(b) +} + func main() { var dangerous bool @@ -1895,6 +1905,225 @@ Examples: rootCmd.AddCommand(eventsCmd) + // Relay subcommand - agent-to-agent messaging + relayCmd := &cobra.Command{ + Use: "relay", + Short: "Agent-to-agent messaging", + Long: `Send and receive messages between running agent sessions. + +Messages are routed by agent name (typically the task title). Agents must be +actively running to send or receive messages. + +Examples: + ty relay send Bob "Can you review my changes?" + ty relay send * "Broadcast to all agents" + ty relay list # List connected agents + ty relay read abc123 # Read message by ID`, + } + + // Relay send subcommand + relaySendCmd := &cobra.Command{ + Use: "send ", + Short: "Send a message to an agent", + Long: `Send a message to another running agent. + +The argument can be: + - An agent name (case-insensitive) + - * for broadcast to all agents + +Examples: + ty relay send Bob "Hello!" + ty relay send bob "Can you review?" # case-insensitive + ty relay send * "Everyone check this"`, + Args: cobra.MinimumNArgs(2), + Run: func(cmd *cobra.Command, args []string) { + to := args[0] + message := strings.Join(args[1:], " ") + fromTask, _ := cmd.Flags().GetInt64("from") + + // Open database + dbPath := db.DefaultPath() + database, err := db.Open(dbPath) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + defer database.Close() + + // Create relay store and send message + from := "cli" + if fromTask > 0 { + task, err := database.GetTask(fromTask) + if err == nil && task != nil { + from = task.Title + } + } + + msg := &db.RelayMessage{ + ID: generateShortID(), + From: from, + To: to, + Content: message, + TaskID: fromTask, + Status: "pending", + CreatedAt: db.LocalTime{Time: time.Now()}, + } + + if err := database.SaveRelayMessage(msg); err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + + fmt.Println(successStyle.Render(fmt.Sprintf("Message sent (id: %s)", msg.ID))) + }, + } + relaySendCmd.Flags().Int64("from", 0, "Sender task ID (for attribution)") + relayCmd.AddCommand(relaySendCmd) + + // Relay read subcommand + relayReadCmd := &cobra.Command{ + Use: "read [message-id]", + Short: "Read relay messages", + Long: `Read a specific message by ID or list recent messages for an agent. + +Examples: + ty relay read abc123 # Read specific message + ty relay read --agent Bob # List messages for Bob`, + Run: func(cmd *cobra.Command, args []string) { + agentName, _ := cmd.Flags().GetString("agent") + limit, _ := cmd.Flags().GetInt("limit") + + // Open database + dbPath := db.DefaultPath() + database, err := db.Open(dbPath) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + defer database.Close() + + // If message ID provided, show that specific message + if len(args) > 0 { + msg, err := database.GetRelayMessage(args[0]) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + if msg == nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Message not found")) + os.Exit(1) + } + + fmt.Printf("%s → %s\n", boldStyle.Render(msg.From), msg.To) + fmt.Printf("%s\n", dimStyle.Render(msg.CreatedAt.Format("2006-01-02 15:04:05"))) + fmt.Printf("\n%s\n", msg.Content) + + // Mark as read + database.MarkRelayMessageRead(msg.ID) + return + } + + // List messages for agent + if agentName == "" { + fmt.Fprintln(os.Stderr, errorStyle.Render("Provide a message ID or --agent flag")) + os.Exit(1) + } + + messages, err := database.GetRelayMessagesForAgent(agentName, limit) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + + if len(messages) == 0 { + fmt.Println(dimStyle.Render("No messages")) + return + } + + for _, msg := range messages { + statusIcon := "" + if msg.Status == "pending" { + statusIcon = " •" + } + preview := msg.Content + if len(preview) > 60 { + preview = preview[:60] + "..." + } + fmt.Printf("%s%s %s → %s: %s\n", + dimStyle.Render(msg.ID), + boldStyle.Render(statusIcon), + msg.From, + msg.To, + preview, + ) + } + }, + } + relayReadCmd.Flags().String("agent", "", "Filter messages for this agent") + relayReadCmd.Flags().Int("limit", 20, "Maximum messages to show") + relayCmd.AddCommand(relayReadCmd) + + // Relay list subcommand - list connected agents + relayListCmd := &cobra.Command{ + Use: "list", + Short: "List connected agents", + Long: `Show all agents currently registered with the relay. + +Agents are registered when their tasks start executing and unregistered +when tasks complete.`, + Run: func(cmd *cobra.Command, args []string) { + // Open database + dbPath := db.DefaultPath() + database, err := db.Open(dbPath) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + defer database.Close() + + // Get processing tasks as a proxy for connected agents + tasks, err := database.ListTasks(db.ListTasksOptions{Status: db.StatusProcessing, Limit: 100}) + if err != nil { + fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error())) + os.Exit(1) + } + + // Also include blocked tasks (they're still "connected") + blockedTasks, err := database.ListTasks(db.ListTasksOptions{Status: db.StatusBlocked, Limit: 100}) + if err != nil { + fmt.Fprintln(os.Stderr, dimStyle.Render("Warning: failed to list blocked tasks")) + } else { + tasks = append(tasks, blockedTasks...) + } + + if len(tasks) == 0 { + fmt.Println(dimStyle.Render("No agents connected")) + return + } + + fmt.Println(boldStyle.Render("Connected Agents")) + fmt.Println() + for _, task := range tasks { + // Use shared function to get agent name + name := relay.CleanAgentName(task.Title, fmt.Sprintf("task-%d", task.ID)) + + status := task.Status + if status == db.StatusProcessing { + status = "active" + } + fmt.Printf(" %s #%d %s (%s)\n", + boldStyle.Render(name), + task.ID, + task.Project, + dimStyle.Render(status), + ) + } + }, + } + relayCmd.AddCommand(relayListCmd) + + rootCmd.AddCommand(relayCmd) + // Projects subcommand - manage projects projectsCmd := &cobra.Command{ Use: "projects", diff --git a/internal/db/relay.go b/internal/db/relay.go new file mode 100644 index 00000000..dce39bee --- /dev/null +++ b/internal/db/relay.go @@ -0,0 +1,128 @@ +package db + +import ( + "database/sql" + "fmt" + "strings" + "time" +) + +// RelayMessage represents a stored relay message. +type RelayMessage struct { + ID string + From string + To string + Content string + TaskID int64 + Status string // pending, delivered, read + CreatedAt LocalTime + DeliveredAt *LocalTime + ReadAt *LocalTime +} + +// SaveRelayMessage stores a relay message. +func (db *DB) SaveRelayMessage(msg *RelayMessage) error { + _, err := db.Exec(` + INSERT INTO relay_messages (id, from_agent, to_agent, content, task_id, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, msg.ID, msg.From, msg.To, msg.Content, msg.TaskID, msg.Status, msg.CreatedAt.Time) + if err != nil { + return fmt.Errorf("insert relay message: %w", err) + } + return nil +} + +// GetRelayMessage retrieves a message by ID. +func (db *DB) GetRelayMessage(id string) (*RelayMessage, error) { + msg := &RelayMessage{} + err := db.QueryRow(` + SELECT id, from_agent, to_agent, content, COALESCE(task_id, 0), status, + created_at, delivered_at, read_at + FROM relay_messages WHERE id = ? + `, id).Scan( + &msg.ID, &msg.From, &msg.To, &msg.Content, &msg.TaskID, &msg.Status, + &msg.CreatedAt, &msg.DeliveredAt, &msg.ReadAt, + ) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("query relay message: %w", err) + } + return msg, nil +} + +// GetRelayMessagesForAgent retrieves messages for an agent (direct or broadcast). +func (db *DB) GetRelayMessagesForAgent(agentName string, limit int) ([]*RelayMessage, error) { + normalized := strings.ToLower(strings.TrimSpace(agentName)) + rows, err := db.Query(` + SELECT id, from_agent, to_agent, content, COALESCE(task_id, 0), status, + created_at, delivered_at, read_at + FROM relay_messages + WHERE LOWER(to_agent) = ? OR to_agent = '*' + ORDER BY created_at DESC + LIMIT ? + `, normalized, limit) + if err != nil { + return nil, fmt.Errorf("query relay messages: %w", err) + } + defer rows.Close() + + var messages []*RelayMessage + for rows.Next() { + msg := &RelayMessage{} + if err := rows.Scan( + &msg.ID, &msg.From, &msg.To, &msg.Content, &msg.TaskID, &msg.Status, + &msg.CreatedAt, &msg.DeliveredAt, &msg.ReadAt, + ); err != nil { + return nil, fmt.Errorf("scan relay message: %w", err) + } + messages = append(messages, msg) + } + return messages, nil +} + +// GetPendingRelayMessages retrieves pending messages for an agent. +func (db *DB) GetPendingRelayMessages(agentName string) ([]*RelayMessage, error) { + normalized := strings.ToLower(strings.TrimSpace(agentName)) + rows, err := db.Query(` + SELECT id, from_agent, to_agent, content, COALESCE(task_id, 0), status, + created_at, delivered_at, read_at + FROM relay_messages + WHERE (LOWER(to_agent) = ? OR to_agent = '*') AND status = 'pending' + ORDER BY created_at ASC + `, normalized) + if err != nil { + return nil, fmt.Errorf("query pending relay messages: %w", err) + } + defer rows.Close() + + var messages []*RelayMessage + for rows.Next() { + msg := &RelayMessage{} + if err := rows.Scan( + &msg.ID, &msg.From, &msg.To, &msg.Content, &msg.TaskID, &msg.Status, + &msg.CreatedAt, &msg.DeliveredAt, &msg.ReadAt, + ); err != nil { + return nil, fmt.Errorf("scan relay message: %w", err) + } + messages = append(messages, msg) + } + return messages, nil +} + +// MarkRelayMessageDelivered marks a message as delivered. +func (db *DB) MarkRelayMessageDelivered(id string) error { + _, err := db.Exec(` + UPDATE relay_messages SET status = 'delivered', delivered_at = ? WHERE id = ? + `, time.Now(), id) + return err +} + +// MarkRelayMessageRead marks a message as read. +func (db *DB) MarkRelayMessageRead(id string) error { + _, err := db.Exec(` + UPDATE relay_messages SET status = 'read', read_at = ? WHERE id = ? + `, time.Now(), id) + return err +} diff --git a/internal/db/sqlite.go b/internal/db/sqlite.go index 07bd75d4..bd95ed28 100644 --- a/internal/db/sqlite.go +++ b/internal/db/sqlite.go @@ -180,6 +180,20 @@ func (db *DB) migrate() error { `CREATE INDEX IF NOT EXISTS idx_event_log_event_type ON event_log(event_type)`, `CREATE INDEX IF NOT EXISTS idx_event_log_created_at ON event_log(created_at)`, + // Relay messages for agent-to-agent communication + `CREATE TABLE IF NOT EXISTS relay_messages ( + id TEXT PRIMARY KEY, + from_agent TEXT NOT NULL, + to_agent TEXT NOT NULL, + content TEXT NOT NULL, + task_id INTEGER, + status TEXT DEFAULT 'pending', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + delivered_at DATETIME, + read_at DATETIME + )`, + `CREATE INDEX IF NOT EXISTS idx_relay_messages_to ON relay_messages(to_agent)`, + `CREATE INDEX IF NOT EXISTS idx_relay_messages_status ON relay_messages(status)`, } for _, m := range migrations { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index d107b0b9..bf316b3e 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -67,6 +67,9 @@ type Executor struct { executorSlug string executorName string + + // Relay manager for agent-to-agent messaging + relay *RelayManager } // DefaultSuspendIdleTimeout is the default time a blocked task must be idle before being suspended. @@ -161,6 +164,9 @@ func New(database *db.DB, cfg *config.Config) *Executor { e.executorFactory.Register(NewOpenCodeExecutor(e)) e.executorFactory.Register(NewPiExecutor(e)) + // Initialize relay manager for agent-to-agent messaging + e.relay = NewRelayManager(e) + return e } @@ -198,6 +204,9 @@ func NewWithLogging(database *db.DB, cfg *config.Config, w io.Writer) *Executor e.executorFactory.Register(NewOpenCodeExecutor(e)) e.executorFactory.Register(NewPiExecutor(e)) + // Initialize relay manager for agent-to-agent messaging + e.relay = NewRelayManager(e) + return e } @@ -217,6 +226,11 @@ func (e *Executor) ExecutorSlug() string { return e.executorSlug } +// Relay returns the relay manager for agent-to-agent messaging. +func (e *Executor) Relay() *RelayManager { + return e.relay +} + // Start begins the background worker. func (e *Executor) Start(ctx context.Context) { e.mu.Lock() @@ -695,6 +709,11 @@ func (e *Executor) worker(ctx context.Context) { case <-ticker.C: e.processNextTask(ctx) + // Deliver pending relay messages to idle agents + if e.relay != nil { + e.relay.DeliverPendingMessages(ctx) + } + tickCount++ // Periodically check for merged branches @@ -846,6 +865,12 @@ func (e *Executor) executeTask(ctx context.Context, task *db.Task) { delete(e.runningTasks, task.ID) delete(e.cancelFuncs, task.ID) e.mu.Unlock() + + // Unregister relay agent when task finishes + if e.relay != nil { + e.relay.UnregisterAgent(task.ID) + e.relay.ClearActivity(task.ID) + } }() e.logger.Info("Processing task", "id", task.ID, "title", task.Title) @@ -862,6 +887,11 @@ func (e *Executor) executeTask(ctx context.Context, task *db.Task) { // Log start and trigger hook startMsg := fmt.Sprintf("Starting task #%d: %s", task.ID, task.Title) e.logLine(task.ID, "system", startMsg) + + // Register task as relay agent + if e.relay != nil { + e.relay.RegisterAgent(task) + } e.hooks.OnStatusChange(task, db.StatusProcessing, startMsg) // Setup worktree for isolated execution (symlinks claude config from project) @@ -3013,6 +3043,11 @@ func (e *Executor) logLine(taskID int64, lineType, content string) { // Store in database e.db.AppendTaskLog(taskID, lineType, content) + // Record activity for relay idle detection only for actual output lines + if e.relay != nil && lineType == "output" { + e.relay.RecordActivity(taskID) + } + // Broadcast to subscribers logEntry := &db.TaskLog{ TaskID: taskID, diff --git a/internal/executor/relay.go b/internal/executor/relay.go new file mode 100644 index 00000000..0572a6ea --- /dev/null +++ b/internal/executor/relay.go @@ -0,0 +1,182 @@ +package executor + +import ( + "context" + "fmt" + "os/exec" + "sync" + "time" + + "github.com/bborn/workflow/internal/db" + "github.com/bborn/workflow/internal/relay" +) + +// RelayManager handles agent-to-agent messaging for the executor. +type RelayManager struct { + mu sync.RWMutex + relay *relay.Relay + executor *Executor + store *relay.DBStore + + // Track last activity per task for idle detection + lastActivity map[int64]time.Time +} + +// NewRelayManager creates a relay manager for the executor. +func NewRelayManager(e *Executor) *RelayManager { + store := relay.NewDBStore(e.db) + return &RelayManager{ + relay: relay.New(store), + executor: e, + store: store, + lastActivity: make(map[int64]time.Time), + } +} + +// RegisterAgent registers a task as an agent. +// Uses task title as the agent name. +func (rm *RelayManager) RegisterAgent(task *db.Task) { + name := agentName(task) + rm.relay.Register(name, task.ID) + rm.executor.logger.Info("Registered relay agent", "name", name, "task", task.ID) +} + +// UnregisterAgent removes a task from the agent registry. +func (rm *RelayManager) UnregisterAgent(taskID int64) { + task, _ := rm.executor.db.GetTask(taskID) + if task != nil { + name := agentName(task) + rm.relay.Unregister(name) + rm.executor.logger.Info("Unregistered relay agent", "name", name, "task", task.ID) + } +} + +// agentName derives agent name from task using the shared relay.CleanAgentName function. +func agentName(task *db.Task) string { + fallback := fmt.Sprintf("task-%d", task.ID) + return relay.CleanAgentName(task.Title, fallback) +} + +// Send sends a message from one agent to another. +func (rm *RelayManager) Send(fromTaskID int64, to, content string) (string, error) { + task, err := rm.executor.db.GetTask(fromTaskID) + if err != nil || task == nil { + return "", fmt.Errorf("sender task not found") + } + + from := agentName(task) + msgID, err := rm.relay.Send(from, to, content, fromTaskID) + if err != nil { + return "", err + } + + rm.executor.logLine(fromTaskID, "relay", fmt.Sprintf("Sent to %s: %s", to, truncate(content, 100))) + return msgID, nil +} + +// SendFromCLI sends a message from the CLI (not from a task). +func (rm *RelayManager) SendFromCLI(from, to, content string) (string, error) { + return rm.relay.Send(from, to, content, 0) +} + +// RecordActivity records that a task had output activity. +func (rm *RelayManager) RecordActivity(taskID int64) { + rm.mu.Lock() + rm.lastActivity[taskID] = time.Now() + rm.mu.Unlock() +} + +// ClearActivity removes activity tracking for a task (for cleanup). +func (rm *RelayManager) ClearActivity(taskID int64) { + rm.mu.Lock() + delete(rm.lastActivity, taskID) + rm.mu.Unlock() +} + +// IsIdle checks if a task has been idle for the given duration. +func (rm *RelayManager) IsIdle(taskID int64, idleDuration time.Duration) bool { + rm.mu.RLock() + lastActive, ok := rm.lastActivity[taskID] + rm.mu.RUnlock() + + if !ok { + return true // No activity recorded means idle + } + return time.Since(lastActive) >= idleDuration +} + +// DeliverPendingMessages checks for pending messages and delivers them to idle agents. +func (rm *RelayManager) DeliverPendingMessages(ctx context.Context) { + agents := rm.relay.ListAgents() + + for _, agent := range agents { + // Check if agent's task is idle (no output for 1.5 seconds) + if !rm.IsIdle(agent.TaskID, 1500*time.Millisecond) { + continue + } + + // Get pending messages + messages := rm.relay.GetPendingMessages(agent.Name) + if len(messages) == 0 { + continue + } + + // Deliver each message + for _, msg := range messages { + if err := rm.injectMessage(ctx, agent.TaskID, msg); err != nil { + rm.executor.logger.Error("Failed to inject relay message", "error", err, "task", agent.TaskID) + continue + } + rm.relay.MarkDelivered(msg.ID) + rm.executor.logLine(agent.TaskID, "relay", fmt.Sprintf("Received from %s: %s", msg.From, truncate(msg.Content, 100))) + } + } +} + +// injectMessage injects a relay message into the task's tmux pane. +func (rm *RelayManager) injectMessage(ctx context.Context, taskID int64, msg *relay.Message) error { + task, err := rm.executor.db.GetTask(taskID) + if err != nil || task == nil { + return fmt.Errorf("task not found") + } + + // Get the Claude pane ID + paneID := task.ClaudePaneID + if paneID == "" { + return fmt.Errorf("no pane ID for task %d", taskID) + } + + // Format the message for injection + formatted := msg.FormatForInjection() + + // Use tmux send-keys to inject the message + cmd := exec.CommandContext(ctx, "tmux", "send-keys", "-t", paneID, formatted, "") + return cmd.Run() +} + +// GetMessage retrieves a message by ID. +func (rm *RelayManager) GetMessage(id string) *relay.Message { + return rm.relay.GetMessage(id) +} + +// ListAgents returns all registered agents. +func (rm *RelayManager) ListAgents() []*relay.Agent { + return rm.relay.ListAgents() +} + +// GetAgentByName returns an agent by name. +func (rm *RelayManager) GetAgentByName(name string) *relay.Agent { + return rm.relay.GetAgent(name) +} + +// GetMessagesForAgent retrieves messages for an agent. +func (rm *RelayManager) GetMessagesForAgent(agentName string, limit int) ([]*relay.Message, error) { + return rm.store.GetMessagesForAgent(agentName, limit) +} + +func truncate(s string, max int) string { + if len(s) <= max { + return s + } + return s[:max] + "..." +} diff --git a/internal/relay/relay.go b/internal/relay/relay.go new file mode 100644 index 00000000..6d367dd6 --- /dev/null +++ b/internal/relay/relay.go @@ -0,0 +1,288 @@ +// Package relay provides simple agent-to-agent messaging. +package relay + +import ( + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "github.com/google/uuid" +) + +// Message represents a relay message. +type Message struct { + ID string `json:"id"` + From string `json:"from"` + To string `json:"to"` // agent name, #channel, or * for broadcast + Content string `json:"content"` + TaskID int64 `json:"task_id"` // sender's task + Status string `json:"status"` // pending, delivered, read + CreatedAt time.Time `json:"created_at"` + ReadAt *time.Time `json:"read_at,omitempty"` +} + +// Agent represents a connected agent. +type Agent struct { + Name string `json:"name"` + TaskID int64 `json:"task_id"` + Status string `json:"status"` // active, idle + LastSeen time.Time `json:"last_seen"` +} + +// Relay manages agent messaging. +type Relay struct { + mu sync.RWMutex + agents map[string]*Agent // normalized name -> agent + messages []*Message // in-memory queue (also persisted to DB) + store MessageStore // persistence layer +} + +// MessageStore is the interface for message persistence. +type MessageStore interface { + SaveMessage(msg *Message) error + GetMessage(id string) (*Message, error) + GetMessagesForAgent(agentName string, limit int) ([]*Message, error) + MarkDelivered(id string) error + MarkRead(id string) error +} + +// New creates a new Relay. +func New(store MessageStore) *Relay { + return &Relay{ + agents: make(map[string]*Agent), + store: store, + } +} + +// Register registers an agent. +func (r *Relay) Register(name string, taskID int64) *Agent { + r.mu.Lock() + defer r.mu.Unlock() + + key := normalize(name) + agent := &Agent{ + Name: name, + TaskID: taskID, + Status: "active", + LastSeen: time.Now(), + } + r.agents[key] = agent + return agent +} + +// Unregister removes an agent. +func (r *Relay) Unregister(name string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.agents, normalize(name)) +} + +// UpdateStatus updates an agent's status. +func (r *Relay) UpdateStatus(name, status string) { + r.mu.Lock() + defer r.mu.Unlock() + + if agent, ok := r.agents[normalize(name)]; ok { + agent.Status = status + agent.LastSeen = time.Now() + } +} + +// GetAgent returns an agent by name. +func (r *Relay) GetAgent(name string) *Agent { + r.mu.RLock() + defer r.mu.RUnlock() + return r.agents[normalize(name)] +} + +// ListAgents returns all registered agents. +func (r *Relay) ListAgents() []*Agent { + r.mu.RLock() + defer r.mu.RUnlock() + + agents := make([]*Agent, 0, len(r.agents)) + for _, a := range r.agents { + agents = append(agents, a) + } + return agents +} + +// Send sends a message. Returns the message ID. +func (r *Relay) Send(from, to, content string, fromTaskID int64) (string, error) { + msg := &Message{ + ID: uuid.New().String()[:8], // short ID for easy reference + From: from, + To: to, + Content: content, + TaskID: fromTaskID, + Status: "pending", + CreatedAt: time.Now(), + } + + if r.store != nil { + if err := r.store.SaveMessage(msg); err != nil { + return "", fmt.Errorf("save message: %w", err) + } + } + + r.mu.Lock() + r.messages = append(r.messages, msg) + r.mu.Unlock() + + return msg.ID, nil +} + +// GetPendingMessages returns pending messages for an agent. +func (r *Relay) GetPendingMessages(agentName string) []*Message { + r.mu.RLock() + defer r.mu.RUnlock() + + key := normalize(agentName) + var pending []*Message + + for _, msg := range r.messages { + if msg.Status != "pending" { + continue + } + // Match direct, broadcast, or channel + if msg.To == "*" || normalize(msg.To) == key { + pending = append(pending, msg) + } + } + return pending +} + +// MarkDelivered marks a message as delivered. +func (r *Relay) MarkDelivered(msgID string) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, msg := range r.messages { + if msg.ID == msgID { + msg.Status = "delivered" + if r.store != nil { + r.store.MarkDelivered(msgID) + } + break + } + } +} + +// GetMessage retrieves a message by ID. +func (r *Relay) GetMessage(id string) *Message { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, msg := range r.messages { + if msg.ID == id { + return msg + } + } + + // Try store if not in memory + if r.store != nil { + msg, _ := r.store.GetMessage(id) + return msg + } + return nil +} + +// ToJSON converts a message to JSON for logging. +func (m *Message) ToJSON() string { + b, _ := json.Marshal(m) + return string(b) +} + +// FromJSON parses a message from JSON. +func FromJSON(data string) (*Message, error) { + var m Message + if err := json.Unmarshal([]byte(data), &m); err != nil { + return nil, err + } + return &m, nil +} + +// FormatForInjection formats a message for terminal injection. +func (m *Message) FormatForInjection() string { + content := m.Content + truncated := "" + if len(content) > 500 { + content = content[:500] + "..." + truncated = fmt.Sprintf(" (truncated, full: ty relay read %s)", m.ID) + } + return fmt.Sprintf("\n[RELAY from %s%s]\n%s\n[/RELAY]\n", m.From, truncated, content) +} + +// ParseRelayCommand parses "->relay:target message" format. +// Returns (target, message, ok). +func ParseRelayCommand(input string) (string, string, bool) { + input = strings.TrimSpace(input) + if !strings.HasPrefix(input, "->relay:") { + return "", "", false + } + + rest := strings.TrimPrefix(input, "->relay:") + parts := strings.SplitN(rest, " ", 2) + if len(parts) < 2 { + return "", "", false + } + + target := strings.TrimSpace(parts[0]) + message := strings.TrimSpace(parts[1]) + return target, message, target != "" && message != "" +} + +func normalize(name string) string { + return strings.ToLower(strings.TrimSpace(name)) +} + +// CleanAgentName cleans a raw name (like task title) for use as an agent name. +// It removes special characters, replaces spaces with hyphens, limits length, +// and normalizes to lowercase. +func CleanAgentName(rawName string, fallback string) string { + name := rawName + if name == "" { + name = fallback + } + // Clean up for use as agent name (keep alphanumeric, hyphen, underscore) + name = strings.Map(func(r rune) rune { + if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' { + return r + } + if r == ' ' { + return '-' + } + return -1 + }, name) + // Limit length + if len(name) > 32 { + name = name[:32] + } + // Normalize to lowercase for consistent matching + return strings.ToLower(strings.TrimSpace(name)) +} + +// GetAgentByTaskID finds an agent by their task ID. +func (r *Relay) GetAgentByTaskID(taskID int64) *Agent { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, a := range r.agents { + if a.TaskID == taskID { + return a + } + } + return nil +} + +// Heartbeat updates an agent's last seen time. +func (r *Relay) Heartbeat(name string) { + r.mu.Lock() + defer r.mu.Unlock() + + if agent, ok := r.agents[normalize(name)]; ok { + agent.LastSeen = time.Now() + } +} diff --git a/internal/relay/relay_test.go b/internal/relay/relay_test.go new file mode 100644 index 00000000..a4e34660 --- /dev/null +++ b/internal/relay/relay_test.go @@ -0,0 +1,187 @@ +package relay + +import ( + "strings" + "testing" +) + +func TestRelay_RegisterAgent(t *testing.T) { + r := New(nil) + + // Register an agent + agent := r.Register("TestAgent", 123) + + if agent.Name != "TestAgent" { + t.Errorf("Expected name 'TestAgent', got '%s'", agent.Name) + } + if agent.TaskID != 123 { + t.Errorf("Expected taskID 123, got %d", agent.TaskID) + } + if agent.Status != "active" { + t.Errorf("Expected status 'active', got '%s'", agent.Status) + } + + // Retrieve by name (case-insensitive) + found := r.GetAgent("testagent") + if found == nil { + t.Error("Expected to find agent with lowercase name") + } + if found != nil && found.Name != "TestAgent" { + t.Errorf("Expected original name 'TestAgent', got '%s'", found.Name) + } +} + +func TestRelay_ListAgents(t *testing.T) { + r := New(nil) + + r.Register("Agent1", 1) + r.Register("Agent2", 2) + + agents := r.ListAgents() + if len(agents) != 2 { + t.Errorf("Expected 2 agents, got %d", len(agents)) + } +} + +func TestRelay_Unregister(t *testing.T) { + r := New(nil) + + r.Register("TestAgent", 123) + r.Unregister("TestAgent") + + found := r.GetAgent("TestAgent") + if found != nil { + t.Error("Expected agent to be unregistered") + } +} + +func TestRelay_SendAndReceive(t *testing.T) { + r := New(nil) + + // Register an agent + r.Register("Bob", 1) + + // Send a message to Bob + msgID, err := r.Send("Alice", "Bob", "Hello Bob!", 0) + if err != nil { + t.Fatalf("Failed to send message: %v", err) + } + if msgID == "" { + t.Error("Expected non-empty message ID") + } + + // Check pending messages for Bob + pending := r.GetPendingMessages("Bob") + if len(pending) != 1 { + t.Errorf("Expected 1 pending message, got %d", len(pending)) + } + + if len(pending) > 0 { + if pending[0].From != "Alice" { + t.Errorf("Expected from 'Alice', got '%s'", pending[0].From) + } + if pending[0].Content != "Hello Bob!" { + t.Errorf("Expected content 'Hello Bob!', got '%s'", pending[0].Content) + } + } + + // Mark as delivered + r.MarkDelivered(msgID) + + // Should no longer be pending + pending = r.GetPendingMessages("Bob") + if len(pending) != 0 { + t.Errorf("Expected 0 pending messages after delivery, got %d", len(pending)) + } +} + +func TestRelay_BroadcastMessage(t *testing.T) { + r := New(nil) + + // Register multiple agents + r.Register("Agent1", 1) + r.Register("Agent2", 2) + + // Send broadcast + _, err := r.Send("Coordinator", "*", "Attention all agents!", 0) + if err != nil { + t.Fatalf("Failed to send broadcast: %v", err) + } + + // Both agents should receive the message + pending1 := r.GetPendingMessages("Agent1") + pending2 := r.GetPendingMessages("Agent2") + + if len(pending1) != 1 { + t.Errorf("Agent1 expected 1 pending message, got %d", len(pending1)) + } + if len(pending2) != 1 { + t.Errorf("Agent2 expected 1 pending message, got %d", len(pending2)) + } +} + +func TestRelay_CaseInsensitiveNames(t *testing.T) { + r := New(nil) + + r.Register("TestAgent", 1) + + // Should find with different cases + if r.GetAgent("testagent") == nil { + t.Error("Should find agent with lowercase") + } + if r.GetAgent("TESTAGENT") == nil { + t.Error("Should find agent with uppercase") + } + if r.GetAgent("TestAgent") == nil { + t.Error("Should find agent with exact case") + } +} + +func TestParseRelayCommand(t *testing.T) { + tests := []struct { + input string + wantTarget string + wantMessage string + wantOk bool + }{ + {"->relay:Bob Hello!", "Bob", "Hello!", true}, + {"->relay:Alice Can you help?", "Alice", "Can you help?", true}, + {"->relay:* Broadcast message", "*", "Broadcast message", true}, + {"->relay:Agent-1 Test", "Agent-1", "Test", true}, + {"not a relay command", "", "", false}, + {"->relay:", "", "", false}, + {"->relay:Bob", "", "", false}, // No message + } + + for _, tt := range tests { + target, message, ok := ParseRelayCommand(tt.input) + if ok != tt.wantOk { + t.Errorf("ParseRelayCommand(%q): ok = %v, want %v", tt.input, ok, tt.wantOk) + } + if target != tt.wantTarget { + t.Errorf("ParseRelayCommand(%q): target = %q, want %q", tt.input, target, tt.wantTarget) + } + if message != tt.wantMessage { + t.Errorf("ParseRelayCommand(%q): message = %q, want %q", tt.input, message, tt.wantMessage) + } + } +} + +func TestMessage_FormatForInjection(t *testing.T) { + msg := &Message{ + ID: "abc123", + From: "Alice", + Content: "Hello!", + } + + formatted := msg.FormatForInjection() + if formatted == "" { + t.Error("Expected non-empty formatted message") + } + if !strings.Contains(formatted, "[RELAY from Alice]") { + t.Error("Expected formatted message to contain sender") + } + if !strings.Contains(formatted, "Hello!") { + t.Error("Expected formatted message to contain content") + } +} diff --git a/internal/relay/store.go b/internal/relay/store.go new file mode 100644 index 00000000..05642d96 --- /dev/null +++ b/internal/relay/store.go @@ -0,0 +1,79 @@ +package relay + +import ( + "github.com/bborn/workflow/internal/db" +) + +// DBStore adapts db.DB to MessageStore interface. +type DBStore struct { + db *db.DB +} + +// NewDBStore creates a new database-backed message store. +func NewDBStore(database *db.DB) *DBStore { + return &DBStore{db: database} +} + +// SaveMessage persists a message to the database. +func (s *DBStore) SaveMessage(msg *Message) error { + dbMsg := &db.RelayMessage{ + ID: msg.ID, + From: msg.From, + To: msg.To, + Content: msg.Content, + TaskID: msg.TaskID, + Status: msg.Status, + CreatedAt: db.LocalTime{Time: msg.CreatedAt}, + } + return s.db.SaveRelayMessage(dbMsg) +} + +// GetMessage retrieves a message by ID. +func (s *DBStore) GetMessage(id string) (*Message, error) { + dbMsg, err := s.db.GetRelayMessage(id) + if err != nil || dbMsg == nil { + return nil, err + } + return dbToMessage(dbMsg), nil +} + +// GetMessagesForAgent retrieves messages for an agent. +func (s *DBStore) GetMessagesForAgent(agentName string, limit int) ([]*Message, error) { + dbMsgs, err := s.db.GetRelayMessagesForAgent(agentName, limit) + if err != nil { + return nil, err + } + + messages := make([]*Message, len(dbMsgs)) + for i, m := range dbMsgs { + messages[i] = dbToMessage(m) + } + return messages, nil +} + +// MarkDelivered marks a message as delivered. +func (s *DBStore) MarkDelivered(id string) error { + return s.db.MarkRelayMessageDelivered(id) +} + +// MarkRead marks a message as read. +func (s *DBStore) MarkRead(id string) error { + return s.db.MarkRelayMessageRead(id) +} + +func dbToMessage(m *db.RelayMessage) *Message { + msg := &Message{ + ID: m.ID, + From: m.From, + To: m.To, + Content: m.Content, + TaskID: m.TaskID, + Status: m.Status, + CreatedAt: m.CreatedAt.Time, + } + if m.ReadAt != nil { + t := m.ReadAt.Time + msg.ReadAt = &t + } + return msg +} diff --git a/internal/ui/detail.go b/internal/ui/detail.go index 6b58c8e4..38e1aa1d 100644 --- a/internal/ui/detail.go +++ b/internal/ui/detail.go @@ -2393,6 +2393,8 @@ func (m *DetailModel) renderContent() string { icon = "👤" case "output": icon = "📤" + case "relay": + icon = "📨" } var line string