Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 229 additions & 0 deletions cmd/task/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package main
import (
"bufio"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/bborn/workflow/internal/executor"
"github.com/bborn/workflow/internal/github"
"github.com/bborn/workflow/internal/mcp"
"github.com/bborn/workflow/internal/relay"
"github.com/bborn/workflow/internal/ui"
tea "github.com/charmbracelet/bubbletea"
"github.com/charmbracelet/lipgloss"
Expand Down Expand Up @@ -61,6 +64,13 @@ func getDaemonSessionName() string {
return fmt.Sprintf("task-daemon-%s", getSessionID())
}

// generateShortID generates a short random ID for relay messages.
func generateShortID() string {
b := make([]byte, 4)
rand.Read(b)
return hex.EncodeToString(b)
}

func main() {
var dangerous bool

Expand Down Expand Up @@ -2034,6 +2044,225 @@ Examples:

rootCmd.AddCommand(eventsCmd)

// Relay subcommand - agent-to-agent messaging
relayCmd := &cobra.Command{
Use: "relay",
Short: "Agent-to-agent messaging",
Long: `Send and receive messages between running agent sessions.

Messages are routed by agent name (typically the task title). Agents must be
actively running to send or receive messages.

Examples:
ty relay send Bob "Can you review my changes?"
ty relay send * "Broadcast to all agents"
ty relay list # List connected agents
ty relay read abc123 # Read message by ID`,
}

// Relay send subcommand
relaySendCmd := &cobra.Command{
Use: "send <to> <message>",
Short: "Send a message to an agent",
Long: `Send a message to another running agent.

The <to> argument can be:
- An agent name (case-insensitive)
- * for broadcast to all agents

Examples:
ty relay send Bob "Hello!"
ty relay send bob "Can you review?" # case-insensitive
ty relay send * "Everyone check this"`,
Args: cobra.MinimumNArgs(2),
Run: func(cmd *cobra.Command, args []string) {
to := args[0]
message := strings.Join(args[1:], " ")
fromTask, _ := cmd.Flags().GetInt64("from")

// Open database
dbPath := db.DefaultPath()
database, err := db.Open(dbPath)
if err != nil {
fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error()))
os.Exit(1)
}
defer database.Close()

// Create relay store and send message
from := "cli"
if fromTask > 0 {
task, err := database.GetTask(fromTask)
if err == nil && task != nil {
from = task.Title
}
}

msg := &db.RelayMessage{
ID: generateShortID(),
From: from,
To: to,
Content: message,
TaskID: fromTask,
Status: "pending",
CreatedAt: db.LocalTime{Time: time.Now()},
}

if err := database.SaveRelayMessage(msg); err != nil {
fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error()))
os.Exit(1)
}

fmt.Println(successStyle.Render(fmt.Sprintf("Message sent (id: %s)", msg.ID)))
},
}
relaySendCmd.Flags().Int64("from", 0, "Sender task ID (for attribution)")
relayCmd.AddCommand(relaySendCmd)

// Relay read subcommand
relayReadCmd := &cobra.Command{
Use: "read [message-id]",
Short: "Read relay messages",
Long: `Read a specific message by ID or list recent messages for an agent.

Examples:
ty relay read abc123 # Read specific message
ty relay read --agent Bob # List messages for Bob`,
Run: func(cmd *cobra.Command, args []string) {
agentName, _ := cmd.Flags().GetString("agent")
limit, _ := cmd.Flags().GetInt("limit")

// Open database
dbPath := db.DefaultPath()
database, err := db.Open(dbPath)
if err != nil {
fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error()))
os.Exit(1)
}
defer database.Close()

// If message ID provided, show that specific message
if len(args) > 0 {
msg, err := database.GetRelayMessage(args[0])
if err != nil {
fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error()))
os.Exit(1)
}
if msg == nil {
fmt.Fprintln(os.Stderr, errorStyle.Render("Message not found"))
os.Exit(1)
}

fmt.Printf("%s → %s\n", boldStyle.Render(msg.From), msg.To)
fmt.Printf("%s\n", dimStyle.Render(msg.CreatedAt.Format("2006-01-02 15:04:05")))
fmt.Printf("\n%s\n", msg.Content)

// Mark as read
database.MarkRelayMessageRead(msg.ID)
return
}

// List messages for agent
if agentName == "" {
fmt.Fprintln(os.Stderr, errorStyle.Render("Provide a message ID or --agent flag"))
os.Exit(1)
}

messages, err := database.GetRelayMessagesForAgent(agentName, limit)
if err != nil {
fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error()))
os.Exit(1)
}

if len(messages) == 0 {
fmt.Println(dimStyle.Render("No messages"))
return
}

for _, msg := range messages {
statusIcon := ""
if msg.Status == "pending" {
statusIcon = " •"
}
preview := msg.Content
if len(preview) > 60 {
preview = preview[:60] + "..."
}
fmt.Printf("%s%s %s → %s: %s\n",
dimStyle.Render(msg.ID),
boldStyle.Render(statusIcon),
msg.From,
msg.To,
preview,
)
}
},
}
relayReadCmd.Flags().String("agent", "", "Filter messages for this agent")
relayReadCmd.Flags().Int("limit", 20, "Maximum messages to show")
relayCmd.AddCommand(relayReadCmd)

// Relay list subcommand - list connected agents
relayListCmd := &cobra.Command{
Use: "list",
Short: "List connected agents",
Long: `Show all agents currently registered with the relay.

Agents are registered when their tasks start executing and unregistered
when tasks complete.`,
Run: func(cmd *cobra.Command, args []string) {
// Open database
dbPath := db.DefaultPath()
database, err := db.Open(dbPath)
if err != nil {
fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error()))
os.Exit(1)
}
defer database.Close()

// Get processing tasks as a proxy for connected agents
tasks, err := database.ListTasks(db.ListTasksOptions{Status: db.StatusProcessing, Limit: 100})
if err != nil {
fmt.Fprintln(os.Stderr, errorStyle.Render("Error: "+err.Error()))
os.Exit(1)
}

// Also include blocked tasks (they're still "connected")
blockedTasks, err := database.ListTasks(db.ListTasksOptions{Status: db.StatusBlocked, Limit: 100})
if err != nil {
fmt.Fprintln(os.Stderr, dimStyle.Render("Warning: failed to list blocked tasks"))
} else {
tasks = append(tasks, blockedTasks...)
}

if len(tasks) == 0 {
fmt.Println(dimStyle.Render("No agents connected"))
return
}

fmt.Println(boldStyle.Render("Connected Agents"))
fmt.Println()
for _, task := range tasks {
// Use shared function to get agent name
name := relay.CleanAgentName(task.Title, fmt.Sprintf("task-%d", task.ID))

status := task.Status
if status == db.StatusProcessing {
status = "active"
}
fmt.Printf(" %s #%d %s (%s)\n",
boldStyle.Render(name),
task.ID,
task.Project,
dimStyle.Render(status),
)
}
},
}
relayCmd.AddCommand(relayListCmd)

rootCmd.AddCommand(relayCmd)

// Projects subcommand - manage projects
projectsCmd := &cobra.Command{
Use: "projects",
Expand Down
128 changes: 128 additions & 0 deletions internal/db/relay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package db

import (
"database/sql"
"fmt"
"strings"
"time"
)

// RelayMessage represents a stored relay message.
type RelayMessage struct {
ID string
From string
To string
Content string
TaskID int64
Status string // pending, delivered, read
CreatedAt LocalTime
DeliveredAt *LocalTime
ReadAt *LocalTime
}

// SaveRelayMessage stores a relay message.
func (db *DB) SaveRelayMessage(msg *RelayMessage) error {
_, err := db.Exec(`
INSERT INTO relay_messages (id, from_agent, to_agent, content, task_id, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, msg.ID, msg.From, msg.To, msg.Content, msg.TaskID, msg.Status, msg.CreatedAt.Time)
if err != nil {
return fmt.Errorf("insert relay message: %w", err)
}
return nil
}

// GetRelayMessage retrieves a message by ID.
func (db *DB) GetRelayMessage(id string) (*RelayMessage, error) {
msg := &RelayMessage{}
err := db.QueryRow(`
SELECT id, from_agent, to_agent, content, COALESCE(task_id, 0), status,
created_at, delivered_at, read_at
FROM relay_messages WHERE id = ?
`, id).Scan(
&msg.ID, &msg.From, &msg.To, &msg.Content, &msg.TaskID, &msg.Status,
&msg.CreatedAt, &msg.DeliveredAt, &msg.ReadAt,
)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("query relay message: %w", err)
}
return msg, nil
}

// GetRelayMessagesForAgent retrieves messages for an agent (direct or broadcast).
func (db *DB) GetRelayMessagesForAgent(agentName string, limit int) ([]*RelayMessage, error) {
normalized := strings.ToLower(strings.TrimSpace(agentName))
rows, err := db.Query(`
SELECT id, from_agent, to_agent, content, COALESCE(task_id, 0), status,
created_at, delivered_at, read_at
FROM relay_messages
WHERE LOWER(to_agent) = ? OR to_agent = '*'
ORDER BY created_at DESC
LIMIT ?
`, normalized, limit)
if err != nil {
return nil, fmt.Errorf("query relay messages: %w", err)
}
defer rows.Close()

var messages []*RelayMessage
for rows.Next() {
msg := &RelayMessage{}
if err := rows.Scan(
&msg.ID, &msg.From, &msg.To, &msg.Content, &msg.TaskID, &msg.Status,
&msg.CreatedAt, &msg.DeliveredAt, &msg.ReadAt,
); err != nil {
return nil, fmt.Errorf("scan relay message: %w", err)
}
messages = append(messages, msg)
}
return messages, nil
}

// GetPendingRelayMessages retrieves pending messages for an agent.
func (db *DB) GetPendingRelayMessages(agentName string) ([]*RelayMessage, error) {
normalized := strings.ToLower(strings.TrimSpace(agentName))
rows, err := db.Query(`
SELECT id, from_agent, to_agent, content, COALESCE(task_id, 0), status,
created_at, delivered_at, read_at
FROM relay_messages
WHERE (LOWER(to_agent) = ? OR to_agent = '*') AND status = 'pending'
ORDER BY created_at ASC
`, normalized)
if err != nil {
return nil, fmt.Errorf("query pending relay messages: %w", err)
}
defer rows.Close()

var messages []*RelayMessage
for rows.Next() {
msg := &RelayMessage{}
if err := rows.Scan(
&msg.ID, &msg.From, &msg.To, &msg.Content, &msg.TaskID, &msg.Status,
&msg.CreatedAt, &msg.DeliveredAt, &msg.ReadAt,
); err != nil {
return nil, fmt.Errorf("scan relay message: %w", err)
}
messages = append(messages, msg)
}
return messages, nil
}

// MarkRelayMessageDelivered marks a message as delivered.
func (db *DB) MarkRelayMessageDelivered(id string) error {
_, err := db.Exec(`
UPDATE relay_messages SET status = 'delivered', delivered_at = ? WHERE id = ?
`, time.Now(), id)
return err
}

// MarkRelayMessageRead marks a message as read.
func (db *DB) MarkRelayMessageRead(id string) error {
_, err := db.Exec(`
UPDATE relay_messages SET status = 'read', read_at = ? WHERE id = ?
`, time.Now(), id)
return err
}
Loading