From bd8ba06291311c141ab851bfb185496cf8bc5f6e Mon Sep 17 00:00:00 2001 From: Jeff Haynie Date: Sat, 3 May 2025 22:19:55 -0500 Subject: [PATCH 1/8] Initial Refactor for DevMode to use new Bridge API and new TUI --- cmd/dev.go | 229 +++++++---- go.mod | 8 +- go.sum | 8 +- internal/dev/dev.go | 14 +- internal/dev/pending_logger.go | 145 +++++++ internal/dev/request.go | 212 +++++++++++ internal/dev/server.go | 333 ++++++++++++++++ internal/dev/tui.go | 487 ++++++++++++++++++++++++ internal/dev/tui_logger.go | 132 +++++++ internal/dev/websocket.go | 670 --------------------------------- 10 files changed, 1473 insertions(+), 765 deletions(-) create mode 100644 internal/dev/pending_logger.go create mode 100644 internal/dev/request.go create mode 100644 internal/dev/server.go create mode 100644 internal/dev/tui.go create mode 100644 internal/dev/tui_logger.go delete mode 100644 internal/dev/websocket.go diff --git a/cmd/dev.go b/cmd/dev.go index 8f586029..2a6f7f91 100644 --- a/cmd/dev.go +++ b/cmd/dev.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "os/signal" + "path/filepath" "runtime" "syscall" "time" @@ -14,12 +15,13 @@ import ( "github.com/agentuity/cli/internal/errsystem" "github.com/agentuity/cli/internal/project" "github.com/agentuity/cli/internal/util" + "github.com/agentuity/go-common/bridge" "github.com/agentuity/go-common/env" - cstr "github.com/agentuity/go-common/string" "github.com/agentuity/go-common/tui" "github.com/bep/debounce" "github.com/spf13/cobra" "github.com/spf13/viper" + "golang.org/x/term" ) var devCmd = &cobra.Command{ @@ -40,10 +42,16 @@ Examples: agentuity dev agentuity dev --dir /path/to/project`, Run: func(cmd *cobra.Command, args []string) { + fd := int(os.Stdin.Fd()) + oldState, err := term.GetState(fd) + if err != nil { + panic(err) + } + defer term.Restore(fd, oldState) + log := env.NewLogger(cmd) - _, appUrl, _ := util.GetURLs(log) - websocketUrl := viper.GetString("overrides.websocket_url") - websocketId, _ := cmd.Flags().GetString("websocket-id") + logLevel := env.LogLevel(cmd) + apiUrl, appUrl, transportUrl := util.GetURLs(log) signals := []os.Signal{os.Interrupt, syscall.SIGINT} if runtime.GOOS != "windows" { @@ -73,12 +81,29 @@ Examples: errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithUserMessage("Failed to validate project (%s) using the provided API key from the .env file in %s. This is most likely due to the API key being invalid or the project has been deleted.\n\nYou can import this project using the following command:\n\n"+tui.Command("project import"), theproject.Project.ProjectId, dir), errsystem.WithContextMessage(fmt.Sprintf("Failed to get project: %s", err))).ShowErrorAndExit() } - orgId := project.OrgId + projectToken := os.Getenv("AGENTUITY_API_KEY") + if projectToken == "" { + envFile := filepath.Join(dir, ".env") + if util.Exists(envFile) { + envs, err := env.ParseEnvFile(envFile) + if err != nil { + log.Fatal("failed to parse .env file: %s", err) + } + for _, kv := range envs { + if kv.Key == "AGENTUITY_API_KEY" { + projectToken = kv.Val + break + } + } + } + } - if websocketId == "" { - websocketId = cstr.NewHash(orgId, userId) + if projectToken == "" { + log.Fatal("failed to find AGENTUITY_API_KEY in .env file or system environment variable") } + orgId := project.OrgId + port, _ := cmd.Flags().GetInt("port") if port == 0 { port, err = dev.FindAvailablePort(theproject) @@ -87,25 +112,105 @@ Examples: } } - websocketConn, err := dev.NewWebsocket(dev.WebsocketArgs{ + var connection *bridge.BridgeConnectionInfo + + settings := viper.Get("devmode." + orgId) + if val, ok := settings.(map[string]any); ok { + connection = &bridge.BridgeConnectionInfo{} + for k, v := range val { + switch k { + case "expires_at": + if val, ok := v.(string); ok { + expiresAt, err := time.Parse(time.RFC3339, val) + if err != nil { + log.Fatal("failed to parse expires_at: %s", err) + } + connection.ExpiresAt = &expiresAt + } + case "websocket_url": + if val, ok := v.(string); ok { + connection.WebsocketURL = val + } + case "stream_url": + if val, ok := v.(string); ok { + connection.StreamURL = val + } + case "client_url": + if val, ok := v.(string); ok { + connection.ClientURL = val + } + case "replies_url": + if val, ok := v.(string); ok { + connection.RepliesURL = val + } + case "refresh_url": + if val, ok := v.(string); ok { + connection.RefreshURL = val + } + case "control_url": + if val, ok := v.(string); ok { + connection.ControlURL = val + } + } + } + } + + server, err := dev.New(dev.ServerArgs{ Ctx: ctx, Logger: log, - WebsocketId: websocketId, - WebsocketUrl: websocketUrl, + LogLevel: logLevel, + APIURL: apiUrl, + TransportURL: transportUrl, APIKey: apiKey, + ProjectToken: projectToken, Project: theproject, Version: Version, OrgId: orgId, + UserId: userId, + Connection: connection, + Port: port, }) if err != nil { log.Fatal("failed to create live dev connection: %s", err) } - defer websocketConn.Close() + defer server.Close() processCtx := context.Background() var pid int - projectServerCmd, err := dev.CreateRunProjectCmd(processCtx, log, theproject, websocketConn, dir, orgId, port) + consoleUrl := server.WebURL(appUrl) + publicUrl := server.PublicURL(appUrl) + devModeUrl := fmt.Sprintf("http://127.0.0.1:%d", port) + + agents := make([]dev.Agent, 0) + for _, agent := range theproject.Project.Agents { + agents = append(agents, dev.Agent{ + ID: agent.ID, + Name: agent.Name, + LocalURL: fmt.Sprintf("%s/%s", devModeUrl, agent.ID), + PublicURL: fmt.Sprintf("%s/%s", publicUrl, agent.ID), + }) + } + + ui := dev.NewDevModeUI(ctx, dev.DevModeConfig{ + DevModeUrl: devModeUrl, + PublicUrl: publicUrl, + AppUrl: consoleUrl, + Agents: agents, + }) + + ui.Start() + + defer ui.Close() + + tuiLogger := dev.NewTUILogger(logLevel, ui) + + if err := server.Connect(ui, tuiLogger); err != nil { + ui.Close() + tuiLogger.Fatal("failed to start live dev connection: %s", err) + } + + projectServerCmd, err := dev.CreateRunProjectCmd(processCtx, tuiLogger, theproject, server, dir, orgId, port, tuiLogger) if err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage("Failed to run project")).ShowErrorAndExit() } @@ -113,24 +218,24 @@ Examples: build := func(initial bool) { started := time.Now() var ok bool - tui.ShowSpinner("Building project ...", func() { + ui.ShowSpinner("Building project ...", func() { if err := bundler.Bundle(bundler.BundleContext{ Context: ctx, - Logger: log, + Logger: tuiLogger, ProjectDir: dir, Production: false, DevMode: !initial, }); err != nil { if err == bundler.ErrBuildFailed { - log.Error("build failed ...") + tuiLogger.Error("build failed ...") return } errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage(fmt.Sprintf("Failed to bundle project: %s", err))).ShowErrorAndExit() } ok = true }) - if ok { - fmt.Println(tui.Text(fmt.Sprintf("✨ Built in %s", time.Since(started).Round(time.Millisecond)))) + if ok && !initial { + ui.SetStatusMessage("✨ Built in %s", time.Since(started).Round(time.Millisecond)) } } @@ -140,23 +245,26 @@ Examples: restart := func() { isDeliberateRestart = true build(false) - log.Debug("killing project server") - dev.KillProjectServer(log, projectServerCmd, pid) - log.Debug("killing project server done") + tuiLogger.Debug("killing project server") + dev.KillProjectServer(tuiLogger, projectServerCmd, pid) + tuiLogger.Debug("killing project server done") } + ui.SetStatusMessage("starting ...") + ui.SetSpinner(true) + // debounce a lot of changes at once to avoid multiple restarts in succession debounced := debounce.New(250 * time.Millisecond) // Watch for changes - watcher, err := dev.NewWatcher(log, dir, theproject.Project.Development.Watch.Files, func(path string) { - log.Trace("%s has changed", path) + watcher, err := dev.NewWatcher(tuiLogger, dir, theproject.Project.Development.Watch.Files, func(path string) { + tuiLogger.Trace("%s has changed", path) debounced(restart) }) if err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage(fmt.Sprintf("Failed to start watcher: %s", err))).ShowErrorAndExit() } - defer watcher.Close(log) + defer watcher.Close(tuiLogger) if err := projectServerCmd.Start(); err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage(fmt.Sprintf("Failed to start project: %s", err))).ShowErrorAndExit() @@ -164,26 +272,28 @@ Examples: pid = projectServerCmd.Process.Pid - websocketConn.StartReadingMessages(ctx, log, port) - devUrl := websocketConn.WebURL(appUrl) + if err := server.HealthCheck(devModeUrl); err != nil { + ui.Close() + tuiLogger.Fatal("failed to health check connection: %s", err) + } - // Display local interaction instructions - displayLocalInstructions(port, theproject.Project.Agents, devUrl) + ui.SetStatusMessage("🚀 DevMode ready") + ui.SetSpinner(false) go func() { for { - log.Trace("waiting for project server to exit (pid: %d)", pid) + tuiLogger.Trace("waiting for project server to exit (pid: %d)", pid) if err := projectServerCmd.Wait(); err != nil { if !isDeliberateRestart { - log.Error("project server (pid: %d) exited with error: %s", pid, err) + tuiLogger.Error("project server (pid: %d) exited with error: %s", pid, err) } } if projectServerCmd.ProcessState != nil { - log.Debug("project server (pid: %d) exited with code %d", pid, projectServerCmd.ProcessState.ExitCode()) + tuiLogger.Debug("project server (pid: %d) exited with code %d", pid, projectServerCmd.ProcessState.ExitCode()) } else { - log.Debug("project server (pid: %d) exited", pid) + tuiLogger.Debug("project server (pid: %d) exited", pid) } - log.Debug("isDeliberateRestart: %t, pid: %d", isDeliberateRestart, pid) + tuiLogger.Debug("isDeliberateRestart: %t, pid: %d", isDeliberateRestart, pid) if !isDeliberateRestart { return } @@ -191,8 +301,8 @@ Examples: // If it was a deliberate restart, start the new process here if isDeliberateRestart { isDeliberateRestart = false - log.Trace("restarting project server") - projectServerCmd, err = dev.CreateRunProjectCmd(processCtx, log, theproject, websocketConn, dir, orgId, port) + tuiLogger.Trace("restarting project server") + projectServerCmd, err = dev.CreateRunProjectCmd(processCtx, tuiLogger, theproject, server, dir, orgId, port, tuiLogger) if err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage("Failed to run project")).ShowErrorAndExit() } @@ -200,71 +310,30 @@ Examples: errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage(fmt.Sprintf("Failed to start project: %s", err))).ShowErrorAndExit() } pid = projectServerCmd.Process.Pid - log.Trace("restarted project server (pid: %d)", pid) + tuiLogger.Trace("restarted project server (pid: %d)", pid) } } }() teardown := func() { - watcher.Close(log) - websocketConn.Close() - dev.KillProjectServer(log, projectServerCmd, pid) + watcher.Close(tuiLogger) + server.Close() + dev.KillProjectServer(tuiLogger, projectServerCmd, pid) } select { - case <-websocketConn.Done(): - log.Info("live dev connection closed, shutting down") + case <-ui.Done(): teardown() case <-ctx.Done(): - log.Info("context done, shutting down") teardown() } }, } -func displayLocalInstructions(port int, agents []project.AgentConfig, devModeUrl string) { - title := tui.Title("🚀 Local Agent Interaction") - - // Combine all elements with appropriate spacing - fmt.Println() - fmt.Println(title) - - // Create list of available agents - if len(agents) > 0 { - fmt.Println() - - for _, agent := range agents { - // Display agent name and ID - fmt.Println(tui.Text(" • ") + tui.PadRight(agent.Name, 20, " ") + " " + tui.Muted(agent.ID)) - } - } - - // Get a sample agent ID if available - sampleAgentID := "agent_ID" - if len(agents) > 0 { - sampleAgentID = agents[0].ID - } - - curlCommand := fmt.Sprintf("curl -v http://127.0.0.1:%d/%s --json '{\"input\": \"Hello, world!\"}'", port, sampleAgentID) - - fmt.Println() - fmt.Println(tui.Text("To interact with your agents locally, you can use:")) - fmt.Println() - fmt.Println(tui.Highlight(curlCommand)) - fmt.Println() - - fmt.Print(tui.Text("Or use the 💻 Dev Mode in our app: ")) - fmt.Println(tui.Link("%s", devModeUrl)) - - fmt.Println() -} - func init() { rootCmd.AddCommand(devCmd) devCmd.Flags().StringP("dir", "d", ".", "The directory to run the development server in") - devCmd.Flags().String("websocket-id", "", "The websocket room id to use for the development agent") devCmd.Flags().String("org-id", "", "The organization to run the project") devCmd.Flags().Int("port", 0, "The port to run the development server on (uses project default if not provided)") - devCmd.Flags().MarkHidden("websocket-id") devCmd.Flags().MarkHidden("org-id") } diff --git a/go.mod b/go.mod index 2bda1397..a7762fa0 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.2 require ( github.com/Masterminds/semver v1.5.0 - github.com/agentuity/go-common v1.0.39 + github.com/agentuity/go-common v1.0.45 github.com/agentuity/mcp-golang/v2 v2.0.2 github.com/bep/debounce v1.2.1 github.com/bmatcuk/doublestar/v4 v4.8.1 @@ -16,7 +16,6 @@ require ( github.com/evanw/esbuild v0.25.0 github.com/fsnotify/fsnotify v1.7.0 github.com/google/uuid v1.6.0 - github.com/gorilla/websocket v1.5.3 github.com/marcozac/go-jsonc v0.1.1 github.com/mattn/go-isatty v0.0.20 github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c @@ -25,6 +24,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/zijiren233/yaml-comment v0.2.2 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 + golang.org/x/term v0.30.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/apimachinery v0.32.1 ) @@ -48,6 +48,7 @@ require ( github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/mailru/easyjson v0.9.0 // indirect github.com/pjbgf/sha1cd v0.3.2 // indirect + github.com/sahilm/fuzzy v0.1.1 // indirect github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect github.com/skeema/knownhosts v1.3.1 // indirect github.com/tidwall/gjson v1.18.0 // indirect @@ -58,7 +59,6 @@ require ( github.com/xanzy/ssh-agent v0.3.3 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect golang.org/x/crypto v0.36.0 // indirect - golang.org/x/term v0.30.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect ) @@ -67,7 +67,7 @@ require ( github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/buger/goterm v1.0.4 // indirect github.com/catppuccin/go v0.2.0 // indirect - github.com/cenkalti/backoff/v4 v4.3.0 + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/charmbracelet/x/ansi v0.8.0 // indirect github.com/charmbracelet/x/exp/strings v0.0.0-20240722160745-212f7b056ed0 // indirect diff --git a/go.sum b/go.sum index 62ad6e40..2f543628 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/ProtonMail/go-crypto v1.1.5 h1:eoAQfK2dwL+tFSFpr7TbOaPNUbPiJj4fLYwwGE1FQO4= github.com/ProtonMail/go-crypto v1.1.5/go.mod h1:rA3QumHc/FZ8pAHreoekgiAbzpNsfQAosU5td4SnOrE= -github.com/agentuity/go-common v1.0.39 h1:SZEn351+2JMdYBA/aoOUoSuPI8lTDSIKICnu17mrYo4= -github.com/agentuity/go-common v1.0.39/go.mod h1:cy1EPYpZUkp3JSMgTb+Sa3sLnS7vQQupj/RwO4An6L4= +github.com/agentuity/go-common v1.0.45 h1:kjpiIIG3Fmbjne8wtd4lCFikAdbfGal5rqXAeTTPD6I= +github.com/agentuity/go-common v1.0.45/go.mod h1:cy1EPYpZUkp3JSMgTb+Sa3sLnS7vQQupj/RwO4An6L4= github.com/agentuity/mcp-golang/v2 v2.0.2 h1:wZqS/aHWZsQoU/nd1E1/iMsVY2dywWT9+PFlf+3YJxo= github.com/agentuity/mcp-golang/v2 v2.0.2/go.mod h1:U105tZXyTatxxOBlcObRgLb/ULvGgT2DJ1nq/8++P6Q= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= @@ -127,8 +127,6 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= -github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -210,6 +208,8 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= +github.com/sahilm/fuzzy v0.1.1 h1:ceu5RHF8DGgoi+/dR5PsECjCDH1BE3Fnmpo7aVXOdRA= +github.com/sahilm/fuzzy v0.1.1/go.mod h1:VFvziUEIMCrT6A6tw2RFIXPXXmzXbOsSHF0DOI8ZK9Y= github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38 h1:D0vL7YNisV2yqE55+q0lFuGse6U8lxlg7fYTctlT5Gc= github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38/go.mod h1:sM7Mt7uEoCeFSCBM+qBrqvEo+/9vdmj19wzp3yzUhmg= github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8= diff --git a/internal/dev/dev.go b/internal/dev/dev.go index 4bdc3879..ffcb765a 100644 --- a/internal/dev/dev.go +++ b/internal/dev/dev.go @@ -3,6 +3,7 @@ package dev import ( "context" "fmt" + "io" "net" "os" "os/exec" @@ -97,16 +98,16 @@ func FindAvailablePort(p project.ProjectContext) (int, error) { return findAvailablePort() } -func CreateRunProjectCmd(ctx context.Context, log logger.Logger, theproject project.ProjectContext, liveDevConnection *Websocket, dir string, orgId string, port int) (*exec.Cmd, error) { +func CreateRunProjectCmd(ctx context.Context, log logger.Logger, theproject project.ProjectContext, server *Server, dir string, orgId string, port int, writer io.Writer) (*exec.Cmd, error) { // set the vars projectServerCmd := exec.CommandContext(ctx, theproject.Project.Development.Command, theproject.Project.Development.Args...) projectServerCmd.Env = os.Environ()[:] - projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_OTLP_BEARER_TOKEN=%s", liveDevConnection.OtelToken)) - projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_OTLP_URL=%s", liveDevConnection.OtelUrl)) + projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_OTLP_BEARER_TOKEN=%s", server.otelToken)) + projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_OTLP_URL=%s", server.otelUrl)) projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_URL=%s", theproject.APIURL)) projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_TRANSPORT_URL=%s", theproject.TransportURL)) - projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_CLOUD_DEPLOYMENT_ID=%s", liveDevConnection.webSocketId)) + projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_CLOUD_DEPLOYMENT_ID=%s", server.ID)) projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_CLOUD_PROJECT_ID=%s", theproject.Project.ProjectId)) projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_CLOUD_ORG_ID=%s", orgId)) @@ -121,9 +122,8 @@ func CreateRunProjectCmd(ctx context.Context, log logger.Logger, theproject proj projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_CLOUD_PORT=%d", port)) projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("PORT=%d", port)) - projectServerCmd.Stdout = os.Stdout - projectServerCmd.Stderr = os.Stderr - projectServerCmd.Stdin = os.Stdin + projectServerCmd.Stdout = writer + projectServerCmd.Stderr = writer projectServerCmd.Dir = dir util.ProcessSetup(projectServerCmd) diff --git a/internal/dev/pending_logger.go b/internal/dev/pending_logger.go new file mode 100644 index 00000000..2453eacc --- /dev/null +++ b/internal/dev/pending_logger.go @@ -0,0 +1,145 @@ +package dev + +import ( + "context" + "fmt" + "os" + "sync" + + "github.com/agentuity/go-common/logger" + "github.com/agentuity/go-common/tui" +) + +type PendingLogger struct { + pending []string + logLevel logger.LogLevel + logger logger.Logger + mutex sync.RWMutex +} + +var _ logger.Logger = (*PendingLogger)(nil) + +func NewPendingLogger(logLevel logger.LogLevel) *PendingLogger { + return &PendingLogger{ + pending: make([]string, 0), + logLevel: logLevel, + } +} + +func (l *PendingLogger) drain(ui *DevModeUI, logger logger.Logger) { + l.mutex.Lock() + defer l.mutex.Unlock() + for _, val := range l.pending { + ui.AddLog("%s", val) + } + l.logger = logger + l.pending = nil +} + +// With will return a new logger using metadata as the base context +func (l *PendingLogger) With(metadata map[string]interface{}) logger.Logger { + return l +} + +// WithPrefix will return a new logger with a prefix prepended to the message +func (l *PendingLogger) WithPrefix(prefix string) logger.Logger { + return l +} + +// WithContext will return a new logger with the given context +func (l *PendingLogger) WithContext(ctx context.Context) logger.Logger { + return l +} + +// Trace level logging +func (l *PendingLogger) Trace(msg string, args ...interface{}) { + if logger.LevelTrace < l.logLevel { + return + } + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Trace(msg, args...) + return + } + val := tui.Muted("[TRACE] " + fmt.Sprintf(msg, args...)) + l.pending = append(l.pending, val) +} + +// Debug level logging +func (l *PendingLogger) Debug(msg string, args ...interface{}) { + if logger.LevelDebug < l.logLevel { + return + } + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Debug(msg, args...) + return + } + val := tui.Muted("[TRACE] " + fmt.Sprintf(msg, args...)) + l.pending = append(l.pending, val) +} + +// Info level loggi ng +func (l *PendingLogger) Info(msg string, args ...interface{}) { + if logger.LevelInfo < l.logLevel { + return + } + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Info(msg, args...) + return + } + val := tui.Text("[INFO] " + fmt.Sprintf(msg, args...)) + l.pending = append(l.pending, val) +} + +// Warning level logging +func (l *PendingLogger) Warn(msg string, args ...interface{}) { + if logger.LevelWarn < l.logLevel { + return + } + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Warn(msg, args...) + return + } + val := tui.Title("[WARN] " + fmt.Sprintf(msg, args...)) + l.pending = append(l.pending, val) +} + +// Error level logging +func (l *PendingLogger) Error(msg string, args ...interface{}) { + if logger.LevelError < l.logLevel { + return + } + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Error(msg, args...) + return + } + val := tui.Bold("[ERROR] " + fmt.Sprintf(msg, args...)) + l.pending = append(l.pending, val) +} + +// Fatal level logging and exit with code 1 +func (l *PendingLogger) Fatal(msg string, args ...interface{}) { + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Fatal(msg, args...) + return + } + val := tui.Bold("[FATAL] " + fmt.Sprintf(msg, args...)) + fmt.Println(val) + os.Exit(1) +} + +// Stack will return a new logger that logs to the given logger as well as the current logger +func (l *PendingLogger) Stack(next logger.Logger) logger.Logger { + return l +} diff --git a/internal/dev/request.go b/internal/dev/request.go new file mode 100644 index 00000000..c8acf2bb --- /dev/null +++ b/internal/dev/request.go @@ -0,0 +1,212 @@ +package dev + +import ( + "context" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/agentuity/go-common/bridge" + "github.com/agentuity/go-common/logger" + "github.com/agentuity/go-common/telemetry" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" +) + +var propagator propagation.TraceContext + +type packet struct { + buf []byte +} + +func (p *packet) String() string { + return fmt.Sprintf("packet{buf: %d bytes}", len(p.buf)) +} + +type AgentRequest struct { + ctx context.Context + logger logger.Logger + pending chan *packet + req *http.Request + statusCode int + headers map[string]string + body io.ReadCloser + span trace.Span + started time.Time + completed chan struct{} +} + +var _ io.Reader = (*AgentRequest)(nil) + +type AgentRequestArgs struct { + Context context.Context + Logger logger.Logger + Tracer trace.Tracer + ID string + URL string + Version string + AgentID string + OrgID string + ProjectID string + Headers map[string]string +} + +func NewAgentRequest(args AgentRequestArgs) (*AgentRequest, error) { + started := time.Now() + var err error + + sctx, logger, span := telemetry.StartSpan(args.Context, args.Logger, args.Tracer, "TriggerRun", + trace.WithAttributes( + attribute.Bool("@agentuity/devmode", true), + attribute.String("trigger", "manual"), + attribute.String("@agentuity/deploymentId", args.ID), + ), + trace.WithSpanKind(trace.SpanKindConsumer), + ) + + defer func() { + // only end the span if there was an error + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + span.SetAttributes( + attribute.Int64("@agentuity/cpu_time", time.Since(started).Milliseconds()), + ) + span.End() + } + }() + + span.SetAttributes( + attribute.String("@agentuity/agentId", args.AgentID), + attribute.String("@agentuity/orgId", args.OrgID), + attribute.String("@agentuity/projectId", args.ProjectID), + attribute.String("@agentuity/env", "development"), + ) + + spanContext := span.SpanContext() + traceState := spanContext.TraceState() + traceState, err = traceState.Insert("id", args.AgentID) + if err != nil { + logger.Error("failed to insert agent id into trace state: %s", err) + err = fmt.Errorf("failed to insert agent id into trace state: %w", err) + return nil, err + } + traceState, err = traceState.Insert("oid", args.OrgID) + if err != nil { + logger.Error("failed to insert org id into trace state: %s", err) + err = fmt.Errorf("failed to insert org id into trace state: %w", err) + return nil, err + } + traceState, err = traceState.Insert("pid", args.ProjectID) + if err != nil { + logger.Error("failed to insert project id into trace state: %s", err) + err = fmt.Errorf("failed to insert project id into trace state: %w", err) + return nil, err + } + + ctx := trace.ContextWithSpanContext(sctx, spanContext.WithTraceState(traceState)) + + agentReq := &AgentRequest{ + ctx: ctx, + logger: logger, + pending: make(chan *packet, 10), + completed: make(chan struct{}, 1), + started: started, + span: span, + } + req, err := http.NewRequestWithContext(ctx, "POST", args.URL, agentReq) + if err != nil { + return nil, err + } + + for k, v := range args.Headers { + req.Header.Set(k, v) + } + + req.Header.Set("x-agentuity-trigger", "manual") + req.Header.Set("User-Agent", "Agentuity CLI/"+args.Version) + propagator.Inject(ctx, propagation.HeaderCarrier(req.Header)) + + agentReq.req = req + + return agentReq, nil +} + +func (r *AgentRequest) Run() error { + var err error + var resp *http.Response + + defer func() { + if err != nil { + r.span.RecordError(err) + r.span.SetStatus(codes.Error, err.Error()) + } + }() + + r.logger.Debug("sending request to agent: %s", r.req.URL) + resp, err = http.DefaultClient.Do(r.req) + if err != nil { + return err + } + r.logger.Debug("sent request to agent: %s, returned: %d", r.req.URL, resp.StatusCode) + r.statusCode = resp.StatusCode + r.headers = make(map[string]string) + for k, v := range resp.Header { + r.headers[k] = strings.Join(v, ", ") + } + r.body = resp.Body + r.req = nil + r.completed <- struct{}{} // signal that the request is complete + return nil +} + +func (r *AgentRequest) Read(p []byte) (n int, err error) { + select { + case packet := <-r.pending: + if packet == nil { + r.logger.Debug("incoming buffer is EOF") + return 0, io.EOF + } + if len(packet.buf) > len(p) { + return 0, fmt.Errorf("incoming buffer is larger (%d) than the outgoing buffer (%d)", len(packet.buf), len(p)) + } + return copy(p, packet.buf), nil + case <-r.ctx.Done(): + return 0, r.ctx.Err() + } +} + +func (r *AgentRequest) close(client *bridge.Client, id string) { + r.logger.Trace("closing agent request: %s", id) + close(r.pending) + defer func() { + if r.body != nil { + r.body.Close() + } + r.statusCode = 0 + r.headers = nil + r.body = nil + r.req = nil + r.completed = nil + if r.span != nil { + r.span.End() + r.span = nil + } + }() + r.logger.Trace("waiting for agent request to complete: %s", id) + <-r.completed // block for the request to complete + r.logger.Debug("replying to agent request: %s, status: %d, headers: %v", id, r.statusCode, r.headers) + if err := client.Reply(id, r.statusCode, r.headers, r.body); err != nil { + r.logger.Error("failed to reply to agent request: %s", err) + } + r.logger.Info("processed sess_%s in %s", r.span.SpanContext().TraceID(), time.Since(r.started)) +} + +func (r *AgentRequest) send(buf []byte) { + r.logger.Trace("sending buffer: %d bytes", len(buf)) + r.pending <- &packet{buf} +} diff --git a/internal/dev/server.go b/internal/dev/server.go new file mode 100644 index 00000000..640ea52b --- /dev/null +++ b/internal/dev/server.go @@ -0,0 +1,333 @@ +package dev + +import ( + "context" + "errors" + "fmt" + "math" + "net/http" + "sync" + "time" + + "github.com/agentuity/cli/internal/project" + "github.com/agentuity/cli/internal/util" + "github.com/agentuity/go-common/bridge" + "github.com/agentuity/go-common/logger" + cstr "github.com/agentuity/go-common/string" + "github.com/agentuity/go-common/telemetry" + "github.com/spf13/viper" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +type Server struct { + ID string + otelToken string + otelUrl string + Project project.ProjectContext + orgId string + userId string + apiurl string + transportUrl string + apiKey string + ctx context.Context + logger logger.Logger + tracer trace.Tracer + version string + bridge *bridge.Client + once sync.Once + apiclient *util.APIClient + registered bool + publicUrl string + port int + connected chan struct{} + pendingLogger *PendingLogger + pending map[string]*AgentRequest + pendingMu sync.RWMutex + cleanup func() +} + +type ServerArgs struct { + Ctx context.Context + Logger logger.Logger + LogLevel logger.LogLevel + APIURL string + TransportURL string + APIKey string + ProjectToken string + Project project.ProjectContext + OrgId string + UserId string + Version string + Connection *bridge.BridgeConnectionInfo + Port int +} + +var _ bridge.Handler = (*Server)(nil) + +func (c *Server) WebURL(appUrl string) string { + return fmt.Sprintf("%s/devmode/%s", appUrl, c.ID) +} + +func (c *Server) PublicURL(appUrl string) string { + return c.publicUrl +} + +func (s *Server) AgentURL(agentId string) string { + return fmt.Sprintf("http://127.0.0.1:%d/%s", s.port, agentId) +} + +// Close closes the bridge client and cleans up the connection +func (s *Server) Close() error { + s.logger.Debug("closing bridge client") + s.once.Do(func() { + if s.registered { + if err := s.apiclient.Do("DELETE", "/cli/devmode/"+s.ID, map[string]string{"orgId": s.orgId}, nil); err != nil { + s.logger.Error("failed to unregister devmode connection: %s", err) + } + } + s.bridge.Close() + s.cleanup() + }) + return nil +} + +type ConnectionResponse struct { + Success bool `json:"success"` + Error string `json:"message"` + Data struct { + OtelUrl string `json:"otlpUrl"` + OtelBearerToken string `json:"otlpBearerToken"` + } `json:"data"` +} + +// OnConnect is called when the bridge client is connected to the bridge server +func (s *Server) OnConnect(client *bridge.Client) error { + s.logger.Debug("on connect") + + defer func() { + s.connected <- struct{}{} // signal that the connection is established (even if there was an error) + }() + + payload := map[string]string{ + "orgId": s.orgId, + "publicURL": client.ClientURL(), + "websocketURL": client.WebsocketURL(), + } + + var response ConnectionResponse + + if err := s.apiclient.Do("PUT", "/cli/devmode/"+s.ID, payload, &response); err != nil { + return fmt.Errorf("failed to register devmode connection with api server: %w", err) + } + + s.otelUrl = response.Data.OtelUrl + s.otelToken = response.Data.OtelBearerToken + + tctx, _, cleanup, err := telemetry.NewWithAPIKey(s.ctx, "@agentuity/cli", s.otelUrl, s.otelToken, s.logger) + if err != nil { + return fmt.Errorf("failed to create telemetry client: %w", err) + } + + s.ctx = tctx + s.cleanup = cleanup + + s.saveConnection(client) + + return nil +} + +// OnDisconnect is called when the bridge client is disconnected from the bridge server +func (s *Server) OnDisconnect(client *bridge.Client) { + s.logger.Debug("on disconnect") +} + +// OnHeader is called when a header is received from the bridge. this will only be called once before any data is sent. +func (s *Server) OnHeader(client *bridge.Client, id string, agentId string, headers map[string]string) { + s.logger.Debug("on header, id: %s, agent: %s, headers: %s", id, agentId, headers) + + req := AgentRequestArgs{ + Context: s.ctx, + Logger: s.logger, + Tracer: s.tracer, + ID: s.ID, + Version: s.version, + URL: s.AgentURL(agentId), + Headers: headers, + AgentID: agentId, + OrgID: s.orgId, + ProjectID: s.Project.Project.ProjectId, + } + agentReq, err := NewAgentRequest(req) + if err != nil { + s.logger.Error("failed to create request: %s", err) + return + } + s.pendingMu.Lock() + s.pending[id] = agentReq + s.pendingMu.Unlock() + + go agentReq.Run() // run this in a new goroutine so as not to block the main bridge thread + s.logger.Debug("header exiting: %s, agent: %s", id, agentId) +} + +// OnData is called when a data is received from the bridge. this will be called multiple times if the data is large. +func (s *Server) OnData(client *bridge.Client, id string, agentId string, data []byte) { + s.logger.Debug("on data: id: %s, agent: %s", id, agentId) + s.pendingMu.RLock() + defer s.pendingMu.RUnlock() + if req, ok := s.pending[id]; ok { + req.send(data) + } else { + s.logger.Error("no pending request for id: %s and agent: %s", id, agentId) + } +} + +// OnClose is called when the bridge request is completed and no more data will be sent +func (s *Server) OnClose(client *bridge.Client, id string, agentId string) { + s.logger.Debug("on close: id: %s, param: %s", id, agentId) + s.pendingMu.Lock() + defer s.pendingMu.Unlock() + if req, ok := s.pending[id]; ok { + delete(s.pending, id) + req.close(client, id) + } else { + s.logger.Error("no pending request for id: %s and agent: %s", id, agentId) + } +} + +// OnError is called when an error occurs at any point in the bridge client +func (s *Server) OnError(client *bridge.Client, err error) { + s.logger.Error("an error occurred: %s", err) +} + +// OnControl is called when a control event is received from the bridge. you can respond with a control event to the bridge by returning a non-nil value. +func (s *Server) OnControl(client *bridge.Client, id string, data []byte) ([]byte, error) { + s.logger.Debug("on control: id: %s, data: %s", id, string(data)) + return nil, nil +} + +// OnRefresh is called when the bridge client has refreshed its connection +func (s *Server) OnRefresh(client *bridge.Client) { + s.saveConnection(client) +} + +func (s *Server) saveConnection(client *bridge.Client) { + s.registered = true + kv := map[string]string{} + conn := client.ConnectionInfo() + s.publicUrl = conn.ClientURL + if conn.ExpiresAt != nil { + kv["expires_at"] = conn.ExpiresAt.Format(time.RFC3339) + } + if conn.WebsocketURL != "" { + kv["websocket_url"] = conn.WebsocketURL + } + if conn.StreamURL != "" { + kv["stream_url"] = conn.StreamURL + } + if conn.ClientURL != "" { + kv["client_url"] = conn.ClientURL + } + if conn.RepliesURL != "" { + kv["replies_url"] = conn.RepliesURL + } + if conn.RefreshURL != "" { + kv["refresh_url"] = conn.RefreshURL + } + if conn.ControlURL != "" { + kv["control_url"] = conn.ControlURL + } + viper.Set("devmode."+s.orgId, kv) + viper.WriteConfig() +} + +func (s *Server) HealthCheck(devModeUrl string) error { + started := time.Now() + var i int + for time.Since(started) < 15*time.Second { + i++ + s.logger.Trace("health check request: %s", fmt.Sprintf("%s/_health", devModeUrl)) + req, err := http.NewRequestWithContext(s.ctx, "GET", fmt.Sprintf("%s/_health", devModeUrl), nil) + if err != nil { + return fmt.Errorf("failed to create health check request: %w", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + if errors.Is(err, context.Canceled) { + return err + } + s.logger.Trace("health check request failed: %s", err) + dur := time.Millisecond * 150 * time.Duration(math.Pow(float64(i), 2)) + time.Sleep(dur) + continue + } + s.logger.Trace("health check request returned status code: %d", resp.StatusCode) + if resp.StatusCode != 200 { + s.logger.Trace("health check returned status code: %d", resp.StatusCode) + dur := time.Millisecond * 150 * time.Duration(math.Pow(float64(i), 2)) + time.Sleep(dur) + continue + } + return nil + } + return fmt.Errorf("health check failed after %s", time.Since(started)) +} + +func (s *Server) Connect(ui *DevModeUI, tuiLogger logger.Logger) error { + s.logger = tuiLogger + s.pendingLogger.drain(ui, s.logger) + s.pendingLogger = nil + <-s.connected + close(s.connected) + return nil +} + +func New(args ServerArgs) (*Server, error) { + id := cstr.NewHash(args.OrgId, args.UserId) + tracer := otel.Tracer("@agentuity/cli", trace.WithInstrumentationAttributes( + attribute.String("id", id), + attribute.String("@agentuity/orgId", args.OrgId), + attribute.String("@agentuity/userId", args.UserId), + attribute.Bool("@agentuity/devmode", true), + attribute.String("name", "@agentuity/cli"), + attribute.String("version", args.Version), + ), trace.WithInstrumentationVersion(args.Version)) + + pendingLogger := NewPendingLogger(args.LogLevel) + + server := &Server{ + ID: id, + logger: pendingLogger, + ctx: args.Ctx, + apiurl: args.APIURL, + transportUrl: args.TransportURL, + apiKey: args.APIKey, + Project: args.Project, + orgId: args.OrgId, + userId: args.UserId, + tracer: tracer, + version: args.Version, + port: args.Port, + apiclient: util.NewAPIClient(context.Background(), args.Logger, args.APIURL, args.APIKey), + pendingLogger: pendingLogger, + pending: make(map[string]*AgentRequest), + connected: make(chan struct{}, 1), + } + + server.bridge = bridge.New(bridge.Options{ + Context: args.Ctx, + Logger: pendingLogger, + APIKey: args.ProjectToken, + URL: args.TransportURL, + ConnectionInfo: args.Connection, + Handler: server, + }) + + if err := server.bridge.Connect(); err != nil { + return nil, err + } + + return server, nil +} diff --git a/internal/dev/tui.go b/internal/dev/tui.go new file mode 100644 index 00000000..e0cabec0 --- /dev/null +++ b/internal/dev/tui.go @@ -0,0 +1,487 @@ +package dev + +import ( + "context" + "fmt" + "os" + "strings" + "sync" + "time" + + "github.com/agentuity/go-common/tui" + "github.com/charmbracelet/bubbles/key" + "github.com/charmbracelet/bubbles/list" + "github.com/charmbracelet/bubbles/spinner" + "github.com/charmbracelet/bubbles/viewport" + tea "github.com/charmbracelet/bubbletea" + "github.com/charmbracelet/lipgloss" + "golang.org/x/term" +) + +var ( + resumeKey = key.NewBinding(key.WithKeys("r"), key.WithHelp("r", "resume"), key.WithDisabled()) + pauseKey = key.NewBinding(key.WithKeys("p"), key.WithHelp("p", "pause")) + helpKey = key.NewBinding(key.WithKeys("h"), key.WithHelp("h", "show help")) + agentsKey = key.NewBinding(key.WithKeys("a"), key.WithHelp("a", "show agents")) + logoColor = lipgloss.AdaptiveColor{Light: "#11c7b9", Dark: "#00FFFF"} + labelColor = lipgloss.AdaptiveColor{Light: "#999999", Dark: "#FFFFFF"} + runningColor = lipgloss.AdaptiveColor{Light: "#00FF00", Dark: "#009900"} + pausedColor = lipgloss.AdaptiveColor{Light: "#FFA500", Dark: "#FFA500"} + statusColor = lipgloss.AdaptiveColor{Light: "#750075", Dark: "#FF5CFF"} + runningStyle = lipgloss.NewStyle().Foreground(runningColor) + pausedStyle = lipgloss.NewStyle().Foreground(pausedColor).AlignHorizontal(lipgloss.Center) + labelStyle = lipgloss.NewStyle().Foreground(labelColor).Bold(true) + statusMsgStyle = lipgloss.NewStyle().Foreground(statusColor).Margin(0) + viewPortHelpStyle = lipgloss.NewStyle().Foreground(lipgloss.AdaptiveColor{Light: "#000000", Dark: "#999999"}).Background(lipgloss.AdaptiveColor{Light: "#999999", Dark: "#222222"}).AlignHorizontal(lipgloss.Left).MarginTop(1) + statusMsgHeight = 2 +) + +type model struct { + infoBox string + statusMessage string + logList list.Model + logItems []list.Item + windowSize tea.WindowSizeMsg + viewport *viewport.Model + paused bool + showhelp bool + showagents bool + agents []Agent + selectedLog *logItem + spinner spinner.Model + spinning bool + devModeUrl string + publicUrl string + appUrl string +} + +type spinnerStartMsg struct{} +type spinnerStopMsg struct{} + +type logItem string + +func (i logItem) Title() string { return strings.ReplaceAll(string(i), "\n", " ") } +func (i logItem) Description() string { return "" } +func (i logItem) FilterValue() string { return string(i) } + +type tickMsg time.Time +type addLogMsg string +type statusMessageMsg string + +func tick() tea.Cmd { + return tea.Tick(time.Second, func(t time.Time) tea.Msg { + return tickMsg(t) + }) +} + +func initialModel(config DevModeConfig) *model { + + width, height, err := term.GetSize(int(os.Stdout.Fd())) + if err != nil { + fmt.Println("Error getting terminal size:", err) + } + + spinner := spinner.New(spinner.WithSpinner(spinner.Dot), spinner.WithStyle(statusMsgStyle.MarginLeft(1).MarginRight(0))) + + items := []list.Item{} + + listDelegate := list.NewDefaultDelegate() + listDelegate.ShowDescription = false + listDelegate.SetSpacing(0) + listDelegate.Styles.NormalTitle = listDelegate.Styles.NormalTitle.Padding(0, 1) + listDelegate.Styles.SelectedTitle = listDelegate.Styles.SelectedTitle.BorderLeft(false).Foreground(labelColor).Bold(true) + + l := list.New(items, listDelegate, width-2, 10) + l.SetShowTitle(false) + l.SetShowStatusBar(false) + l.SetShowPagination(true) + l.SetFilteringEnabled(true) + l.SetShowHelp(true) + l.SetStatusBarItemName("log", "logs") + l.Styles.NoItems = l.Styles.NoItems.MarginLeft(1) + l.Styles.HelpStyle = l.Styles.HelpStyle.AlignHorizontal(lipgloss.Center).Width(width) + + l.AdditionalShortHelpKeys = func() []key.Binding { + return []key.Binding{ + resumeKey, + pauseKey, + helpKey, + agentsKey, + } + } + l.AdditionalFullHelpKeys = func() []key.Binding { + return []key.Binding{ + resumeKey, + pauseKey, + helpKey, + agentsKey, + } + } + + m := &model{ + logList: l, + logItems: items, + spinner: spinner, + windowSize: tea.WindowSizeMsg{Width: width, Height: height}, + devModeUrl: config.DevModeUrl, + publicUrl: config.PublicUrl, + appUrl: config.AppUrl, + agents: config.Agents, + } + + m.infoBox = m.generateInfoBox() + + infoBoxHeight := lipgloss.Height(m.infoBox) + available := height - infoBoxHeight - statusMsgHeight + if available < 1 { + available = 1 + } + m.logList.SetHeight(available) + + return m +} + +func (m *model) Init() tea.Cmd { + return tick() +} + +func label(s string) string { + return labelStyle.Render(tui.PadRight(s, 10, " ")) +} + +func (m *model) generateInfoBox() string { + var statusStyle = runningStyle + if m.paused { + statusStyle = pausedStyle + } + var devmodeBox = lipgloss.NewStyle(). + Width(m.windowSize.Width-2). + Border(lipgloss.NormalBorder()). + BorderForeground(logoColor). + Padding(1, 2). + AlignVertical(lipgloss.Top). + AlignHorizontal(lipgloss.Left). + Foreground(labelColor) + + content := fmt.Sprintf(`%s + +%s %s +%s %s +%s %s`, + tui.Bold("⨺ Agentuity DevMode")+" "+statusStyle.Render(tui.PadLeft("⏺", m.windowSize.Width-25, " ")), + label("Dashboard"), tui.Link("%s", m.appUrl), + label("Local"), tui.Link("%s", m.devModeUrl), + label("Public"), tui.Link("%s", m.publicUrl)+" "+tui.Muted("(only accessible in devmode)"), + ) + return devmodeBox.Render(content) +} + +func (m *model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { + var cmd []tea.Cmd + + switch msg := msg.(type) { + case spinnerStartMsg: + m.spinning = true + break + case spinnerStopMsg: + m.spinning = false + break + case spinner.TickMsg: + sm, c := m.spinner.Update(msg) + m.spinner = sm + cmd = append(cmd, c) + break + case tea.KeyMsg: + if msg.Type == tea.KeyCtrlC { + cmd = append(cmd, tea.Quit) + break + } + if msg.Type == tea.KeyEscape { + if m.showhelp { + m.showhelp = false + return m, nil + } + if m.showagents { + m.showagents = false + m.viewport = nil + return m, nil + } + if m.selectedLog != nil { + m.selectedLog = nil + return m, nil + } + } + if msg.Type == tea.KeyEnter && m.selectedLog == nil { + if sel := m.logList.SelectedItem(); sel != nil { + if log, ok := sel.(logItem); ok { + m.selectedLog = &log + break + } + } + } + if msg.Type == tea.KeyRunes { + switch msg.String() { + case "p": + m.paused = true + resumeKey.SetEnabled(true) + pauseKey.SetEnabled(false) + case "r": + m.paused = false + resumeKey.SetEnabled(false) + pauseKey.SetEnabled(true) + case "h": + m.showhelp = true + case "a": + m.showagents = true + } + m.infoBox = m.generateInfoBox() + } + if m.viewport != nil { + vp, vpCmd := m.viewport.Update(msg) + m.viewport = &vp + cmd = append(cmd, vpCmd) + } + case tea.WindowSizeMsg: + m.windowSize = msg + // Calculate the height for the info box + infoBoxHeight := lipgloss.Height(m.infoBox) + available := msg.Height - infoBoxHeight - statusMsgHeight + if available < 1 { + available = 1 + } + m.logList.SetHeight(available) + m.logList.SetWidth(m.windowSize.Width - 2) + break + case tickMsg: + m.infoBox = m.generateInfoBox() + cmd = append(cmd, tick()) + break + case addLogMsg: + m.logItems = append([]list.Item{logItem(msg)}, m.logItems...) + if !m.paused { + if m.logList.FilterState() == list.Unfiltered { + m.logList.SetItems(m.logItems) + } + } + break + case statusMessageMsg: + m.statusMessage = string(msg) + break + } + + var lcmd tea.Cmd + m.logList, lcmd = m.logList.Update(msg) + cmd = append(cmd, lcmd) + return m, tea.Batch(cmd...) +} + +func (m *model) View() string { + + var showModal bool + var modalContent string + modalWidth := m.windowSize.Width / 2 + modalHeight := m.windowSize.Height / 2 + if modalWidth < 40 { + modalWidth = 40 + } + if modalHeight < 10 { + modalHeight = 10 + } + + if m.showhelp { + showModal = true + modalContent = lipgloss.JoinVertical( + lipgloss.Left, + tui.Bold("⨺ Agentuity DevMode"), + "", + tui.Secondary("When your project is running in DevMode, you can interact with it by sending messages to your local agents."), + "", "", + tui.Secondary("The best way to do this is to open the Agentuity console in your browser:"), + "", + tui.Link("%s", m.appUrl), + "", "", + tui.Secondary("You can also use curl or wget to send messages to the local agent."), + "", + tui.Secondary(fmt.Sprintf("To send a message to the local agent %s, use the following command:", m.agents[0].Name)), + "", "", + tui.Highlight(fmt.Sprintf("curl %s --json '{\"message\": \"Hello, world!\"}'", m.agents[0].LocalURL)), + "", + tui.Secondary(fmt.Sprintf("To send a message to the local agent %s from a remote machine, use the following command:", m.agents[0].Name)), + "", "", + tui.Highlight(fmt.Sprintf("curl %s --json '{\"message\": \"Hello, world!\"}'", m.agents[0].PublicURL)), + "", + tui.Muted("Note: The public URL is only accessible in devmode and has no authentication while in devmode. This this URL carefully."), + "", "", + tui.Warning("To get help or share your feedback, join our Discord community:"), + "", + tui.Link("https://discord.gg/vtn3hgUfuc"), + "", + ) + } else if m.selectedLog != nil { + showModal = true + modalContent = string(*m.selectedLog) + } else if m.showagents { + showModal = true + modalContent = "Agents" + var agentsContent string + modalWidth = int(float64(m.windowSize.Width) * 0.9) + for _, agent := range m.agents { + agentsContent += fmt.Sprintf("%s %s\n", tui.PadRight("ID", 10, " "), tui.Muted(agent.ID)) + agentsContent += fmt.Sprintf("%s %s\n", tui.PadRight("Agent", 10, " "), tui.Title(agent.Name)) + agentsContent += fmt.Sprintf("%s %s\n", tui.PadRight("Local", 10, " "), tui.Link("%s", agent.LocalURL)) + agentsContent += fmt.Sprintf("%s %s\n", tui.PadRight("Public", 10, " "), tui.Link("%s", agent.PublicURL)) + agentsContent += "\n" + } + modalContent = agentsContent + } + + if showModal { + modal := lipgloss.NewStyle().Padding(2) + if m.viewport == nil { + vp := viewport.New(m.windowSize.Width, m.windowSize.Height-1) + vp.SetYOffset(1) + m.viewport = &vp + } + m.viewport.SetContent(modal.Render(modalContent)) + m.viewport.Width = m.windowSize.Width + esc := "ESC to close" + pct := fmt.Sprintf("%3.f%%", m.viewport.ScrollPercent()*100) + spacer := m.windowSize.Width - lipgloss.Width(esc) - lipgloss.Width(pct) + 3 + right := lipgloss.NewStyle().AlignHorizontal(lipgloss.Right).Width(spacer).Render(pct) + return m.viewport.View() + "\n" + viewPortHelpStyle.Width(m.windowSize.Width).Render(lipgloss.JoinHorizontal(lipgloss.Left, esc, right)) + } + + var view string + + if m.spinning { + view = m.spinner.View() + " " + } else { + view = " " + } + + return fmt.Sprintf("%s\n%s\n%s", m.infoBox, view+statusMsgStyle.Render(m.statusMessage), m.logList.View()) +} + +type Agent struct { + ID string + Name string + LocalURL string + PublicURL string +} + +type DevModeUI struct { + ctx context.Context + cancel context.CancelFunc + model *model + program *tea.Program + wg sync.WaitGroup + once sync.Once + + spinnerCtx context.Context + spinnerCancel context.CancelFunc +} + +type DevModeConfig struct { + DevModeUrl string + PublicUrl string + AppUrl string + Agents []Agent +} + +func NewDevModeUI(ctx context.Context, config DevModeConfig) *DevModeUI { + ctx, cancel := context.WithCancel(ctx) + return &DevModeUI{ + ctx: ctx, + cancel: cancel, + model: initialModel(config), + } +} + +// Done returns a channel that will be closed when the program is done +func (d *DevModeUI) Done() <-chan struct{} { + return d.ctx.Done() +} + +// Close the program which will stop the program and wait for it to exit +func (d *DevModeUI) Close() { + d.once.Do(func() { + d.program.Send(tea.Quit) + d.cancel() + d.wg.Wait() + }) +} + +// Start the program +func (d *DevModeUI) Start() { + d.program = tea.NewProgram( + d.model, + tea.WithAltScreen(), + tea.WithoutSignalHandler(), + ) + d.wg.Add(1) + go func() { + defer func() { + d.cancel() + d.wg.Done() + }() + _, err := d.program.Run() + if err != nil { + fmt.Printf("Error running program: %v", err) + } + }() +} + +// Add a log message to the log list +func (d *DevModeUI) AddLog(log string, args ...any) { + d.program.Send(addLogMsg(fmt.Sprintf(log, args...))) +} + +func (d *DevModeUI) SetStatusMessage(msg string, args ...any) { + val := fmt.Sprintf(msg, args...) + d.program.Send(statusMessageMsg(val)) + if val != "" { + go func() { + select { + case <-time.After(time.Second * 3): + if val == d.model.statusMessage { + d.program.Send(statusMessageMsg("")) + } + case <-d.ctx.Done(): + return + } + }() + } +} + +func (d *DevModeUI) ShowSpinner(msg string, fn func()) { + d.SetSpinner(true) + d.SetStatusMessage("%s", msg) + fn() + d.SetStatusMessage("") + d.SetSpinner(false) +} + +func (d *DevModeUI) SetSpinner(spinning bool) { + if spinning { + d.program.Send(spinnerStartMsg{}) + ctx, cancel := context.WithCancel(d.ctx) + d.spinnerCtx = ctx + d.spinnerCancel = cancel + go func() { + t := time.NewTicker(time.Millisecond * 200) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + d.program.Send(d.model.spinner.Tick()) + } + } + }() + } else { + d.spinnerCancel() + d.spinnerCtx = nil + d.program.Send(spinnerStopMsg{}) + } +} diff --git a/internal/dev/tui_logger.go b/internal/dev/tui_logger.go new file mode 100644 index 00000000..7cdcfd99 --- /dev/null +++ b/internal/dev/tui_logger.go @@ -0,0 +1,132 @@ +package dev + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "regexp" + "strings" + + "github.com/agentuity/go-common/logger" + "github.com/agentuity/go-common/tui" +) + +type TuiLogger struct { + logLevel logger.LogLevel + ui *DevModeUI +} + +func NewTUILogger(logLevel logger.LogLevel, ui *DevModeUI) *TuiLogger { + return &TuiLogger{logLevel: logLevel, ui: ui} +} + +var _ logger.Logger = (*TuiLogger)(nil) +var _ io.Writer = (*TuiLogger)(nil) + +// With will return a new logger using metadata as the base context +func (l *TuiLogger) With(metadata map[string]interface{}) logger.Logger { + return l +} + +// WithPrefix will return a new logger with a prefix prepended to the message +func (l *TuiLogger) WithPrefix(prefix string) logger.Logger { + return l +} + +// WithContext will return a new logger with the given context +func (l *TuiLogger) WithContext(ctx context.Context) logger.Logger { + return l +} + +// Trace level logging +func (l *TuiLogger) Trace(msg string, args ...interface{}) { + if logger.LevelTrace < l.logLevel { + return + } + val := tui.Muted("[TRACE] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) +} + +// Debug level logging +func (l *TuiLogger) Debug(msg string, args ...interface{}) { + if logger.LevelDebug < l.logLevel { + return + } + val := tui.Muted("[TRACE] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) +} + +// Info level loggi ng +func (l *TuiLogger) Info(msg string, args ...interface{}) { + if logger.LevelInfo < l.logLevel { + return + } + val := tui.Text("[INFO] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) +} + +// Warning level logging +func (l *TuiLogger) Warn(msg string, args ...interface{}) { + if logger.LevelWarn < l.logLevel { + return + } + val := tui.Title("[WARN] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) +} + +// Error level logging +func (l *TuiLogger) Error(msg string, args ...interface{}) { + if logger.LevelError < l.logLevel { + return + } + val := tui.Bold("[ERROR] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) +} + +// Fatal level logging and exit with code 1 +func (l *TuiLogger) Fatal(msg string, args ...interface{}) { + val := tui.Bold("[FATAL] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) + os.Exit(1) +} + +// Stack will return a new logger that logs to the given logger as well as the current logger +func (l *TuiLogger) Stack(next logger.Logger) logger.Logger { + return l +} + +var eol = []byte("\n") +var ansiColorStripper = regexp.MustCompile("\x1b\\[[0-9;]*[mK]") + +func (l *TuiLogger) Write(p []byte) (n int, err error) { + trimmed := bytes.Split(p, eol) + for _, line := range trimmed { + if len(line) == 0 { + continue + } + log := string(line) + if len(log) > 20 { + prefix := ansiColorStripper.ReplaceAllString(log[:20], "") + if logger.LevelTrace < l.logLevel && strings.HasPrefix(prefix, "[TRACE]") { + continue + } + if logger.LevelDebug < l.logLevel && strings.HasPrefix(prefix, "[DEBUG]") { + continue + } + if logger.LevelInfo < l.logLevel && strings.HasPrefix(prefix, "[INFO]") { + continue + } + if logger.LevelWarn < l.logLevel && strings.HasPrefix(prefix, "[WARN]") { + continue + } + if logger.LevelError < l.logLevel && strings.HasPrefix(prefix, "[ERROR]") { + continue + } + continue + } + l.ui.AddLog("%s", log) + } + return len(p), nil +} diff --git a/internal/dev/websocket.go b/internal/dev/websocket.go deleted file mode 100644 index 5d739211..00000000 --- a/internal/dev/websocket.go +++ /dev/null @@ -1,670 +0,0 @@ -package dev - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "net" - "net/http" - "net/url" - "strings" - "time" - - "github.com/agentuity/cli/internal/errsystem" - "github.com/agentuity/cli/internal/project" - "github.com/agentuity/go-common/logger" - "github.com/agentuity/go-common/telemetry" - "github.com/cenkalti/backoff/v4" - "github.com/google/uuid" - "github.com/gorilla/websocket" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/trace" -) - -var propagator propagation.TraceContext - -type Websocket struct { - webSocketId string - conn *websocket.Conn - OtelToken string - OtelUrl string - Project project.ProjectContext - orgId string - done chan struct{} - apiKey string - websocketUrl string - maxRetries int - retryCount int - parentCtx context.Context - ctx context.Context - logger logger.Logger - cleanup func() - tracer trace.Tracer - version string - binaryProtocol bool -} - -type OutputPayload struct { - ContentType string `json:"contentType"` - Payload []byte `json:"payload"` - Trigger string `json:"trigger"` -} - -func isOutputPayload(message []byte) (*OutputPayload, error) { - var op OutputPayload - if err := json.Unmarshal(message, &op); err != nil { - return nil, err - } - return &op, nil -} - -func isContextCanceled(ctx context.Context, err error) bool { - if errors.Is(err, context.Canceled) { - return true - } - select { - case <-ctx.Done(): - return true - default: - return false - } -} - -func (c *Websocket) Done() <-chan struct{} { - return c.done -} - -const maxHealthCheckDuration = time.Second * 30 - -func isConnectionErrorRetryable(err error) bool { - if strings.Contains(err.Error(), "connection refused") { - return true - } - if strings.Contains(err.Error(), "connection reset by peer") { - return true - } - if strings.Contains(err.Error(), "No connection could be made because the target machine actively refused it") { // windows - return true - } - return false -} - -func (c *Websocket) getAgentProtocol(ctx context.Context, port int) (bool, error) { - url := fmt.Sprintf("http://127.0.0.1:%d/_health", port) - started := time.Now() - var i int - for time.Since(started) < maxHealthCheckDuration { - i++ - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return false, err - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - c.logger.Debug("healthcheck attempt #%d failed: %s", i, err) - if isConnectionErrorRetryable(err) { - time.Sleep(time.Millisecond * time.Duration(100*i+1)) - continue - } - return false, err - } - c.logger.Debug("healthcheck attempt #%d succeeded with status code %d", i, resp.StatusCode) - defer resp.Body.Close() - if resp.StatusCode == 200 { - return resp.Header.Get("x-agentuity-binary") == "true", nil - } - } - return false, fmt.Errorf("failed to inspect agents after %s", maxHealthCheckDuration) -} - -func (c *Websocket) getAgentWelcome(ctx context.Context, port int) (map[string]Welcome, error) { - url := fmt.Sprintf("http://127.0.0.1:%d/welcome", port) - for i := 0; i < 5; i++ { - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, err - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - if isConnectionErrorRetryable(err) { - time.Sleep(time.Millisecond * time.Duration(100*i+1)) - continue - } - return nil, err - } - defer resp.Body.Close() - if resp.StatusCode == 404 { - return nil, nil // this is ok, just means no agents have inspect - } - res := make(map[string]Welcome) - if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { - return nil, err - } - return res, nil - } - return nil, fmt.Errorf("failed to inspect agents after 5 attempts") -} - -func (c *Websocket) StartReadingMessages(ctx context.Context, logger logger.Logger, port int) { - var err error - c.binaryProtocol, err = c.getAgentProtocol(ctx, port) - if err != nil { - logger.Fatal("Your project failed to start. This typically happens when the project cannot compile or this is an underlying issue with starting the project.") - return - } - - go func() { - defer close(c.done) - for c.conn != nil { - _, m, err := c.conn.ReadMessage() - if err != nil { - if isContextCanceled(ctx, err) { - logger.Debug("shutdown in progress, exiting") - return - } - if errors.Is(err, websocket.ErrCloseSent) || errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) { - logger.Debug("connection closed") - if c.retryCount < c.maxRetries { - logger.Info("attempting to reconnect, retry %d of %d", c.retryCount+1, c.maxRetries) - if err := c.connect(logger, true); err != nil { - logger.Error("failed to reconnect: %s", err) - c.retryCount++ - continue - } - c.retryCount = 0 - continue - } - return - } - logger.Error("failed to read message: %s", err) - if c.retryCount < c.maxRetries { - logger.Info("attempting to reconnect, retry %d of %d", c.retryCount+1, c.maxRetries) - if err := c.connect(logger, true); err != nil { - logger.Error("failed to reconnect: %s", err) - c.retryCount++ - continue - } - c.retryCount = 0 - continue - } - return - } - // Reset retry count on successful message - c.retryCount = 0 - - logger.Trace("recv: %s", string(m)) - - var message Message - if err := json.Unmarshal(m, &message); err != nil { - logger.Error("failed to unmarshal agent message: %s", err) - return - } - - if message.Type == "input" { - var inputMsg InputMessage - if err := json.Unmarshal(m, &inputMsg); err != nil { - logger.Error("failed to unmarshal agent message: %s", err) - return - } - processInputMessage(logger, c, m, port) - } - if message.Type == "getAgents" { - agents := make([]Agent, 0) - for _, agent := range c.Project.Project.Agents { - agents = append(agents, Agent{ - Name: agent.Name, - ID: agent.ID, - Description: agent.Description, - }) - } - welcome, err := c.getAgentWelcome(ctx, port) - if err != nil { - logger.Error("failed to inspect agents: %s", err) - continue - } - for i, agent := range agents { - if val, ok := welcome[agent.ID]; ok { - agents[i].Welcome = &val - } - } - agentsMessage := NewAgentsMessage(c.webSocketId, c.Project.Project.ProjectId, AgentsPayload{ - ProjectID: c.Project.Project.ProjectId, - ProjectName: c.Project.Project.Name, - Agents: agents, - }) - - c.SendMessage(logger, agentsMessage) - } - } - }() -} - -func (c *Websocket) connect(logger logger.Logger, close bool) error { - if close { - // Close existing connection if it exists - if c.cleanup != nil { - c.cleanup() - } - if c.conn != nil { - c.conn.Close() - } - } - - u, err := url.Parse(c.websocketUrl) - if err != nil { - return fmt.Errorf("failed to parse url: %s", err) - } - u.Path = fmt.Sprintf("/websocket/devmode/%s", c.webSocketId) - u.RawQuery = url.Values{ - "from": []string{"cli"}, - "projectId": []string{c.Project.Project.ProjectId}, - }.Encode() - - if u.Scheme == "http" { - u.Scheme = "ws" - } else if u.Scheme == "https" { - u.Scheme = "wss" - } - - urlString := u.String() - headers := http.Header{} - headers.Set("Authorization", fmt.Sprintf("Bearer %s", c.apiKey)) - - // connect to the websocket - logger.Trace("connecting to %s", urlString) - var httpResponse *http.Response - c.conn, httpResponse, err = websocket.DefaultDialer.Dial(urlString, headers) - if err != nil { - if httpResponse != nil { - if httpResponse.StatusCode == 401 { - logger.Error("invalid api key") - } - } - return fmt.Errorf("failed to dial: %s", err) - } - - // get the otel token and url from the headers - c.OtelToken = httpResponse.Header.Get("X-AGENTUITY-OTLP-BEARER-TOKEN") - if c.OtelToken == "" { - return errsystem.New(errsystem.ErrAuthenticateOtelServer, nil, errsystem.WithUserMessage("Failed to authenticate with otel server")) - } - c.OtelUrl = httpResponse.Header.Get("X-AGENTUITY-OTLP-URL") - if c.OtelUrl == "" { - return errsystem.New(errsystem.ErrAuthenticateOtelServer, nil, errsystem.WithUserMessage("Failed to get otel server url")) - } - - c.ctx, c.logger, c.cleanup, err = telemetry.NewWithAPIKey(c.parentCtx, "@agentuity/cli", c.OtelUrl, c.OtelToken, logger) - if err != nil { - return fmt.Errorf("failed to create OTLP telemetry trace: %w", err) - } - - logger.Debug("successfully connected") - return nil -} - -type WebsocketArgs struct { - Ctx context.Context - Logger logger.Logger - WebsocketId string - WebsocketUrl string - APIKey string - Project project.ProjectContext - OrgId string - Version string -} - -func NewWebsocket(args WebsocketArgs) (*Websocket, error) { - tracer := otel.Tracer("@agentuity/cli", trace.WithInstrumentationVersion(args.Version)) - ws := Websocket{ - parentCtx: args.Ctx, - webSocketId: args.WebsocketId, - Project: args.Project, - done: make(chan struct{}), - apiKey: args.APIKey, - websocketUrl: args.WebsocketUrl, - maxRetries: 5, - retryCount: 0, - tracer: tracer, - orgId: args.OrgId, - version: args.Version, - } - u, err := url.Parse(args.WebsocketUrl) - if err != nil { - return nil, fmt.Errorf("failed to parse url: %s", err) - } - u.Path = fmt.Sprintf("/websocket/devmode/%s", args.WebsocketId) - u.RawQuery = url.Values{ - "from": []string{"cli"}, - }.Encode() - - if u.Scheme == "http" { - u.Scheme = "ws" - } else if u.Scheme == "https" { - u.Scheme = "wss" - } - - if err := ws.connect(args.Logger, false); err != nil { - return nil, err - } - - return &ws, nil -} - -// Update SendMessage to accept the MessageType interface -func (c *Websocket) SendMessage(logger logger.Logger, msg Message) error { - buf, err := json.Marshal(msg) - if err != nil { - return err - } - if c.conn == nil { - return nil - } - if err := c.conn.WriteMessage(websocket.TextMessage, buf); err != nil { - return err - } - return nil -} - -func (c *Websocket) Close() error { - c.SendMessage(c.logger, NewCloseMessage(uuid.New().String(), c.Project.Project.ProjectId)) - if c.conn != nil { - closeHandler := c.conn.CloseHandler() - if err := closeHandler(1000, "User requested shutdown"); err != nil { - c.logger.Error("failed to close connection: %s", err) - return err - } - } - if c.cleanup != nil { - c.cleanup() - c.cleanup = nil - } - return nil -} - -func (c *Websocket) WebURL(appUrl string) string { - return fmt.Sprintf("%s/devmode/%s", appUrl, c.webSocketId) -} - -type Message struct { - ID string `json:"id"` - Type string `json:"type"` - Payload map[string]any `json:"payload"` - ProjectId string `json:"projectId"` -} - -// messages send by server to CLI -type InputMessage struct { - ID string `json:"id"` - Type string `json:"type"` - From string `json:"from"` - Payload struct { - SessionID string `json:"sessionId"` - Trigger string `json:"trigger"` - AgentID string `json:"agentId"` - ContentType string `json:"contentType"` - Payload []byte `json:"payload"` - } `json:"payload"` -} - -// messages send by CLI to the server -func NewOutputMessage(id string, projectId string, payload struct { - ContentType string `json:"contentType"` - Payload []byte `json:"payload"` - Trigger string `json:"trigger"` -}) Message { - payloadMap := map[string]any{ - "contentType": payload.ContentType, - "payload": payload.Payload, - "trigger": payload.Trigger, - } - return Message{ - ID: id, - Type: "output", - Payload: payloadMap, - ProjectId: projectId, - } - -} - -func NewCloseMessage(id string, projectId string) Message { - payloadMap := map[string]any{} - return Message{ - ID: id, - Type: "close", - Payload: payloadMap, - ProjectId: projectId, - } -} - -type Welcome struct { - Message string `json:"welcome"` - Prompts []struct { - Data string `json:"data"` - ContentType string `json:"contentType"` - } `json:"prompts,omitempty"` -} - -type Agent struct { - Name string `json:"name"` - ID string `json:"id"` - Description string `json:"description"` - Welcome *Welcome `json:"welcome,omitempty"` -} - -type AgentsPayload struct { - Agents []Agent `json:"agents"` - ProjectID string `json:"projectId"` - ProjectName string `json:"projectName"` -} - -func NewAgentsMessage(id string, projectId string, payload AgentsPayload) Message { - payloadMap := map[string]any{ - "agents": payload.Agents, - "projectId": projectId, - "projectName": payload.ProjectName, - } - - return Message{ - ID: id, - Type: "agents", - ProjectId: projectId, - Payload: payloadMap, - } -} - -func processInputMessage(plogger logger.Logger, c *Websocket, m []byte, port int) { - started := time.Now() - ctx, logger, span := telemetry.StartSpan(c.ctx, plogger, c.tracer, "TriggerRun", - trace.WithAttributes( - attribute.String("@agentuity/devmode", "true"), - attribute.String("trigger", "manual"), - attribute.String("@agentuity/deploymentId", c.webSocketId), - ), - trace.WithSpanKind(trace.SpanKindConsumer), - ) - defer span.End() - - var inputMsg InputMessage - var outputMessage *Message - var err error - - defer func() { - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - msg := NewOutputMessage(inputMsg.ID, c.Project.Project.ProjectId, OutputPayload{ - ContentType: "text/plain", - Payload: []byte(err.Error()), - Trigger: "", - }) - outputMessage = &msg - } else { - span.SetStatus(codes.Ok, "") - } - if outputMessage != nil { - c.SendMessage(plogger, *outputMessage) - } - span.SetAttributes( - attribute.Int64("@agentuity/cpu_time", time.Since(started).Milliseconds()), - ) - plogger.Info("processed sess_%s in %s", span.SpanContext().TraceID(), time.Since(started)) - }() - - if lerr := json.Unmarshal(m, &inputMsg); lerr != nil { - logger.Error("failed to unmarshal agent message: %s", lerr) - err = lerr - return - } - - span.SetAttributes( - attribute.String("@agentuity/agentId", inputMsg.Payload.AgentID), - attribute.String("@agentuity/orgId", c.orgId), - attribute.String("@agentuity/projectId", c.Project.Project.ProjectId), - attribute.String("@agentuity/env", "development"), - ) - - spanContext := span.SpanContext() - traceState := spanContext.TraceState() - traceState, err = traceState.Insert("id", inputMsg.Payload.AgentID) - if err != nil { - logger.Error("failed to insert agent id into trace state: %s", err) - err = fmt.Errorf("failed to insert agent id into trace state: %w", err) - return - } - traceState, err = traceState.Insert("oid", c.orgId) - if err != nil { - logger.Error("failed to insert org id into trace state: %s", err) - err = fmt.Errorf("failed to insert org id into trace state: %w", err) - return - } - traceState, err = traceState.Insert("pid", c.Project.Project.ProjectId) - if err != nil { - logger.Error("failed to insert project id into trace state: %s", err) - err = fmt.Errorf("failed to insert project id into trace state: %w", err) - return - } - ctx = trace.ContextWithSpanContext(ctx, spanContext.WithTraceState(traceState)) - - c.Project.Logger.Debug("input message: %+v", inputMsg) - - if c.Project.Project.Development == nil { - logger.Error("development is not enabled for this project") - err = errors.New("development is not enabled for this project") - return - } - - var inputPayload []byte - url := fmt.Sprintf("http://127.0.0.1:%d/%s", port, inputMsg.Payload.AgentID) - contentType := "application/json" - - if !c.binaryProtocol { - // TODO: remove this once were all off the old protocol - // make a json object with the payload - payload := map[string]any{ - "contentType": inputMsg.Payload.ContentType, - "payload": inputMsg.Payload.Payload, - "trigger": "manual", - } - - var lerr error - inputPayload, lerr = json.Marshal(payload) - if lerr != nil { - logger.Error("failed to marshal payload: %s", lerr) - err = fmt.Errorf("failed to marshal payload: %w", lerr) - return - } - } else { - contentType = inputMsg.Payload.ContentType - inputPayload = inputMsg.Payload.Payload - } - logger.Debug("sending payload: %s to %s", string(inputPayload), url) - - req, lerr := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(inputPayload)) - if lerr != nil { - logger.Error("failed to create request: %s", lerr) - err = fmt.Errorf("failed to create HTTP request: %w", lerr) - return - } - req.Header.Set("Content-Type", contentType) - req.Header.Set("User-Agent", "Agentuity CLI/"+c.version) - propagator.Inject(ctx, propagation.HeaderCarrier(req.Header)) - - logger.Debug("sending request to %s with trace id: %s", url, spanContext.TraceID()) - - expBackoff := backoff.NewExponentialBackOff() - expBackoff.InitialInterval = 500 * time.Millisecond - expBackoff.MaxInterval = 5 * time.Second - expBackoff.MaxElapsedTime = 30 * time.Second // Max total time as requested - expBackoff.Multiplier = 2.0 - expBackoff.RandomizationFactor = 0.3 // Add jitter - - var resp *http.Response - operation := func() error { - var err error - resp, err = http.DefaultClient.Do(req) - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Timeout() { - logger.Warn("connection timeout to agent, retrying...") - return err - } - if strings.Contains(err.Error(), "connection refused") { - logger.Warn("connection refused to agent, retrying...") - return err - } - logger.Error("failed to post to agent: %s", err) - return backoff.Permanent(err) - } - return nil - } - - err = backoff.Retry(operation, expBackoff) - if err != nil { - logger.Error("all attempts to post to agent failed: %s", err) - err = fmt.Errorf("failed to post to agent: %w", err) - return - } - defer resp.Body.Close() - - body, lerr := io.ReadAll(resp.Body) - if lerr != nil { - logger.Error("failed to read response body: %s", lerr) - err = fmt.Errorf("failed to read response body: %w", lerr) - return - } - - if resp.StatusCode != 200 { - err = fmt.Errorf("the Agent produced an error: %s", string(body)) - return - } - - logger.Debug("response: %s (status code: %d)", string(body), resp.StatusCode) - - var trigger string - if c.binaryProtocol { - trigger = resp.Header.Get("x-agentuity-trigger") - contentType = resp.Header.Get("content-type") - } else { - // TODO: remove this once were all off the old protocol - output, lerr := isOutputPayload(body) - if lerr != nil { - err = fmt.Errorf("the Agent produced an error") - return - } - trigger = output.Trigger - contentType = output.ContentType - body = output.Payload - } - - msg := NewOutputMessage(inputMsg.ID, c.Project.Project.ProjectId, OutputPayload{ - ContentType: contentType, - Payload: body, - Trigger: trigger, - }) - outputMessage = &msg -} From 9512574e93691fca2457ab4accf328d8e42178b1 Mon Sep 17 00:00:00 2001 From: Jeff Haynie Date: Sun, 4 May 2025 22:28:15 -0500 Subject: [PATCH 2/8] more fixes --- cmd/bundle.go | 1 + cmd/dev.go | 36 ++++++++++++++++++------------------ go.mod | 2 +- go.sum | 6 ++++-- internal/bundler/bundler.go | 6 ++++-- internal/dev/server.go | 30 ++++++++++++++++++++++++++++-- internal/dev/tui.go | 24 +++++++++++++++--------- internal/dev/tui_logger.go | 1 - 8 files changed, 71 insertions(+), 35 deletions(-) diff --git a/cmd/bundle.go b/cmd/bundle.go index e9b54e98..a1190fb3 100644 --- a/cmd/bundle.go +++ b/cmd/bundle.go @@ -50,6 +50,7 @@ Examples: Production: production, Install: install, CI: ci, + Writer: os.Stderr, }); err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage("Failed to bundle project")).ShowErrorAndExit() } diff --git a/cmd/dev.go b/cmd/dev.go index 2a6f7f91..9b303caa 100644 --- a/cmd/dev.go +++ b/cmd/dev.go @@ -3,6 +3,7 @@ package cmd import ( "context" "fmt" + "io" "os" "os/signal" "path/filepath" @@ -21,7 +22,6 @@ import ( "github.com/bep/debounce" "github.com/spf13/cobra" "github.com/spf13/viper" - "golang.org/x/term" ) var devCmd = &cobra.Command{ @@ -42,13 +42,6 @@ Examples: agentuity dev agentuity dev --dir /path/to/project`, Run: func(cmd *cobra.Command, args []string) { - fd := int(os.Stdin.Fd()) - oldState, err := term.GetState(fd) - if err != nil { - panic(err) - } - defer term.Restore(fd, oldState) - log := env.NewLogger(cmd) logLevel := env.LogLevel(cmd) apiUrl, appUrl, transportUrl := util.GetURLs(log) @@ -201,13 +194,14 @@ Examples: ui.Start() - defer ui.Close() + defer ui.Close(false) tuiLogger := dev.NewTUILogger(logLevel, ui) if err := server.Connect(ui, tuiLogger); err != nil { - ui.Close() - tuiLogger.Fatal("failed to start live dev connection: %s", err) + tuiLogger.Error("failed to start live dev connection: %s", err) + ui.Close(true) + return } projectServerCmd, err := dev.CreateRunProjectCmd(processCtx, tuiLogger, theproject, server, dir, orgId, port, tuiLogger) @@ -215,19 +209,20 @@ Examples: errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage("Failed to run project")).ShowErrorAndExit() } - build := func(initial bool) { + build := func(initial bool) bool { started := time.Now() var ok bool ui.ShowSpinner("Building project ...", func() { + var w io.Writer = tuiLogger if err := bundler.Bundle(bundler.BundleContext{ Context: ctx, Logger: tuiLogger, ProjectDir: dir, Production: false, - DevMode: !initial, + DevMode: true, + Writer: w, }); err != nil { if err == bundler.ErrBuildFailed { - tuiLogger.Error("build failed ...") return } errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage(fmt.Sprintf("Failed to bundle project: %s", err))).ShowErrorAndExit() @@ -237,10 +232,14 @@ Examples: if ok && !initial { ui.SetStatusMessage("✨ Built in %s", time.Since(started).Round(time.Millisecond)) } + return ok } - // Initial build - build(true) + // Initial build must exit if it fails + if !build(true) { + ui.Close(true) + return + } restart := func() { isDeliberateRestart = true @@ -273,8 +272,9 @@ Examples: pid = projectServerCmd.Process.Pid if err := server.HealthCheck(devModeUrl); err != nil { - ui.Close() - tuiLogger.Fatal("failed to health check connection: %s", err) + tuiLogger.Error("failed to health check connection: %s", err) + ui.Close(true) + return } ui.SetStatusMessage("🚀 DevMode ready") diff --git a/go.mod b/go.mod index a7762fa0..25d704ba 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.2 require ( github.com/Masterminds/semver v1.5.0 - github.com/agentuity/go-common v1.0.45 + github.com/agentuity/go-common v1.0.46 github.com/agentuity/mcp-golang/v2 v2.0.2 github.com/bep/debounce v1.2.1 github.com/bmatcuk/doublestar/v4 v4.8.1 diff --git a/go.sum b/go.sum index 2f543628..c510c79a 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/ProtonMail/go-crypto v1.1.5 h1:eoAQfK2dwL+tFSFpr7TbOaPNUbPiJj4fLYwwGE1FQO4= github.com/ProtonMail/go-crypto v1.1.5/go.mod h1:rA3QumHc/FZ8pAHreoekgiAbzpNsfQAosU5td4SnOrE= -github.com/agentuity/go-common v1.0.45 h1:kjpiIIG3Fmbjne8wtd4lCFikAdbfGal5rqXAeTTPD6I= -github.com/agentuity/go-common v1.0.45/go.mod h1:cy1EPYpZUkp3JSMgTb+Sa3sLnS7vQQupj/RwO4An6L4= +github.com/agentuity/go-common v1.0.46 h1:wum2aGq6hPr1TCOGOI77mlkwx4tv1f2AmdSEm8xrsO0= +github.com/agentuity/go-common v1.0.46/go.mod h1:cy1EPYpZUkp3JSMgTb+Sa3sLnS7vQQupj/RwO4An6L4= github.com/agentuity/mcp-golang/v2 v2.0.2 h1:wZqS/aHWZsQoU/nd1E1/iMsVY2dywWT9+PFlf+3YJxo= github.com/agentuity/mcp-golang/v2 v2.0.2/go.mod h1:U105tZXyTatxxOBlcObRgLb/ULvGgT2DJ1nq/8++P6Q= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= @@ -150,6 +150,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= diff --git a/internal/bundler/bundler.go b/internal/bundler/bundler.go index 045b557b..5a0e810b 100644 --- a/internal/bundler/bundler.go +++ b/internal/bundler/bundler.go @@ -3,6 +3,7 @@ package bundler import ( "context" "fmt" + "io" "os" "os/exec" "path/filepath" @@ -38,6 +39,7 @@ type BundleContext struct { Install bool CI bool DevMode bool + Writer io.Writer } func bundleJavascript(ctx BundleContext, dir string, outdir string, theproject *project.Project) error { @@ -156,11 +158,11 @@ func bundleJavascript(ctx BundleContext, dir string, outdir string, theproject * }) ctx.Logger.Debug("finished build in %v", time.Since(started)) if len(result.Errors) > 0 { - fmt.Println("\n" + tui.Warning("Build Failed") + "\n") + fmt.Fprintln(ctx.Writer, "\n"+tui.Warning("Build Failed")+"\n") for _, err := range result.Errors { formattedError := formatESBuildError(dir, err) - fmt.Println(formattedError) + fmt.Fprintln(ctx.Writer, formattedError) } if ctx.DevMode { diff --git a/internal/dev/server.go b/internal/dev/server.go index 640ea52b..7ec6987b 100644 --- a/internal/dev/server.go +++ b/internal/dev/server.go @@ -2,6 +2,7 @@ package dev import ( "context" + "encoding/json" "errors" "fmt" "math" @@ -88,7 +89,10 @@ func (s *Server) Close() error { } } s.bridge.Close() - s.cleanup() + if s.cleanup != nil { + s.cleanup() + s.cleanup = nil + } }) return nil } @@ -202,10 +206,32 @@ func (s *Server) OnError(client *bridge.Client, err error) { s.logger.Error("an error occurred: %s", err) } +type ControlMessage struct { + Action string `json:"action"` +} + +type AgentsControlResponse struct { + ProjectID string `json:"projectId"` + ProjectName string `json:"projectName"` + Agents []project.AgentConfig `json:"agents"` +} + // OnControl is called when a control event is received from the bridge. you can respond with a control event to the bridge by returning a non-nil value. func (s *Server) OnControl(client *bridge.Client, id string, data []byte) ([]byte, error) { s.logger.Debug("on control: id: %s, data: %s", id, string(data)) - return nil, nil + var message ControlMessage + if err := json.Unmarshal(data, &message); err != nil { + return nil, fmt.Errorf("failed to unmarshal control message: %w", err) + } + switch message.Action { + case "getAgents": + return json.Marshal(AgentsControlResponse{ + ProjectID: s.Project.Project.ProjectId, + ProjectName: s.Project.Project.Name, + Agents: s.Project.Project.Agents, + }) + } + return nil, fmt.Errorf("unknown control action: %s", message.Action) } // OnRefresh is called when the bridge client has refreshed its connection diff --git a/internal/dev/tui.go b/internal/dev/tui.go index e0cabec0..e29e670b 100644 --- a/internal/dev/tui.go +++ b/internal/dev/tui.go @@ -363,10 +363,11 @@ func (m *model) View() string { } type Agent struct { - ID string - Name string - LocalURL string - PublicURL string + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + LocalURL string `json:"local_url,omitempty"` + PublicURL string `json:"public_url,omitempty"` } type DevModeUI struct { @@ -379,6 +380,7 @@ type DevModeUI struct { spinnerCtx context.Context spinnerCancel context.CancelFunc + aborting bool } type DevModeConfig struct { @@ -403,11 +405,10 @@ func (d *DevModeUI) Done() <-chan struct{} { } // Close the program which will stop the program and wait for it to exit -func (d *DevModeUI) Close() { +func (d *DevModeUI) Close(abort bool) { d.once.Do(func() { - d.program.Send(tea.Quit) - d.cancel() - d.wg.Wait() + d.aborting = abort + d.program.Quit() }) } @@ -423,10 +424,15 @@ func (d *DevModeUI) Start() { defer func() { d.cancel() d.wg.Done() + if d.aborting { + for i := len(d.model.logItems) - 1; i >= 0; i-- { + fmt.Println(d.model.logItems[i]) + } + } }() _, err := d.program.Run() if err != nil { - fmt.Printf("Error running program: %v", err) + fmt.Printf("Error running program: %v\n", err) } }() } diff --git a/internal/dev/tui_logger.go b/internal/dev/tui_logger.go index 7cdcfd99..f0f52eeb 100644 --- a/internal/dev/tui_logger.go +++ b/internal/dev/tui_logger.go @@ -124,7 +124,6 @@ func (l *TuiLogger) Write(p []byte) (n int, err error) { if logger.LevelError < l.logLevel && strings.HasPrefix(prefix, "[ERROR]") { continue } - continue } l.ui.AddLog("%s", log) } From b350793afa3c8df5d9b9720bbdc94d79e81058cc Mon Sep 17 00:00:00 2001 From: Jeff Haynie Date: Wed, 14 May 2025 21:00:33 -0500 Subject: [PATCH 3/8] refactor to use echo --- cmd/dev.go | 98 ++------- go.mod | 3 +- go.sum | 6 +- internal/dev/server.go | 436 ++++++++++++++++++++++------------------- internal/dev/tui.go | 15 +- 5 files changed, 272 insertions(+), 286 deletions(-) diff --git a/cmd/dev.go b/cmd/dev.go index 9b303caa..02777d94 100644 --- a/cmd/dev.go +++ b/cmd/dev.go @@ -6,7 +6,6 @@ import ( "io" "os" "os/signal" - "path/filepath" "runtime" "syscall" "time" @@ -16,12 +15,10 @@ import ( "github.com/agentuity/cli/internal/errsystem" "github.com/agentuity/cli/internal/project" "github.com/agentuity/cli/internal/util" - "github.com/agentuity/go-common/bridge" "github.com/agentuity/go-common/env" "github.com/agentuity/go-common/tui" "github.com/bep/debounce" "github.com/spf13/cobra" - "github.com/spf13/viper" ) var devCmd = &cobra.Command{ @@ -74,27 +71,6 @@ Examples: errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithUserMessage("Failed to validate project (%s) using the provided API key from the .env file in %s. This is most likely due to the API key being invalid or the project has been deleted.\n\nYou can import this project using the following command:\n\n"+tui.Command("project import"), theproject.Project.ProjectId, dir), errsystem.WithContextMessage(fmt.Sprintf("Failed to get project: %s", err))).ShowErrorAndExit() } - projectToken := os.Getenv("AGENTUITY_API_KEY") - if projectToken == "" { - envFile := filepath.Join(dir, ".env") - if util.Exists(envFile) { - envs, err := env.ParseEnvFile(envFile) - if err != nil { - log.Fatal("failed to parse .env file: %s", err) - } - for _, kv := range envs { - if kv.Key == "AGENTUITY_API_KEY" { - projectToken = kv.Val - break - } - } - } - } - - if projectToken == "" { - log.Fatal("failed to find AGENTUITY_API_KEY in .env file or system environment variable") - } - orgId := project.OrgId port, _ := cmd.Flags().GetInt("port") @@ -105,48 +81,7 @@ Examples: } } - var connection *bridge.BridgeConnectionInfo - - settings := viper.Get("devmode." + orgId) - if val, ok := settings.(map[string]any); ok { - connection = &bridge.BridgeConnectionInfo{} - for k, v := range val { - switch k { - case "expires_at": - if val, ok := v.(string); ok { - expiresAt, err := time.Parse(time.RFC3339, val) - if err != nil { - log.Fatal("failed to parse expires_at: %s", err) - } - connection.ExpiresAt = &expiresAt - } - case "websocket_url": - if val, ok := v.(string); ok { - connection.WebsocketURL = val - } - case "stream_url": - if val, ok := v.(string); ok { - connection.StreamURL = val - } - case "client_url": - if val, ok := v.(string); ok { - connection.ClientURL = val - } - case "replies_url": - if val, ok := v.(string); ok { - connection.RepliesURL = val - } - case "refresh_url": - if val, ok := v.(string); ok { - connection.RefreshURL = val - } - case "control_url": - if val, ok := v.(string); ok { - connection.ControlURL = val - } - } - } - } + serverAddr, _ := cmd.Flags().GetString("server") server, err := dev.New(dev.ServerArgs{ Ctx: ctx, @@ -155,13 +90,12 @@ Examples: APIURL: apiUrl, TransportURL: transportUrl, APIKey: apiKey, - ProjectToken: projectToken, + OrgId: orgId, Project: theproject, Version: Version, - OrgId: orgId, UserId: userId, - Connection: connection, Port: port, + ServerAddr: serverAddr, }) if err != nil { log.Fatal("failed to create live dev connection: %s", err) @@ -172,22 +106,19 @@ Examples: var pid int consoleUrl := server.WebURL(appUrl) - publicUrl := server.PublicURL(appUrl) devModeUrl := fmt.Sprintf("http://127.0.0.1:%d", port) - agents := make([]dev.Agent, 0) + agents := make([]*dev.Agent, 0) for _, agent := range theproject.Project.Agents { - agents = append(agents, dev.Agent{ - ID: agent.ID, - Name: agent.Name, - LocalURL: fmt.Sprintf("%s/%s", devModeUrl, agent.ID), - PublicURL: fmt.Sprintf("%s/%s", publicUrl, agent.ID), + agents = append(agents, &dev.Agent{ + ID: agent.ID, + Name: agent.Name, + LocalURL: fmt.Sprintf("%s/%s", devModeUrl, agent.ID), }) } ui := dev.NewDevModeUI(ctx, dev.DevModeConfig{ DevModeUrl: devModeUrl, - PublicUrl: publicUrl, AppUrl: consoleUrl, Agents: agents, }) @@ -199,11 +130,18 @@ Examples: tuiLogger := dev.NewTUILogger(logLevel, ui) if err := server.Connect(ui, tuiLogger); err != nil { - tuiLogger.Error("failed to start live dev connection: %s", err) + log.Error("failed to start live dev connection: %s", err) ui.Close(true) return } + publicUrl := server.PublicURL(appUrl) + ui.SetPublicURL(publicUrl) + + for _, agent := range agents { + agent.PublicURL = fmt.Sprintf("%s/%s", publicUrl, agent.ID) + } + projectServerCmd, err := dev.CreateRunProjectCmd(processCtx, tuiLogger, theproject, server, dir, orgId, port, tuiLogger) if err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage("Failed to run project")).ShowErrorAndExit() @@ -333,7 +271,7 @@ Examples: func init() { rootCmd.AddCommand(devCmd) devCmd.Flags().StringP("dir", "d", ".", "The directory to run the development server in") - devCmd.Flags().String("org-id", "", "The organization to run the project") devCmd.Flags().Int("port", 0, "The port to run the development server on (uses project default if not provided)") - devCmd.Flags().MarkHidden("org-id") + devCmd.Flags().String("server", "echo.agentuity.cloud:12001", "the echo server to connect to") + devCmd.Flags().MarkHidden("server") } diff --git a/go.mod b/go.mod index 25d704ba..5536fe87 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.2 require ( github.com/Masterminds/semver v1.5.0 - github.com/agentuity/go-common v1.0.46 + github.com/agentuity/go-common v1.0.55 github.com/agentuity/mcp-golang/v2 v2.0.2 github.com/bep/debounce v1.2.1 github.com/bmatcuk/doublestar/v4 v4.8.1 @@ -22,6 +22,7 @@ require ( github.com/spf13/cobra v1.9.1 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.10.0 + github.com/xtaci/smux v1.5.34 github.com/zijiren233/yaml-comment v0.2.2 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 golang.org/x/term v0.30.0 diff --git a/go.sum b/go.sum index c510c79a..e0ded5f9 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/ProtonMail/go-crypto v1.1.5 h1:eoAQfK2dwL+tFSFpr7TbOaPNUbPiJj4fLYwwGE1FQO4= github.com/ProtonMail/go-crypto v1.1.5/go.mod h1:rA3QumHc/FZ8pAHreoekgiAbzpNsfQAosU5td4SnOrE= -github.com/agentuity/go-common v1.0.46 h1:wum2aGq6hPr1TCOGOI77mlkwx4tv1f2AmdSEm8xrsO0= -github.com/agentuity/go-common v1.0.46/go.mod h1:cy1EPYpZUkp3JSMgTb+Sa3sLnS7vQQupj/RwO4An6L4= +github.com/agentuity/go-common v1.0.55 h1:ZaNh200F9TW0apb9UIhvfNDk76wAFAw5ufIU9VW3weU= +github.com/agentuity/go-common v1.0.55/go.mod h1:cy1EPYpZUkp3JSMgTb+Sa3sLnS7vQQupj/RwO4An6L4= github.com/agentuity/mcp-golang/v2 v2.0.2 h1:wZqS/aHWZsQoU/nd1E1/iMsVY2dywWT9+PFlf+3YJxo= github.com/agentuity/mcp-golang/v2 v2.0.2/go.mod h1:U105tZXyTatxxOBlcObRgLb/ULvGgT2DJ1nq/8++P6Q= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= @@ -266,6 +266,8 @@ github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8 github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= +github.com/xtaci/smux v1.5.34 h1:OUA9JaDFHJDT8ZT3ebwLWPAgEfE6sWo2LaTy3anXqwg= +github.com/xtaci/smux v1.5.34/go.mod h1:OMlQbT5vcgl2gb49mFkYo6SMf+zP3rcjcwQz7ZU7IGY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/zijiren233/yaml-comment v0.2.2 h1:5ghs8huXFVb/kWCi66P+xbXq0GnOE2XVCnhaWd7mTs8= diff --git a/internal/dev/server.go b/internal/dev/server.go index 7ec6987b..05fc008e 100644 --- a/internal/dev/server.go +++ b/internal/dev/server.go @@ -1,52 +1,60 @@ package dev import ( + "bufio" + "bytes" "context" + "crypto/tls" "encoding/json" "errors" "fmt" + "io" "math" + "net" "net/http" + "strings" "sync" "time" "github.com/agentuity/cli/internal/project" "github.com/agentuity/cli/internal/util" - "github.com/agentuity/go-common/bridge" "github.com/agentuity/go-common/logger" cstr "github.com/agentuity/go-common/string" - "github.com/agentuity/go-common/telemetry" - "github.com/spf13/viper" + "github.com/xtaci/smux" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) type Server struct { - ID string - otelToken string - otelUrl string - Project project.ProjectContext - orgId string - userId string - apiurl string - transportUrl string - apiKey string - ctx context.Context - logger logger.Logger - tracer trace.Tracer - version string - bridge *bridge.Client - once sync.Once - apiclient *util.APIClient - registered bool - publicUrl string - port int - connected chan struct{} - pendingLogger *PendingLogger - pending map[string]*AgentRequest - pendingMu sync.RWMutex - cleanup func() + ID string + otelToken string + otelUrl string + Project project.ProjectContext + orgId string + userId string + apiurl string + transportUrl string + apiKey string + ctx context.Context + cancel context.CancelFunc + logger logger.Logger + tracer trace.Tracer + version string + once sync.Once + apiclient *util.APIClient + publicUrl string + port int + connected chan string + pendingLogger logger.Logger + pending map[string]*AgentRequest + pendingMu sync.RWMutex + expiresAt *time.Time + tlsCertificate *tls.Certificate + conn *tls.Conn + session *smux.Session + wg sync.WaitGroup + serverAddr string } type ServerArgs struct { @@ -56,217 +64,230 @@ type ServerArgs struct { APIURL string TransportURL string APIKey string - ProjectToken string Project project.ProjectContext OrgId string UserId string Version string - Connection *bridge.BridgeConnectionInfo Port int + ServerAddr string } -var _ bridge.Handler = (*Server)(nil) - -func (c *Server) WebURL(appUrl string) string { - return fmt.Sprintf("%s/devmode/%s", appUrl, c.ID) -} - -func (c *Server) PublicURL(appUrl string) string { - return c.publicUrl -} - -func (s *Server) AgentURL(agentId string) string { - return fmt.Sprintf("http://127.0.0.1:%d/%s", s.port, agentId) +type ConnectionResponse struct { + Success bool `json:"success"` + Error string `json:"message"` + Data struct { + Certificate string `json:"certificate"` + PrivateKey string `json:"private_key"` + Domain string `json:"domain"` + ExpiresAt string `json:"expires_at"` + OtelUrl string `json:"otlp_url"` + OtelBearerToken string `json:"otlp_token"` + } `json:"data"` } // Close closes the bridge client and cleans up the connection func (s *Server) Close() error { - s.logger.Debug("closing bridge client") + s.logger.Debug("closing connection") s.once.Do(func() { - if s.registered { - if err := s.apiclient.Do("DELETE", "/cli/devmode/"+s.ID, map[string]string{"orgId": s.orgId}, nil); err != nil { - s.logger.Error("failed to unregister devmode connection: %s", err) - } + s.cancel() + if s.session != nil { + s.session.Close() + s.session = nil } - s.bridge.Close() - if s.cleanup != nil { - s.cleanup() - s.cleanup = nil + if s.conn != nil { + s.conn.Close() + s.conn = nil } + s.wg.Wait() }) return nil } -type ConnectionResponse struct { - Success bool `json:"success"` - Error string `json:"message"` - Data struct { - OtelUrl string `json:"otlpUrl"` - OtelBearerToken string `json:"otlpBearerToken"` - } `json:"data"` +func (s *Server) refreshConnection() error { + var resp ConnectionResponse + if err := s.apiclient.Do("GET", "/cli/devmode/"+s.Project.Project.ProjectId, nil, &resp); err != nil { + return fmt.Errorf("failed to refresh connection: %w", err) + } + s.otelUrl = resp.Data.OtelUrl + s.otelToken = resp.Data.OtelBearerToken + tv, err := time.Parse(time.RFC3339, resp.Data.ExpiresAt) + if err != nil { + return fmt.Errorf("failed to parse expires at: %w", err) + } + s.expiresAt = &tv + s.publicUrl = fmt.Sprintf("https://%s", resp.Data.Domain) + cert, err := tls.X509KeyPair([]byte(resp.Data.Certificate), []byte(resp.Data.PrivateKey)) + if err != nil { + return fmt.Errorf("failed to create tls key pair: %w", err) + } + s.tlsCertificate = &cert + return nil } -// OnConnect is called when the bridge client is connected to the bridge server -func (s *Server) OnConnect(client *bridge.Client) error { - s.logger.Debug("on connect") +func (s *Server) reconnect() { + if s.session != nil { + s.session.Close() + s.session = nil + } + if s.conn != nil { + s.conn.Close() + s.conn = nil + } + go s.connect(false) +} +func (s *Server) connect(initial bool) { + s.wg.Add(1) + var gerr error defer func() { - s.connected <- struct{}{} // signal that the connection is established (even if there was an error) + if initial && gerr != nil { + s.connected <- gerr.Error() + } + s.wg.Done() }() - payload := map[string]string{ - "orgId": s.orgId, - "publicURL": client.ClientURL(), - "websocketURL": client.WebsocketURL(), + if err := s.refreshConnection(); err != nil { + s.logger.Error("failed to refresh connection: %s", err) + gerr = err + return } - var response ConnectionResponse + var tlsConfig tls.Config + tlsConfig.Certificates = []tls.Certificate{*s.tlsCertificate} - if err := s.apiclient.Do("PUT", "/cli/devmode/"+s.ID, payload, &response); err != nil { - return fmt.Errorf("failed to register devmode connection with api server: %w", err) + if strings.Contains(s.serverAddr, "localhost") || strings.Contains(s.serverAddr, "127.0.0.1") { + tlsConfig.InsecureSkipVerify = true } - s.otelUrl = response.Data.OtelUrl - s.otelToken = response.Data.OtelBearerToken - - tctx, _, cleanup, err := telemetry.NewWithAPIKey(s.ctx, "@agentuity/cli", s.otelUrl, s.otelToken, s.logger) + conn, err := tls.Dial("tcp", s.serverAddr, &tlsConfig) if err != nil { - return fmt.Errorf("failed to create telemetry client: %w", err) + gerr = err + s.logger.Error("failed to dial tls: %s", err) + return } + s.conn = conn - s.ctx = tctx - s.cleanup = cleanup - - s.saveConnection(client) - - return nil -} - -// OnDisconnect is called when the bridge client is disconnected from the bridge server -func (s *Server) OnDisconnect(client *bridge.Client) { - s.logger.Debug("on disconnect") -} - -// OnHeader is called when a header is received from the bridge. this will only be called once before any data is sent. -func (s *Server) OnHeader(client *bridge.Client, id string, agentId string, headers map[string]string) { - s.logger.Debug("on header, id: %s, agent: %s, headers: %s", id, agentId, headers) - - req := AgentRequestArgs{ - Context: s.ctx, - Logger: s.logger, - Tracer: s.tracer, - ID: s.ID, - Version: s.version, - URL: s.AgentURL(agentId), - Headers: headers, - AgentID: agentId, - OrgID: s.orgId, - ProjectID: s.Project.Project.ProjectId, - } - agentReq, err := NewAgentRequest(req) + sess, err := smux.Client(conn, nil) if err != nil { - s.logger.Error("failed to create request: %s", err) + gerr = err + s.logger.Error("failed to start smux session: %s", err) return } - s.pendingMu.Lock() - s.pending[id] = agentReq - s.pendingMu.Unlock() - - go agentReq.Run() // run this in a new goroutine so as not to block the main bridge thread - s.logger.Debug("header exiting: %s, agent: %s", id, agentId) -} + s.session = sess -// OnData is called when a data is received from the bridge. this will be called multiple times if the data is large. -func (s *Server) OnData(client *bridge.Client, id string, agentId string, data []byte) { - s.logger.Debug("on data: id: %s, agent: %s", id, agentId) - s.pendingMu.RLock() - defer s.pendingMu.RUnlock() - if req, ok := s.pending[id]; ok { - req.send(data) - } else { - s.logger.Error("no pending request for id: %s and agent: %s", id, agentId) + if initial { + s.connected <- "" } -} -// OnClose is called when the bridge request is completed and no more data will be sent -func (s *Server) OnClose(client *bridge.Client, id string, agentId string) { - s.logger.Debug("on close: id: %s, param: %s", id, agentId) - s.pendingMu.Lock() - defer s.pendingMu.Unlock() - if req, ok := s.pending[id]; ok { - delete(s.pending, id) - req.close(client, id) - } else { - s.logger.Error("no pending request for id: %s and agent: %s", id, agentId) + for { + stream, err := sess.AcceptStream() + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, io.ErrClosedPipe) { + break + } + s.logger.Error("Stream accept failed: %s", err) + break + } + go s.handleStream(s.ctx, s.logger, stream, s.tlsCertificate.Leaf.Subject.CommonName) } } -// OnError is called when an error occurs at any point in the bridge client -func (s *Server) OnError(client *bridge.Client, err error) { - s.logger.Error("an error occurred: %s", err) -} - -type ControlMessage struct { - Action string `json:"action"` -} - type AgentsControlResponse struct { ProjectID string `json:"projectId"` ProjectName string `json:"projectName"` Agents []project.AgentConfig `json:"agents"` } -// OnControl is called when a control event is received from the bridge. you can respond with a control event to the bridge by returning a non-nil value. -func (s *Server) OnControl(client *bridge.Client, id string, data []byte) ([]byte, error) { - s.logger.Debug("on control: id: %s, data: %s", id, string(data)) - var message ControlMessage - if err := json.Unmarshal(data, &message); err != nil { - return nil, fmt.Errorf("failed to unmarshal control message: %w", err) +func (s *Server) handleStream(ctx context.Context, logger logger.Logger, stream net.Conn, hostname string) { + s.wg.Add(1) + defer func() { + stream.Close() + s.wg.Done() + }() + + // Read request from stream + req, err := http.ReadRequest(bufio.NewReader(stream)) + if err != nil { + logger.Error("Failed to parse HTTP request: %v", err) + return } - switch message.Action { - case "getAgents": - return json.Marshal(AgentsControlResponse{ + + switch req.URL.Path { + case "/_health": + resp := &http.Response{ + StatusCode: http.StatusOK, + } + select { + case <-s.ctx.Done(): + resp.StatusCode = http.StatusServiceUnavailable + default: + } + resp.Write(stream) + return + case "/_agents": + resp := &http.Response{ + StatusCode: http.StatusOK, + } + buf, err := json.Marshal(AgentsControlResponse{ ProjectID: s.Project.Project.ProjectId, ProjectName: s.Project.Project.Name, Agents: s.Project.Project.Agents, }) + if err != nil { + logger.Error("Failed to marshal agents control response: %v", err) + resp.StatusCode = http.StatusInternalServerError + resp.Body = io.NopCloser(strings.NewReader(err.Error())) + resp.Write(stream) + return + } + resp.Body = io.NopCloser(bytes.NewReader(buf)) + resp.Write(stream) + return } - return nil, fmt.Errorf("unknown control action: %s", message.Action) -} -// OnRefresh is called when the bridge client has refreshed its connection -func (s *Server) OnRefresh(client *bridge.Client) { - s.saveConnection(client) -} + req = req.WithContext(ctx) -func (s *Server) saveConnection(client *bridge.Client) { - s.registered = true - kv := map[string]string{} - conn := client.ConnectionInfo() - s.publicUrl = conn.ClientURL - if conn.ExpiresAt != nil { - kv["expires_at"] = conn.ExpiresAt.Format(time.RFC3339) - } - if conn.WebsocketURL != "" { - kv["websocket_url"] = conn.WebsocketURL - } - if conn.StreamURL != "" { - kv["stream_url"] = conn.StreamURL - } - if conn.ClientURL != "" { - kv["client_url"] = conn.ClientURL - } - if conn.RepliesURL != "" { - kv["replies_url"] = conn.RepliesURL - } - if conn.RefreshURL != "" { - kv["refresh_url"] = conn.RefreshURL + // Forward to local server + req.RequestURI = "" + req.URL.Scheme = "http" + req.URL.Host = fmt.Sprintf("127.0.0.1:%d", s.port) + req.Header.Set("Host", hostname) + + s.logger.Debug("forwarding request to local server: %s", req.URL.String()) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + logger.Error("Failed to contact local target: %v", err) + resp = &http.Response{ + StatusCode: http.StatusInternalServerError, + Body: io.NopCloser(strings.NewReader("Local target error")), + } + resp.Write(stream) + return } - if conn.ControlURL != "" { - kv["control_url"] = conn.ControlURL + defer resp.Body.Close() + + s.logger.Debug("received response from local server: %s, status code: %d", req.URL.String(), resp.StatusCode) + + // TODO: fix streaming + + // Send response back + err = resp.Write(stream) + if err != nil { + logger.Error("Failed to write response to stream: %v", err) } - viper.Set("devmode."+s.orgId, kv) - viper.WriteConfig() +} + +func (s *Server) WebURL(appUrl string) string { + return fmt.Sprintf("%s/devmode/%s", appUrl, s.ID) +} + +func (s *Server) PublicURL(appUrl string) string { + return s.publicUrl +} + +func (s *Server) AgentURL(agentId string) string { + return fmt.Sprintf("http://127.0.0.1:%d/%s", s.port, agentId) } func (s *Server) HealthCheck(devModeUrl string) error { @@ -290,7 +311,7 @@ func (s *Server) HealthCheck(devModeUrl string) error { continue } s.logger.Trace("health check request returned status code: %d", resp.StatusCode) - if resp.StatusCode != 200 { + if resp.StatusCode != http.StatusOK { s.logger.Trace("health check returned status code: %d", resp.StatusCode) dur := time.Millisecond * 150 * time.Duration(math.Pow(float64(i), 2)) time.Sleep(dur) @@ -303,13 +324,34 @@ func (s *Server) HealthCheck(devModeUrl string) error { func (s *Server) Connect(ui *DevModeUI, tuiLogger logger.Logger) error { s.logger = tuiLogger - s.pendingLogger.drain(ui, s.logger) - s.pendingLogger = nil - <-s.connected + if pl, ok := s.logger.(*PendingLogger); ok { + pl.drain(ui, s.logger) + } + s.pendingLogger = s.logger + msg := <-s.connected close(s.connected) + if msg != "" { + return fmt.Errorf("%s", msg) + } return nil } +func (s *Server) monitor() { + t := time.NewTicker(time.Minute * 10) + defer t.Stop() + for { + select { + case <-s.ctx.Done(): + return + case <-t.C: + if s.expiresAt != nil && time.Now().After(*s.expiresAt) { + s.logger.Debug("connection expired, reconnecting") + s.reconnect() + } + } + } +} + func New(args ServerArgs) (*Server, error) { id := cstr.NewHash(args.OrgId, args.UserId) tracer := otel.Tracer("@agentuity/cli", trace.WithInstrumentationAttributes( @@ -323,10 +365,13 @@ func New(args ServerArgs) (*Server, error) { pendingLogger := NewPendingLogger(args.LogLevel) + ctx, cancel := context.WithCancel(args.Ctx) + server := &Server{ ID: id, logger: pendingLogger, - ctx: args.Ctx, + ctx: ctx, + cancel: cancel, apiurl: args.APIURL, transportUrl: args.TransportURL, apiKey: args.APIKey, @@ -336,24 +381,15 @@ func New(args ServerArgs) (*Server, error) { tracer: tracer, version: args.Version, port: args.Port, - apiclient: util.NewAPIClient(context.Background(), args.Logger, args.APIURL, args.APIKey), + apiclient: util.NewAPIClient(context.Background(), pendingLogger, args.APIURL, args.APIKey), pendingLogger: pendingLogger, pending: make(map[string]*AgentRequest), - connected: make(chan struct{}, 1), + connected: make(chan string, 1), + serverAddr: args.ServerAddr, } - server.bridge = bridge.New(bridge.Options{ - Context: args.Ctx, - Logger: pendingLogger, - APIKey: args.ProjectToken, - URL: args.TransportURL, - ConnectionInfo: args.Connection, - Handler: server, - }) - - if err := server.bridge.Connect(); err != nil { - return nil, err - } + go server.connect(true) + go server.monitor() return server, nil } diff --git a/internal/dev/tui.go b/internal/dev/tui.go index e29e670b..4a529579 100644 --- a/internal/dev/tui.go +++ b/internal/dev/tui.go @@ -46,7 +46,7 @@ type model struct { paused bool showhelp bool showagents bool - agents []Agent + agents []*Agent selectedLog *logItem spinner spinner.Model spinning bool @@ -163,6 +163,11 @@ func (m *model) generateInfoBox() string { AlignHorizontal(lipgloss.Left). Foreground(labelColor) + url := "loading..." + if m.publicUrl != "" { + url = tui.Link("%s", m.publicUrl) + " " + tui.Muted("(only accessible while running)") + } + content := fmt.Sprintf(`%s %s %s @@ -171,7 +176,7 @@ func (m *model) generateInfoBox() string { tui.Bold("⨺ Agentuity DevMode")+" "+statusStyle.Render(tui.PadLeft("⏺", m.windowSize.Width-25, " ")), label("Dashboard"), tui.Link("%s", m.appUrl), label("Local"), tui.Link("%s", m.devModeUrl), - label("Public"), tui.Link("%s", m.publicUrl)+" "+tui.Muted("(only accessible in devmode)"), + label("Public"), url, ) return devmodeBox.Render(content) } @@ -387,7 +392,7 @@ type DevModeConfig struct { DevModeUrl string PublicUrl string AppUrl string - Agents []Agent + Agents []*Agent } func NewDevModeUI(ctx context.Context, config DevModeConfig) *DevModeUI { @@ -399,6 +404,10 @@ func NewDevModeUI(ctx context.Context, config DevModeConfig) *DevModeUI { } } +func (d *DevModeUI) SetPublicURL(url string) { + d.model.publicUrl = url +} + // Done returns a channel that will be closed when the program is done func (d *DevModeUI) Done() <-chan struct{} { return d.ctx.Done() From 18d98de71e4ee36cc0d2a79ca517e2f4f525df6f Mon Sep 17 00:00:00 2001 From: Jeff Haynie Date: Wed, 14 May 2025 21:14:41 -0500 Subject: [PATCH 4/8] include id --- internal/dev/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/dev/server.go b/internal/dev/server.go index 05fc008e..09fad38e 100644 --- a/internal/dev/server.go +++ b/internal/dev/server.go @@ -105,7 +105,7 @@ func (s *Server) Close() error { func (s *Server) refreshConnection() error { var resp ConnectionResponse - if err := s.apiclient.Do("GET", "/cli/devmode/"+s.Project.Project.ProjectId, nil, &resp); err != nil { + if err := s.apiclient.Do("GET", "/cli/devmode/"+s.Project.Project.ProjectId+"/"+s.ID, nil, &resp); err != nil { return fmt.Errorf("failed to refresh connection: %w", err) } s.otelUrl = resp.Data.OtelUrl From 9f3580127b4b95aa3b8aa268c0c7b14b16faa149 Mon Sep 17 00:00:00 2001 From: Jeff Haynie Date: Fri, 16 May 2025 14:50:03 -0500 Subject: [PATCH 5/8] working end-to-end --- go.mod | 5 +- go.sum | 6 +- internal/dev/request.go | 212 ---------------------------------- internal/dev/server.go | 250 ++++++++++++++++++++++++++-------------- 4 files changed, 167 insertions(+), 306 deletions(-) delete mode 100644 internal/dev/request.go diff --git a/go.mod b/go.mod index 5536fe87..479e2d83 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.2 require ( github.com/Masterminds/semver v1.5.0 - github.com/agentuity/go-common v1.0.55 + github.com/agentuity/go-common v1.0.59 github.com/agentuity/mcp-golang/v2 v2.0.2 github.com/bep/debounce v1.2.1 github.com/bmatcuk/doublestar/v4 v4.8.1 @@ -22,7 +22,6 @@ require ( github.com/spf13/cobra v1.9.1 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.10.0 - github.com/xtaci/smux v1.5.34 github.com/zijiren233/yaml-comment v0.2.2 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 golang.org/x/term v0.30.0 @@ -132,7 +131,7 @@ require ( go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/net v0.38.0 // indirect + golang.org/x/net v0.38.0 golang.org/x/sync v0.12.0 // indirect golang.org/x/sys v0.32.0 golang.org/x/text v0.23.0 // indirect diff --git a/go.sum b/go.sum index e0ded5f9..af7e7f40 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/ProtonMail/go-crypto v1.1.5 h1:eoAQfK2dwL+tFSFpr7TbOaPNUbPiJj4fLYwwGE1FQO4= github.com/ProtonMail/go-crypto v1.1.5/go.mod h1:rA3QumHc/FZ8pAHreoekgiAbzpNsfQAosU5td4SnOrE= -github.com/agentuity/go-common v1.0.55 h1:ZaNh200F9TW0apb9UIhvfNDk76wAFAw5ufIU9VW3weU= -github.com/agentuity/go-common v1.0.55/go.mod h1:cy1EPYpZUkp3JSMgTb+Sa3sLnS7vQQupj/RwO4An6L4= +github.com/agentuity/go-common v1.0.59 h1:WOR35IDV6X7qSBr+E7ztF1PygENO305/nEKd+rDAX2A= +github.com/agentuity/go-common v1.0.59/go.mod h1:cy1EPYpZUkp3JSMgTb+Sa3sLnS7vQQupj/RwO4An6L4= github.com/agentuity/mcp-golang/v2 v2.0.2 h1:wZqS/aHWZsQoU/nd1E1/iMsVY2dywWT9+PFlf+3YJxo= github.com/agentuity/mcp-golang/v2 v2.0.2/go.mod h1:U105tZXyTatxxOBlcObRgLb/ULvGgT2DJ1nq/8++P6Q= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= @@ -266,8 +266,6 @@ github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8 github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= -github.com/xtaci/smux v1.5.34 h1:OUA9JaDFHJDT8ZT3ebwLWPAgEfE6sWo2LaTy3anXqwg= -github.com/xtaci/smux v1.5.34/go.mod h1:OMlQbT5vcgl2gb49mFkYo6SMf+zP3rcjcwQz7ZU7IGY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/zijiren233/yaml-comment v0.2.2 h1:5ghs8huXFVb/kWCi66P+xbXq0GnOE2XVCnhaWd7mTs8= diff --git a/internal/dev/request.go b/internal/dev/request.go deleted file mode 100644 index c8acf2bb..00000000 --- a/internal/dev/request.go +++ /dev/null @@ -1,212 +0,0 @@ -package dev - -import ( - "context" - "fmt" - "io" - "net/http" - "strings" - "time" - - "github.com/agentuity/go-common/bridge" - "github.com/agentuity/go-common/logger" - "github.com/agentuity/go-common/telemetry" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/trace" -) - -var propagator propagation.TraceContext - -type packet struct { - buf []byte -} - -func (p *packet) String() string { - return fmt.Sprintf("packet{buf: %d bytes}", len(p.buf)) -} - -type AgentRequest struct { - ctx context.Context - logger logger.Logger - pending chan *packet - req *http.Request - statusCode int - headers map[string]string - body io.ReadCloser - span trace.Span - started time.Time - completed chan struct{} -} - -var _ io.Reader = (*AgentRequest)(nil) - -type AgentRequestArgs struct { - Context context.Context - Logger logger.Logger - Tracer trace.Tracer - ID string - URL string - Version string - AgentID string - OrgID string - ProjectID string - Headers map[string]string -} - -func NewAgentRequest(args AgentRequestArgs) (*AgentRequest, error) { - started := time.Now() - var err error - - sctx, logger, span := telemetry.StartSpan(args.Context, args.Logger, args.Tracer, "TriggerRun", - trace.WithAttributes( - attribute.Bool("@agentuity/devmode", true), - attribute.String("trigger", "manual"), - attribute.String("@agentuity/deploymentId", args.ID), - ), - trace.WithSpanKind(trace.SpanKindConsumer), - ) - - defer func() { - // only end the span if there was an error - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - span.SetAttributes( - attribute.Int64("@agentuity/cpu_time", time.Since(started).Milliseconds()), - ) - span.End() - } - }() - - span.SetAttributes( - attribute.String("@agentuity/agentId", args.AgentID), - attribute.String("@agentuity/orgId", args.OrgID), - attribute.String("@agentuity/projectId", args.ProjectID), - attribute.String("@agentuity/env", "development"), - ) - - spanContext := span.SpanContext() - traceState := spanContext.TraceState() - traceState, err = traceState.Insert("id", args.AgentID) - if err != nil { - logger.Error("failed to insert agent id into trace state: %s", err) - err = fmt.Errorf("failed to insert agent id into trace state: %w", err) - return nil, err - } - traceState, err = traceState.Insert("oid", args.OrgID) - if err != nil { - logger.Error("failed to insert org id into trace state: %s", err) - err = fmt.Errorf("failed to insert org id into trace state: %w", err) - return nil, err - } - traceState, err = traceState.Insert("pid", args.ProjectID) - if err != nil { - logger.Error("failed to insert project id into trace state: %s", err) - err = fmt.Errorf("failed to insert project id into trace state: %w", err) - return nil, err - } - - ctx := trace.ContextWithSpanContext(sctx, spanContext.WithTraceState(traceState)) - - agentReq := &AgentRequest{ - ctx: ctx, - logger: logger, - pending: make(chan *packet, 10), - completed: make(chan struct{}, 1), - started: started, - span: span, - } - req, err := http.NewRequestWithContext(ctx, "POST", args.URL, agentReq) - if err != nil { - return nil, err - } - - for k, v := range args.Headers { - req.Header.Set(k, v) - } - - req.Header.Set("x-agentuity-trigger", "manual") - req.Header.Set("User-Agent", "Agentuity CLI/"+args.Version) - propagator.Inject(ctx, propagation.HeaderCarrier(req.Header)) - - agentReq.req = req - - return agentReq, nil -} - -func (r *AgentRequest) Run() error { - var err error - var resp *http.Response - - defer func() { - if err != nil { - r.span.RecordError(err) - r.span.SetStatus(codes.Error, err.Error()) - } - }() - - r.logger.Debug("sending request to agent: %s", r.req.URL) - resp, err = http.DefaultClient.Do(r.req) - if err != nil { - return err - } - r.logger.Debug("sent request to agent: %s, returned: %d", r.req.URL, resp.StatusCode) - r.statusCode = resp.StatusCode - r.headers = make(map[string]string) - for k, v := range resp.Header { - r.headers[k] = strings.Join(v, ", ") - } - r.body = resp.Body - r.req = nil - r.completed <- struct{}{} // signal that the request is complete - return nil -} - -func (r *AgentRequest) Read(p []byte) (n int, err error) { - select { - case packet := <-r.pending: - if packet == nil { - r.logger.Debug("incoming buffer is EOF") - return 0, io.EOF - } - if len(packet.buf) > len(p) { - return 0, fmt.Errorf("incoming buffer is larger (%d) than the outgoing buffer (%d)", len(packet.buf), len(p)) - } - return copy(p, packet.buf), nil - case <-r.ctx.Done(): - return 0, r.ctx.Err() - } -} - -func (r *AgentRequest) close(client *bridge.Client, id string) { - r.logger.Trace("closing agent request: %s", id) - close(r.pending) - defer func() { - if r.body != nil { - r.body.Close() - } - r.statusCode = 0 - r.headers = nil - r.body = nil - r.req = nil - r.completed = nil - if r.span != nil { - r.span.End() - r.span = nil - } - }() - r.logger.Trace("waiting for agent request to complete: %s", id) - <-r.completed // block for the request to complete - r.logger.Debug("replying to agent request: %s, status: %d, headers: %v", id, r.statusCode, r.headers) - if err := client.Reply(id, r.statusCode, r.headers, r.body); err != nil { - r.logger.Error("failed to reply to agent request: %s", err) - } - r.logger.Info("processed sess_%s in %s", r.span.SpanContext().TraceID(), time.Since(r.started)) -} - -func (r *AgentRequest) send(buf []byte) { - r.logger.Trace("sending buffer: %d bytes", len(buf)) - r.pending <- &packet{buf} -} diff --git a/internal/dev/server.go b/internal/dev/server.go index 09fad38e..b59bc45c 100644 --- a/internal/dev/server.go +++ b/internal/dev/server.go @@ -1,17 +1,15 @@ package dev import ( - "bufio" - "bytes" "context" "crypto/tls" "encoding/json" "errors" "fmt" - "io" "math" - "net" "net/http" + "net/http/httputil" + "net/url" "strings" "sync" "time" @@ -20,12 +18,17 @@ import ( "github.com/agentuity/cli/internal/util" "github.com/agentuity/go-common/logger" cstr "github.com/agentuity/go-common/string" - "github.com/xtaci/smux" + "github.com/agentuity/go-common/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" + "golang.org/x/net/http2" ) +var propagator propagation.TraceContext + type Server struct { ID string otelToken string @@ -47,14 +50,13 @@ type Server struct { port int connected chan string pendingLogger logger.Logger - pending map[string]*AgentRequest - pendingMu sync.RWMutex expiresAt *time.Time tlsCertificate *tls.Certificate conn *tls.Conn - session *smux.Session + srv *http2.Server wg sync.WaitGroup serverAddr string + cleanup func() } type ServerArgs struct { @@ -90,15 +92,14 @@ func (s *Server) Close() error { s.logger.Debug("closing connection") s.once.Do(func() { s.cancel() - if s.session != nil { - s.session.Close() - s.session = nil - } + s.wg.Wait() if s.conn != nil { s.conn.Close() s.conn = nil } - s.wg.Wait() + if s.cleanup != nil { + s.cleanup() + } }) return nil } @@ -121,14 +122,20 @@ func (s *Server) refreshConnection() error { return fmt.Errorf("failed to create tls key pair: %w", err) } s.tlsCertificate = &cert + if s.cleanup == nil { + ctx, logger, cleanup, err := telemetry.NewWithAPIKey(s.ctx, "@agentuity/cli", s.otelUrl, s.otelToken, s.logger) + if err != nil { + return fmt.Errorf("failed to create OTLP telemetry trace: %w", err) + } + s.ctx = ctx + s.logger = logger + s.cleanup = cleanup + } + return nil } func (s *Server) reconnect() { - if s.session != nil { - s.session.Close() - s.session = nil - } if s.conn != nil { s.conn.Close() s.conn = nil @@ -137,13 +144,11 @@ func (s *Server) reconnect() { } func (s *Server) connect(initial bool) { - s.wg.Add(1) var gerr error defer func() { if initial && gerr != nil { s.connected <- gerr.Error() } - s.wg.Done() }() if err := s.refreshConnection(); err != nil { @@ -154,6 +159,7 @@ func (s *Server) connect(initial bool) { var tlsConfig tls.Config tlsConfig.Certificates = []tls.Certificate{*s.tlsCertificate} + tlsConfig.NextProtos = []string{"h2"} if strings.Contains(s.serverAddr, "localhost") || strings.Contains(s.serverAddr, "127.0.0.1") { tlsConfig.InsecureSkipVerify = true @@ -167,29 +173,18 @@ func (s *Server) connect(initial bool) { } s.conn = conn - sess, err := smux.Client(conn, nil) - if err != nil { - gerr = err - s.logger.Error("failed to start smux session: %s", err) - return - } - s.session = sess - if initial { s.connected <- "" } - for { - stream, err := sess.AcceptStream() - if err != nil { - if errors.Is(err, context.Canceled) || errors.Is(err, io.ErrClosedPipe) { - break - } - s.logger.Error("Stream accept failed: %s", err) - break - } - go s.handleStream(s.ctx, s.logger, stream, s.tlsCertificate.Leaf.Subject.CommonName) - } + // HTTP/2 server to accept proxied requests over the tunnel connection + h2s := &http2.Server{} + + h2s.ServeConn(conn, &http2.ServeConnOpts{ + Handler: http.HandlerFunc(s.handleStream), + Context: s.ctx, + }) + } type AgentsControlResponse struct { @@ -198,84 +193,166 @@ type AgentsControlResponse struct { Agents []project.AgentConfig `json:"agents"` } -func (s *Server) handleStream(ctx context.Context, logger logger.Logger, stream net.Conn, hostname string) { +func sendCORSHeaders(headers http.Header) { + headers.Set("access-control-allow-origin", "*") + headers.Set("access-control-expose-headers", "Content-Type") + headers.Set("access-control-allow-headers", "Content-Type, Authorization") + headers.Set("access-control-allow-methods", "GET, POST, OPTIONS") +} + +func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { s.wg.Add(1) - defer func() { - stream.Close() - s.wg.Done() - }() + defer s.wg.Done() - // Read request from stream - req, err := http.ReadRequest(bufio.NewReader(stream)) - if err != nil { - logger.Error("Failed to parse HTTP request: %v", err) + s.logger.Trace("handleStream: %s %s", r.Method, r.URL) + + if r.Method == "OPTIONS" { + sendCORSHeaders(w.Header()) + w.WriteHeader(http.StatusOK) return } - switch req.URL.Path { + switch r.URL.Path { case "/_health": - resp := &http.Response{ - StatusCode: http.StatusOK, - } - select { - case <-s.ctx.Done(): - resp.StatusCode = http.StatusServiceUnavailable - default: - } - resp.Write(stream) + w.WriteHeader(http.StatusOK) return case "/_agents": - resp := &http.Response{ - StatusCode: http.StatusOK, - } + sendCORSHeaders(w.Header()) buf, err := json.Marshal(AgentsControlResponse{ ProjectID: s.Project.Project.ProjectId, ProjectName: s.Project.Project.Name, Agents: s.Project.Project.Agents, }) if err != nil { - logger.Error("Failed to marshal agents control response: %v", err) - resp.StatusCode = http.StatusInternalServerError - resp.Body = io.NopCloser(strings.NewReader(err.Error())) - resp.Write(stream) + s.logger.Error("failed to marshal agents control response: %s", err) + w.WriteHeader(http.StatusInternalServerError) return } - resp.Body = io.NopCloser(bytes.NewReader(buf)) - resp.Write(stream) + w.Header().Set("Content-Type", "application/json") + w.Write(buf) + return + case "/_control": + sendCORSHeaders(w.Header()) + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.WriteHeader(http.StatusOK) + rc := http.NewResponseController(w) + rc.Flush() + w.Write([]byte("event: start\ndata: connected\n\n")) + rc.Flush() + select { + case <-s.ctx.Done(): + case <-r.Context().Done(): + } + w.Write([]byte("event: stop\ndata: disconnected\n\n")) + rc.Flush() return } - req = req.WithContext(ctx) + if r.Method != "POST" { + sendCORSHeaders(w.Header()) + w.WriteHeader(http.StatusMethodNotAllowed) + return + } - // Forward to local server - req.RequestURI = "" - req.URL.Scheme = "http" - req.URL.Host = fmt.Sprintf("127.0.0.1:%d", s.port) - req.Header.Set("Host", hostname) + agentId := r.URL.Path[1:] + var found bool + for _, agent := range s.Project.Project.Agents { + if agent.ID == agentId || strings.TrimLeft(agent.ID, "agent_") == agentId { + found = true + agentId = agent.ID + break + } + } + + if !found { + s.logger.Error("agent not found with id: %s", agentId) + sendCORSHeaders(w.Header()) + w.WriteHeader(http.StatusNotFound) + return + } - s.logger.Debug("forwarding request to local server: %s", req.URL.String()) + sctx, logger, span := telemetry.StartSpan(r.Context(), s.logger, s.tracer, "TriggerRun", + trace.WithAttributes( + attribute.Bool("@agentuity/devmode", true), + attribute.String("trigger", "manual"), + attribute.String("@agentuity/deploymentId", s.ID), + ), + trace.WithSpanKind(trace.SpanKindConsumer), + ) - resp, err := http.DefaultClient.Do(req) - if err != nil { - logger.Error("Failed to contact local target: %v", err) - resp = &http.Response{ - StatusCode: http.StatusInternalServerError, - Body: io.NopCloser(strings.NewReader("Local target error")), + var err error + started := time.Now() + + defer func() { + // only end the span if there was an error + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") } - resp.Write(stream) + span.SetAttributes( + attribute.Int64("@agentuity/cpu_time", time.Since(started).Milliseconds()), + ) + span.End() + s.logger.Info("processed sess_%s in %s", span.SpanContext().TraceID(), time.Since(started)) + }() + + span.SetAttributes( + attribute.String("@agentuity/agentId", agentId), + attribute.String("@agentuity/orgId", s.orgId), + attribute.String("@agentuity/projectId", s.Project.Project.ProjectId), + attribute.String("@agentuity/env", "development"), + ) + + spanContext := span.SpanContext() + traceState := spanContext.TraceState() + traceState, err = traceState.Insert("id", agentId) + if err != nil { + logger.Error("failed to insert agent id into trace state: %s", err) + err = fmt.Errorf("failed to insert agent id into trace state: %w", err) + return + } + traceState, err = traceState.Insert("oid", s.orgId) + if err != nil { + logger.Error("failed to insert org id into trace state: %s", err) + err = fmt.Errorf("failed to insert org id into trace state: %w", err) + return + } + traceState, err = traceState.Insert("pid", s.Project.Project.ProjectId) + if err != nil { + logger.Error("failed to insert project id into trace state: %s", err) + err = fmt.Errorf("failed to insert project id into trace state: %w", err) return } - defer resp.Body.Close() - s.logger.Debug("received response from local server: %s, status code: %d", req.URL.String(), resp.StatusCode) + newctx := trace.ContextWithSpanContext(sctx, spanContext.WithTraceState(traceState)) - // TODO: fix streaming + nr := r.WithContext(newctx) + nr.Header = r.Header.Clone() + nr.Header.Set("x-agentuity-trigger", "manual") + nr.Header.Set("User-Agent", "Agentuity CLI/"+s.version) + propagator.Inject(newctx, propagation.HeaderCarrier(nr.Header)) - // Send response back - err = resp.Write(stream) + logger.Info("sending headers: %+v", nr.Header) + + url, err := url.Parse(r.URL.String()) if err != nil { - logger.Error("Failed to write response to stream: %v", err) + logger.Error("failed to parse url: %s", err) + w.WriteHeader(http.StatusInternalServerError) + return } + url.Scheme = "http" + url.Host = fmt.Sprintf("127.0.0.1:%d", s.port) + url.Path = "" // proxy sets so this acts like the base + + logger.Info("sending to: %s", url) + + proxy := httputil.NewSingleHostReverseProxy(url) + proxy.FlushInterval = -1 // no buffering so we can stream + proxy.ServeHTTP(w, nr) } func (s *Server) WebURL(appUrl string) string { @@ -383,7 +460,6 @@ func New(args ServerArgs) (*Server, error) { port: args.Port, apiclient: util.NewAPIClient(context.Background(), pendingLogger, args.APIURL, args.APIKey), pendingLogger: pendingLogger, - pending: make(map[string]*AgentRequest), connected: make(chan string, 1), serverAddr: args.ServerAddr, } From 3b89a00d0ed1d2711dc9dfa2aef48ae788e1cc5e Mon Sep 17 00:00:00 2001 From: Jeff Haynie Date: Fri, 16 May 2025 14:56:37 -0500 Subject: [PATCH 6/8] make trace --- internal/dev/server.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/dev/server.go b/internal/dev/server.go index b59bc45c..75ec68ce 100644 --- a/internal/dev/server.go +++ b/internal/dev/server.go @@ -336,8 +336,6 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { nr.Header.Set("User-Agent", "Agentuity CLI/"+s.version) propagator.Inject(newctx, propagation.HeaderCarrier(nr.Header)) - logger.Info("sending headers: %+v", nr.Header) - url, err := url.Parse(r.URL.String()) if err != nil { logger.Error("failed to parse url: %s", err) @@ -348,7 +346,7 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { url.Host = fmt.Sprintf("127.0.0.1:%d", s.port) url.Path = "" // proxy sets so this acts like the base - logger.Info("sending to: %s", url) + logger.Trace("sending to: %s", url) proxy := httputil.NewSingleHostReverseProxy(url) proxy.FlushInterval = -1 // no buffering so we can stream From 7a0a6a53b22af3e10c713e3e67c4c699661ae134 Mon Sep 17 00:00:00 2001 From: Jeff Haynie Date: Fri, 16 May 2025 16:41:29 -0500 Subject: [PATCH 7/8] handle home page --- internal/dev/server.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/dev/server.go b/internal/dev/server.go index 75ec68ce..e4010369 100644 --- a/internal/dev/server.go +++ b/internal/dev/server.go @@ -17,6 +17,7 @@ import ( "github.com/agentuity/cli/internal/project" "github.com/agentuity/cli/internal/util" "github.com/agentuity/go-common/logger" + "github.com/agentuity/go-common/message" cstr "github.com/agentuity/go-common/string" "github.com/agentuity/go-common/telemetry" "go.opentelemetry.io/otel" @@ -213,6 +214,9 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { } switch r.URL.Path { + case "/": + message.CustomErrorResponse(w, "Agents, Not Humans, Live Here", "Hi! I'm an Agentuity Agent running in development mode.", "", http.StatusOK) + return case "/_health": w.WriteHeader(http.StatusOK) return From 57c4d61c47afbbb2d7dcdfb4b297c5d0597ada14 Mon Sep 17 00:00:00 2001 From: Jeff Haynie Date: Fri, 16 May 2025 17:20:21 -0500 Subject: [PATCH 8/8] default to 443 --- cmd/dev.go | 2 +- internal/dev/server.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/dev.go b/cmd/dev.go index 02777d94..bb701b00 100644 --- a/cmd/dev.go +++ b/cmd/dev.go @@ -272,6 +272,6 @@ func init() { rootCmd.AddCommand(devCmd) devCmd.Flags().StringP("dir", "d", ".", "The directory to run the development server in") devCmd.Flags().Int("port", 0, "The port to run the development server on (uses project default if not provided)") - devCmd.Flags().String("server", "echo.agentuity.cloud:12001", "the echo server to connect to") + devCmd.Flags().String("server", "echo.agentuity.cloud", "the echo server to connect to") devCmd.Flags().MarkHidden("server") } diff --git a/internal/dev/server.go b/internal/dev/server.go index e4010369..ff8c8273 100644 --- a/internal/dev/server.go +++ b/internal/dev/server.go @@ -166,6 +166,10 @@ func (s *Server) connect(initial bool) { tlsConfig.InsecureSkipVerify = true } + if !strings.Contains(s.serverAddr, ":") { + s.serverAddr = fmt.Sprintf("%s:443", s.serverAddr) + } + conn, err := tls.Dial("tcp", s.serverAddr, &tlsConfig) if err != nil { gerr = err