diff --git a/cmd/bundle.go b/cmd/bundle.go index 59029550..f6c7da93 100644 --- a/cmd/bundle.go +++ b/cmd/bundle.go @@ -50,6 +50,7 @@ Examples: Production: production, Install: install, CI: ci, + Writer: os.Stderr, }); err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage("Failed to bundle project")).ShowErrorAndExit() } diff --git a/cmd/dev.go b/cmd/dev.go index 8f586029..bb701b00 100644 --- a/cmd/dev.go +++ b/cmd/dev.go @@ -3,6 +3,7 @@ package cmd import ( "context" "fmt" + "io" "os" "os/signal" "runtime" @@ -15,11 +16,9 @@ import ( "github.com/agentuity/cli/internal/project" "github.com/agentuity/cli/internal/util" "github.com/agentuity/go-common/env" - cstr "github.com/agentuity/go-common/string" "github.com/agentuity/go-common/tui" "github.com/bep/debounce" "github.com/spf13/cobra" - "github.com/spf13/viper" ) var devCmd = &cobra.Command{ @@ -41,9 +40,8 @@ Examples: agentuity dev --dir /path/to/project`, Run: func(cmd *cobra.Command, args []string) { log := env.NewLogger(cmd) - _, appUrl, _ := util.GetURLs(log) - websocketUrl := viper.GetString("overrides.websocket_url") - websocketId, _ := cmd.Flags().GetString("websocket-id") + logLevel := env.LogLevel(cmd) + apiUrl, appUrl, transportUrl := util.GetURLs(log) signals := []os.Signal{os.Interrupt, syscall.SIGINT} if runtime.GOOS != "windows" { @@ -75,10 +73,6 @@ Examples: orgId := project.OrgId - if websocketId == "" { - websocketId = cstr.NewHash(orgId, userId) - } - port, _ := cmd.Flags().GetInt("port") if port == 0 { port, err = dev.FindAvailablePort(theproject) @@ -87,76 +81,127 @@ Examples: } } - websocketConn, err := dev.NewWebsocket(dev.WebsocketArgs{ + serverAddr, _ := cmd.Flags().GetString("server") + + server, err := dev.New(dev.ServerArgs{ Ctx: ctx, Logger: log, - WebsocketId: websocketId, - WebsocketUrl: websocketUrl, + LogLevel: logLevel, + APIURL: apiUrl, + TransportURL: transportUrl, APIKey: apiKey, + OrgId: orgId, Project: theproject, Version: Version, - OrgId: orgId, + UserId: userId, + Port: port, + ServerAddr: serverAddr, }) if err != nil { log.Fatal("failed to create live dev connection: %s", err) } - defer websocketConn.Close() + defer server.Close() processCtx := context.Background() var pid int - projectServerCmd, err := dev.CreateRunProjectCmd(processCtx, log, theproject, websocketConn, dir, orgId, port) + consoleUrl := server.WebURL(appUrl) + devModeUrl := fmt.Sprintf("http://127.0.0.1:%d", port) + + agents := make([]*dev.Agent, 0) + for _, agent := range theproject.Project.Agents { + agents = append(agents, &dev.Agent{ + ID: agent.ID, + Name: agent.Name, + LocalURL: fmt.Sprintf("%s/%s", devModeUrl, agent.ID), + }) + } + + ui := dev.NewDevModeUI(ctx, dev.DevModeConfig{ + DevModeUrl: devModeUrl, + AppUrl: consoleUrl, + Agents: agents, + }) + + ui.Start() + + defer ui.Close(false) + + tuiLogger := dev.NewTUILogger(logLevel, ui) + + if err := server.Connect(ui, tuiLogger); err != nil { + log.Error("failed to start live dev connection: %s", err) + ui.Close(true) + return + } + + publicUrl := server.PublicURL(appUrl) + ui.SetPublicURL(publicUrl) + + for _, agent := range agents { + agent.PublicURL = fmt.Sprintf("%s/%s", publicUrl, agent.ID) + } + + projectServerCmd, err := dev.CreateRunProjectCmd(processCtx, tuiLogger, theproject, server, dir, orgId, port, tuiLogger) if err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage("Failed to run project")).ShowErrorAndExit() } - build := func(initial bool) { + build := func(initial bool) bool { started := time.Now() var ok bool - tui.ShowSpinner("Building project ...", func() { + ui.ShowSpinner("Building project ...", func() { + var w io.Writer = tuiLogger if err := bundler.Bundle(bundler.BundleContext{ Context: ctx, - Logger: log, + Logger: tuiLogger, ProjectDir: dir, Production: false, - DevMode: !initial, + DevMode: true, + Writer: w, }); err != nil { if err == bundler.ErrBuildFailed { - log.Error("build failed ...") return } errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage(fmt.Sprintf("Failed to bundle project: %s", err))).ShowErrorAndExit() } ok = true }) - if ok { - fmt.Println(tui.Text(fmt.Sprintf("✨ Built in %s", time.Since(started).Round(time.Millisecond)))) + if ok && !initial { + ui.SetStatusMessage("✨ Built in %s", time.Since(started).Round(time.Millisecond)) } + return ok } - // Initial build - build(true) + // Initial build must exit if it fails + if !build(true) { + ui.Close(true) + return + } restart := func() { isDeliberateRestart = true build(false) - log.Debug("killing project server") - dev.KillProjectServer(log, projectServerCmd, pid) - log.Debug("killing project server done") + tuiLogger.Debug("killing project server") + dev.KillProjectServer(tuiLogger, projectServerCmd, pid) + tuiLogger.Debug("killing project server done") } + ui.SetStatusMessage("starting ...") + ui.SetSpinner(true) + // debounce a lot of changes at once to avoid multiple restarts in succession debounced := debounce.New(250 * time.Millisecond) // Watch for changes - watcher, err := dev.NewWatcher(log, dir, theproject.Project.Development.Watch.Files, func(path string) { - log.Trace("%s has changed", path) + watcher, err := dev.NewWatcher(tuiLogger, dir, theproject.Project.Development.Watch.Files, func(path string) { + tuiLogger.Trace("%s has changed", path) debounced(restart) }) if err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage(fmt.Sprintf("Failed to start watcher: %s", err))).ShowErrorAndExit() } - defer watcher.Close(log) + defer watcher.Close(tuiLogger) if err := projectServerCmd.Start(); err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage(fmt.Sprintf("Failed to start project: %s", err))).ShowErrorAndExit() @@ -164,26 +209,29 @@ Examples: pid = projectServerCmd.Process.Pid - websocketConn.StartReadingMessages(ctx, log, port) - devUrl := websocketConn.WebURL(appUrl) + if err := server.HealthCheck(devModeUrl); err != nil { + tuiLogger.Error("failed to health check connection: %s", err) + ui.Close(true) + return + } - // Display local interaction instructions - displayLocalInstructions(port, theproject.Project.Agents, devUrl) + ui.SetStatusMessage("🚀 DevMode ready") + ui.SetSpinner(false) go func() { for { - log.Trace("waiting for project server to exit (pid: %d)", pid) + tuiLogger.Trace("waiting for project server to exit (pid: %d)", pid) if err := projectServerCmd.Wait(); err != nil { if !isDeliberateRestart { - log.Error("project server (pid: %d) exited with error: %s", pid, err) + tuiLogger.Error("project server (pid: %d) exited with error: %s", pid, err) } } if projectServerCmd.ProcessState != nil { - log.Debug("project server (pid: %d) exited with code %d", pid, projectServerCmd.ProcessState.ExitCode()) + tuiLogger.Debug("project server (pid: %d) exited with code %d", pid, projectServerCmd.ProcessState.ExitCode()) } else { - log.Debug("project server (pid: %d) exited", pid) + tuiLogger.Debug("project server (pid: %d) exited", pid) } - log.Debug("isDeliberateRestart: %t, pid: %d", isDeliberateRestart, pid) + tuiLogger.Debug("isDeliberateRestart: %t, pid: %d", isDeliberateRestart, pid) if !isDeliberateRestart { return } @@ -191,8 +239,8 @@ Examples: // If it was a deliberate restart, start the new process here if isDeliberateRestart { isDeliberateRestart = false - log.Trace("restarting project server") - projectServerCmd, err = dev.CreateRunProjectCmd(processCtx, log, theproject, websocketConn, dir, orgId, port) + tuiLogger.Trace("restarting project server") + projectServerCmd, err = dev.CreateRunProjectCmd(processCtx, tuiLogger, theproject, server, dir, orgId, port, tuiLogger) if err != nil { errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage("Failed to run project")).ShowErrorAndExit() } @@ -200,71 +248,30 @@ Examples: errsystem.New(errsystem.ErrInvalidConfiguration, err, errsystem.WithContextMessage(fmt.Sprintf("Failed to start project: %s", err))).ShowErrorAndExit() } pid = projectServerCmd.Process.Pid - log.Trace("restarted project server (pid: %d)", pid) + tuiLogger.Trace("restarted project server (pid: %d)", pid) } } }() teardown := func() { - watcher.Close(log) - websocketConn.Close() - dev.KillProjectServer(log, projectServerCmd, pid) + watcher.Close(tuiLogger) + server.Close() + dev.KillProjectServer(tuiLogger, projectServerCmd, pid) } select { - case <-websocketConn.Done(): - log.Info("live dev connection closed, shutting down") + case <-ui.Done(): teardown() case <-ctx.Done(): - log.Info("context done, shutting down") teardown() } }, } -func displayLocalInstructions(port int, agents []project.AgentConfig, devModeUrl string) { - title := tui.Title("🚀 Local Agent Interaction") - - // Combine all elements with appropriate spacing - fmt.Println() - fmt.Println(title) - - // Create list of available agents - if len(agents) > 0 { - fmt.Println() - - for _, agent := range agents { - // Display agent name and ID - fmt.Println(tui.Text(" • ") + tui.PadRight(agent.Name, 20, " ") + " " + tui.Muted(agent.ID)) - } - } - - // Get a sample agent ID if available - sampleAgentID := "agent_ID" - if len(agents) > 0 { - sampleAgentID = agents[0].ID - } - - curlCommand := fmt.Sprintf("curl -v http://127.0.0.1:%d/%s --json '{\"input\": \"Hello, world!\"}'", port, sampleAgentID) - - fmt.Println() - fmt.Println(tui.Text("To interact with your agents locally, you can use:")) - fmt.Println() - fmt.Println(tui.Highlight(curlCommand)) - fmt.Println() - - fmt.Print(tui.Text("Or use the 💻 Dev Mode in our app: ")) - fmt.Println(tui.Link("%s", devModeUrl)) - - fmt.Println() -} - func init() { rootCmd.AddCommand(devCmd) devCmd.Flags().StringP("dir", "d", ".", "The directory to run the development server in") - devCmd.Flags().String("websocket-id", "", "The websocket room id to use for the development agent") - devCmd.Flags().String("org-id", "", "The organization to run the project") devCmd.Flags().Int("port", 0, "The port to run the development server on (uses project default if not provided)") - devCmd.Flags().MarkHidden("websocket-id") - devCmd.Flags().MarkHidden("org-id") + devCmd.Flags().String("server", "echo.agentuity.cloud", "the echo server to connect to") + devCmd.Flags().MarkHidden("server") } diff --git a/go.mod b/go.mod index 1176d9d6..479e2d83 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.2 require ( github.com/Masterminds/semver v1.5.0 - github.com/agentuity/go-common v1.0.47 + github.com/agentuity/go-common v1.0.59 github.com/agentuity/mcp-golang/v2 v2.0.2 github.com/bep/debounce v1.2.1 github.com/bmatcuk/doublestar/v4 v4.8.1 @@ -16,7 +16,6 @@ require ( github.com/evanw/esbuild v0.25.0 github.com/fsnotify/fsnotify v1.7.0 github.com/google/uuid v1.6.0 - github.com/gorilla/websocket v1.5.3 github.com/marcozac/go-jsonc v0.1.1 github.com/mattn/go-isatty v0.0.20 github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c @@ -25,6 +24,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/zijiren233/yaml-comment v0.2.2 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 + golang.org/x/term v0.30.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/apimachinery v0.32.1 ) @@ -48,6 +48,7 @@ require ( github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/mailru/easyjson v0.9.0 // indirect github.com/pjbgf/sha1cd v0.3.2 // indirect + github.com/sahilm/fuzzy v0.1.1 // indirect github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect github.com/skeema/knownhosts v1.3.1 // indirect github.com/tidwall/gjson v1.18.0 // indirect @@ -58,7 +59,6 @@ require ( github.com/xanzy/ssh-agent v0.3.3 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect golang.org/x/crypto v0.36.0 // indirect - golang.org/x/term v0.30.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect ) @@ -67,7 +67,7 @@ require ( github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/buger/goterm v1.0.4 // indirect github.com/catppuccin/go v0.2.0 // indirect - github.com/cenkalti/backoff/v4 v4.3.0 + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/charmbracelet/x/ansi v0.8.0 // indirect github.com/charmbracelet/x/exp/strings v0.0.0-20240722160745-212f7b056ed0 // indirect @@ -131,7 +131,7 @@ require ( go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/net v0.38.0 // indirect + golang.org/x/net v0.38.0 golang.org/x/sync v0.12.0 // indirect golang.org/x/sys v0.32.0 golang.org/x/text v0.23.0 // indirect diff --git a/go.sum b/go.sum index 3b050756..af7e7f40 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/ProtonMail/go-crypto v1.1.5 h1:eoAQfK2dwL+tFSFpr7TbOaPNUbPiJj4fLYwwGE1FQO4= github.com/ProtonMail/go-crypto v1.1.5/go.mod h1:rA3QumHc/FZ8pAHreoekgiAbzpNsfQAosU5td4SnOrE= -github.com/agentuity/go-common v1.0.47 h1:xpR3JO+NseSLPbr/8h3OwjbdMAVeoaPKmYUbR1QLAgQ= -github.com/agentuity/go-common v1.0.47/go.mod h1:cy1EPYpZUkp3JSMgTb+Sa3sLnS7vQQupj/RwO4An6L4= +github.com/agentuity/go-common v1.0.59 h1:WOR35IDV6X7qSBr+E7ztF1PygENO305/nEKd+rDAX2A= +github.com/agentuity/go-common v1.0.59/go.mod h1:cy1EPYpZUkp3JSMgTb+Sa3sLnS7vQQupj/RwO4An6L4= github.com/agentuity/mcp-golang/v2 v2.0.2 h1:wZqS/aHWZsQoU/nd1E1/iMsVY2dywWT9+PFlf+3YJxo= github.com/agentuity/mcp-golang/v2 v2.0.2/go.mod h1:U105tZXyTatxxOBlcObRgLb/ULvGgT2DJ1nq/8++P6Q= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= @@ -127,8 +127,6 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= -github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -152,6 +150,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -210,6 +210,8 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= +github.com/sahilm/fuzzy v0.1.1 h1:ceu5RHF8DGgoi+/dR5PsECjCDH1BE3Fnmpo7aVXOdRA= +github.com/sahilm/fuzzy v0.1.1/go.mod h1:VFvziUEIMCrT6A6tw2RFIXPXXmzXbOsSHF0DOI8ZK9Y= github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38 h1:D0vL7YNisV2yqE55+q0lFuGse6U8lxlg7fYTctlT5Gc= github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38/go.mod h1:sM7Mt7uEoCeFSCBM+qBrqvEo+/9vdmj19wzp3yzUhmg= github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8= diff --git a/internal/bundler/bundler.go b/internal/bundler/bundler.go index 045b557b..5a0e810b 100644 --- a/internal/bundler/bundler.go +++ b/internal/bundler/bundler.go @@ -3,6 +3,7 @@ package bundler import ( "context" "fmt" + "io" "os" "os/exec" "path/filepath" @@ -38,6 +39,7 @@ type BundleContext struct { Install bool CI bool DevMode bool + Writer io.Writer } func bundleJavascript(ctx BundleContext, dir string, outdir string, theproject *project.Project) error { @@ -156,11 +158,11 @@ func bundleJavascript(ctx BundleContext, dir string, outdir string, theproject * }) ctx.Logger.Debug("finished build in %v", time.Since(started)) if len(result.Errors) > 0 { - fmt.Println("\n" + tui.Warning("Build Failed") + "\n") + fmt.Fprintln(ctx.Writer, "\n"+tui.Warning("Build Failed")+"\n") for _, err := range result.Errors { formattedError := formatESBuildError(dir, err) - fmt.Println(formattedError) + fmt.Fprintln(ctx.Writer, formattedError) } if ctx.DevMode { diff --git a/internal/dev/dev.go b/internal/dev/dev.go index 4bdc3879..ffcb765a 100644 --- a/internal/dev/dev.go +++ b/internal/dev/dev.go @@ -3,6 +3,7 @@ package dev import ( "context" "fmt" + "io" "net" "os" "os/exec" @@ -97,16 +98,16 @@ func FindAvailablePort(p project.ProjectContext) (int, error) { return findAvailablePort() } -func CreateRunProjectCmd(ctx context.Context, log logger.Logger, theproject project.ProjectContext, liveDevConnection *Websocket, dir string, orgId string, port int) (*exec.Cmd, error) { +func CreateRunProjectCmd(ctx context.Context, log logger.Logger, theproject project.ProjectContext, server *Server, dir string, orgId string, port int, writer io.Writer) (*exec.Cmd, error) { // set the vars projectServerCmd := exec.CommandContext(ctx, theproject.Project.Development.Command, theproject.Project.Development.Args...) projectServerCmd.Env = os.Environ()[:] - projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_OTLP_BEARER_TOKEN=%s", liveDevConnection.OtelToken)) - projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_OTLP_URL=%s", liveDevConnection.OtelUrl)) + projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_OTLP_BEARER_TOKEN=%s", server.otelToken)) + projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_OTLP_URL=%s", server.otelUrl)) projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_URL=%s", theproject.APIURL)) projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_TRANSPORT_URL=%s", theproject.TransportURL)) - projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_CLOUD_DEPLOYMENT_ID=%s", liveDevConnection.webSocketId)) + projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_CLOUD_DEPLOYMENT_ID=%s", server.ID)) projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_CLOUD_PROJECT_ID=%s", theproject.Project.ProjectId)) projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_CLOUD_ORG_ID=%s", orgId)) @@ -121,9 +122,8 @@ func CreateRunProjectCmd(ctx context.Context, log logger.Logger, theproject proj projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("AGENTUITY_CLOUD_PORT=%d", port)) projectServerCmd.Env = append(projectServerCmd.Env, fmt.Sprintf("PORT=%d", port)) - projectServerCmd.Stdout = os.Stdout - projectServerCmd.Stderr = os.Stderr - projectServerCmd.Stdin = os.Stdin + projectServerCmd.Stdout = writer + projectServerCmd.Stderr = writer projectServerCmd.Dir = dir util.ProcessSetup(projectServerCmd) diff --git a/internal/dev/pending_logger.go b/internal/dev/pending_logger.go new file mode 100644 index 00000000..2453eacc --- /dev/null +++ b/internal/dev/pending_logger.go @@ -0,0 +1,145 @@ +package dev + +import ( + "context" + "fmt" + "os" + "sync" + + "github.com/agentuity/go-common/logger" + "github.com/agentuity/go-common/tui" +) + +type PendingLogger struct { + pending []string + logLevel logger.LogLevel + logger logger.Logger + mutex sync.RWMutex +} + +var _ logger.Logger = (*PendingLogger)(nil) + +func NewPendingLogger(logLevel logger.LogLevel) *PendingLogger { + return &PendingLogger{ + pending: make([]string, 0), + logLevel: logLevel, + } +} + +func (l *PendingLogger) drain(ui *DevModeUI, logger logger.Logger) { + l.mutex.Lock() + defer l.mutex.Unlock() + for _, val := range l.pending { + ui.AddLog("%s", val) + } + l.logger = logger + l.pending = nil +} + +// With will return a new logger using metadata as the base context +func (l *PendingLogger) With(metadata map[string]interface{}) logger.Logger { + return l +} + +// WithPrefix will return a new logger with a prefix prepended to the message +func (l *PendingLogger) WithPrefix(prefix string) logger.Logger { + return l +} + +// WithContext will return a new logger with the given context +func (l *PendingLogger) WithContext(ctx context.Context) logger.Logger { + return l +} + +// Trace level logging +func (l *PendingLogger) Trace(msg string, args ...interface{}) { + if logger.LevelTrace < l.logLevel { + return + } + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Trace(msg, args...) + return + } + val := tui.Muted("[TRACE] " + fmt.Sprintf(msg, args...)) + l.pending = append(l.pending, val) +} + +// Debug level logging +func (l *PendingLogger) Debug(msg string, args ...interface{}) { + if logger.LevelDebug < l.logLevel { + return + } + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Debug(msg, args...) + return + } + val := tui.Muted("[TRACE] " + fmt.Sprintf(msg, args...)) + l.pending = append(l.pending, val) +} + +// Info level loggi ng +func (l *PendingLogger) Info(msg string, args ...interface{}) { + if logger.LevelInfo < l.logLevel { + return + } + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Info(msg, args...) + return + } + val := tui.Text("[INFO] " + fmt.Sprintf(msg, args...)) + l.pending = append(l.pending, val) +} + +// Warning level logging +func (l *PendingLogger) Warn(msg string, args ...interface{}) { + if logger.LevelWarn < l.logLevel { + return + } + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Warn(msg, args...) + return + } + val := tui.Title("[WARN] " + fmt.Sprintf(msg, args...)) + l.pending = append(l.pending, val) +} + +// Error level logging +func (l *PendingLogger) Error(msg string, args ...interface{}) { + if logger.LevelError < l.logLevel { + return + } + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Error(msg, args...) + return + } + val := tui.Bold("[ERROR] " + fmt.Sprintf(msg, args...)) + l.pending = append(l.pending, val) +} + +// Fatal level logging and exit with code 1 +func (l *PendingLogger) Fatal(msg string, args ...interface{}) { + l.mutex.RLock() + defer l.mutex.RUnlock() + if l.logger != nil { + l.logger.Fatal(msg, args...) + return + } + val := tui.Bold("[FATAL] " + fmt.Sprintf(msg, args...)) + fmt.Println(val) + os.Exit(1) +} + +// Stack will return a new logger that logs to the given logger as well as the current logger +func (l *PendingLogger) Stack(next logger.Logger) logger.Logger { + return l +} diff --git a/internal/dev/server.go b/internal/dev/server.go new file mode 100644 index 00000000..ff8c8273 --- /dev/null +++ b/internal/dev/server.go @@ -0,0 +1,477 @@ +package dev + +import ( + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "math" + "net/http" + "net/http/httputil" + "net/url" + "strings" + "sync" + "time" + + "github.com/agentuity/cli/internal/project" + "github.com/agentuity/cli/internal/util" + "github.com/agentuity/go-common/logger" + "github.com/agentuity/go-common/message" + cstr "github.com/agentuity/go-common/string" + "github.com/agentuity/go-common/telemetry" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + "golang.org/x/net/http2" +) + +var propagator propagation.TraceContext + +type Server struct { + ID string + otelToken string + otelUrl string + Project project.ProjectContext + orgId string + userId string + apiurl string + transportUrl string + apiKey string + ctx context.Context + cancel context.CancelFunc + logger logger.Logger + tracer trace.Tracer + version string + once sync.Once + apiclient *util.APIClient + publicUrl string + port int + connected chan string + pendingLogger logger.Logger + expiresAt *time.Time + tlsCertificate *tls.Certificate + conn *tls.Conn + srv *http2.Server + wg sync.WaitGroup + serverAddr string + cleanup func() +} + +type ServerArgs struct { + Ctx context.Context + Logger logger.Logger + LogLevel logger.LogLevel + APIURL string + TransportURL string + APIKey string + Project project.ProjectContext + OrgId string + UserId string + Version string + Port int + ServerAddr string +} + +type ConnectionResponse struct { + Success bool `json:"success"` + Error string `json:"message"` + Data struct { + Certificate string `json:"certificate"` + PrivateKey string `json:"private_key"` + Domain string `json:"domain"` + ExpiresAt string `json:"expires_at"` + OtelUrl string `json:"otlp_url"` + OtelBearerToken string `json:"otlp_token"` + } `json:"data"` +} + +// Close closes the bridge client and cleans up the connection +func (s *Server) Close() error { + s.logger.Debug("closing connection") + s.once.Do(func() { + s.cancel() + s.wg.Wait() + if s.conn != nil { + s.conn.Close() + s.conn = nil + } + if s.cleanup != nil { + s.cleanup() + } + }) + return nil +} + +func (s *Server) refreshConnection() error { + var resp ConnectionResponse + if err := s.apiclient.Do("GET", "/cli/devmode/"+s.Project.Project.ProjectId+"/"+s.ID, nil, &resp); err != nil { + return fmt.Errorf("failed to refresh connection: %w", err) + } + s.otelUrl = resp.Data.OtelUrl + s.otelToken = resp.Data.OtelBearerToken + tv, err := time.Parse(time.RFC3339, resp.Data.ExpiresAt) + if err != nil { + return fmt.Errorf("failed to parse expires at: %w", err) + } + s.expiresAt = &tv + s.publicUrl = fmt.Sprintf("https://%s", resp.Data.Domain) + cert, err := tls.X509KeyPair([]byte(resp.Data.Certificate), []byte(resp.Data.PrivateKey)) + if err != nil { + return fmt.Errorf("failed to create tls key pair: %w", err) + } + s.tlsCertificate = &cert + if s.cleanup == nil { + ctx, logger, cleanup, err := telemetry.NewWithAPIKey(s.ctx, "@agentuity/cli", s.otelUrl, s.otelToken, s.logger) + if err != nil { + return fmt.Errorf("failed to create OTLP telemetry trace: %w", err) + } + s.ctx = ctx + s.logger = logger + s.cleanup = cleanup + } + + return nil +} + +func (s *Server) reconnect() { + if s.conn != nil { + s.conn.Close() + s.conn = nil + } + go s.connect(false) +} + +func (s *Server) connect(initial bool) { + var gerr error + defer func() { + if initial && gerr != nil { + s.connected <- gerr.Error() + } + }() + + if err := s.refreshConnection(); err != nil { + s.logger.Error("failed to refresh connection: %s", err) + gerr = err + return + } + + var tlsConfig tls.Config + tlsConfig.Certificates = []tls.Certificate{*s.tlsCertificate} + tlsConfig.NextProtos = []string{"h2"} + + if strings.Contains(s.serverAddr, "localhost") || strings.Contains(s.serverAddr, "127.0.0.1") { + tlsConfig.InsecureSkipVerify = true + } + + if !strings.Contains(s.serverAddr, ":") { + s.serverAddr = fmt.Sprintf("%s:443", s.serverAddr) + } + + conn, err := tls.Dial("tcp", s.serverAddr, &tlsConfig) + if err != nil { + gerr = err + s.logger.Error("failed to dial tls: %s", err) + return + } + s.conn = conn + + if initial { + s.connected <- "" + } + + // HTTP/2 server to accept proxied requests over the tunnel connection + h2s := &http2.Server{} + + h2s.ServeConn(conn, &http2.ServeConnOpts{ + Handler: http.HandlerFunc(s.handleStream), + Context: s.ctx, + }) + +} + +type AgentsControlResponse struct { + ProjectID string `json:"projectId"` + ProjectName string `json:"projectName"` + Agents []project.AgentConfig `json:"agents"` +} + +func sendCORSHeaders(headers http.Header) { + headers.Set("access-control-allow-origin", "*") + headers.Set("access-control-expose-headers", "Content-Type") + headers.Set("access-control-allow-headers", "Content-Type, Authorization") + headers.Set("access-control-allow-methods", "GET, POST, OPTIONS") +} + +func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { + s.wg.Add(1) + defer s.wg.Done() + + s.logger.Trace("handleStream: %s %s", r.Method, r.URL) + + if r.Method == "OPTIONS" { + sendCORSHeaders(w.Header()) + w.WriteHeader(http.StatusOK) + return + } + + switch r.URL.Path { + case "/": + message.CustomErrorResponse(w, "Agents, Not Humans, Live Here", "Hi! I'm an Agentuity Agent running in development mode.", "", http.StatusOK) + return + case "/_health": + w.WriteHeader(http.StatusOK) + return + case "/_agents": + sendCORSHeaders(w.Header()) + buf, err := json.Marshal(AgentsControlResponse{ + ProjectID: s.Project.Project.ProjectId, + ProjectName: s.Project.Project.Name, + Agents: s.Project.Project.Agents, + }) + if err != nil { + s.logger.Error("failed to marshal agents control response: %s", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(buf) + return + case "/_control": + sendCORSHeaders(w.Header()) + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.WriteHeader(http.StatusOK) + rc := http.NewResponseController(w) + rc.Flush() + w.Write([]byte("event: start\ndata: connected\n\n")) + rc.Flush() + select { + case <-s.ctx.Done(): + case <-r.Context().Done(): + } + w.Write([]byte("event: stop\ndata: disconnected\n\n")) + rc.Flush() + return + } + + if r.Method != "POST" { + sendCORSHeaders(w.Header()) + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + agentId := r.URL.Path[1:] + var found bool + for _, agent := range s.Project.Project.Agents { + if agent.ID == agentId || strings.TrimLeft(agent.ID, "agent_") == agentId { + found = true + agentId = agent.ID + break + } + } + + if !found { + s.logger.Error("agent not found with id: %s", agentId) + sendCORSHeaders(w.Header()) + w.WriteHeader(http.StatusNotFound) + return + } + + sctx, logger, span := telemetry.StartSpan(r.Context(), s.logger, s.tracer, "TriggerRun", + trace.WithAttributes( + attribute.Bool("@agentuity/devmode", true), + attribute.String("trigger", "manual"), + attribute.String("@agentuity/deploymentId", s.ID), + ), + trace.WithSpanKind(trace.SpanKindConsumer), + ) + + var err error + started := time.Now() + + defer func() { + // only end the span if there was an error + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + span.SetAttributes( + attribute.Int64("@agentuity/cpu_time", time.Since(started).Milliseconds()), + ) + span.End() + s.logger.Info("processed sess_%s in %s", span.SpanContext().TraceID(), time.Since(started)) + }() + + span.SetAttributes( + attribute.String("@agentuity/agentId", agentId), + attribute.String("@agentuity/orgId", s.orgId), + attribute.String("@agentuity/projectId", s.Project.Project.ProjectId), + attribute.String("@agentuity/env", "development"), + ) + + spanContext := span.SpanContext() + traceState := spanContext.TraceState() + traceState, err = traceState.Insert("id", agentId) + if err != nil { + logger.Error("failed to insert agent id into trace state: %s", err) + err = fmt.Errorf("failed to insert agent id into trace state: %w", err) + return + } + traceState, err = traceState.Insert("oid", s.orgId) + if err != nil { + logger.Error("failed to insert org id into trace state: %s", err) + err = fmt.Errorf("failed to insert org id into trace state: %w", err) + return + } + traceState, err = traceState.Insert("pid", s.Project.Project.ProjectId) + if err != nil { + logger.Error("failed to insert project id into trace state: %s", err) + err = fmt.Errorf("failed to insert project id into trace state: %w", err) + return + } + + newctx := trace.ContextWithSpanContext(sctx, spanContext.WithTraceState(traceState)) + + nr := r.WithContext(newctx) + nr.Header = r.Header.Clone() + nr.Header.Set("x-agentuity-trigger", "manual") + nr.Header.Set("User-Agent", "Agentuity CLI/"+s.version) + propagator.Inject(newctx, propagation.HeaderCarrier(nr.Header)) + + url, err := url.Parse(r.URL.String()) + if err != nil { + logger.Error("failed to parse url: %s", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + url.Scheme = "http" + url.Host = fmt.Sprintf("127.0.0.1:%d", s.port) + url.Path = "" // proxy sets so this acts like the base + + logger.Trace("sending to: %s", url) + + proxy := httputil.NewSingleHostReverseProxy(url) + proxy.FlushInterval = -1 // no buffering so we can stream + proxy.ServeHTTP(w, nr) +} + +func (s *Server) WebURL(appUrl string) string { + return fmt.Sprintf("%s/devmode/%s", appUrl, s.ID) +} + +func (s *Server) PublicURL(appUrl string) string { + return s.publicUrl +} + +func (s *Server) AgentURL(agentId string) string { + return fmt.Sprintf("http://127.0.0.1:%d/%s", s.port, agentId) +} + +func (s *Server) HealthCheck(devModeUrl string) error { + started := time.Now() + var i int + for time.Since(started) < 15*time.Second { + i++ + s.logger.Trace("health check request: %s", fmt.Sprintf("%s/_health", devModeUrl)) + req, err := http.NewRequestWithContext(s.ctx, "GET", fmt.Sprintf("%s/_health", devModeUrl), nil) + if err != nil { + return fmt.Errorf("failed to create health check request: %w", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + if errors.Is(err, context.Canceled) { + return err + } + s.logger.Trace("health check request failed: %s", err) + dur := time.Millisecond * 150 * time.Duration(math.Pow(float64(i), 2)) + time.Sleep(dur) + continue + } + s.logger.Trace("health check request returned status code: %d", resp.StatusCode) + if resp.StatusCode != http.StatusOK { + s.logger.Trace("health check returned status code: %d", resp.StatusCode) + dur := time.Millisecond * 150 * time.Duration(math.Pow(float64(i), 2)) + time.Sleep(dur) + continue + } + return nil + } + return fmt.Errorf("health check failed after %s", time.Since(started)) +} + +func (s *Server) Connect(ui *DevModeUI, tuiLogger logger.Logger) error { + s.logger = tuiLogger + if pl, ok := s.logger.(*PendingLogger); ok { + pl.drain(ui, s.logger) + } + s.pendingLogger = s.logger + msg := <-s.connected + close(s.connected) + if msg != "" { + return fmt.Errorf("%s", msg) + } + return nil +} + +func (s *Server) monitor() { + t := time.NewTicker(time.Minute * 10) + defer t.Stop() + for { + select { + case <-s.ctx.Done(): + return + case <-t.C: + if s.expiresAt != nil && time.Now().After(*s.expiresAt) { + s.logger.Debug("connection expired, reconnecting") + s.reconnect() + } + } + } +} + +func New(args ServerArgs) (*Server, error) { + id := cstr.NewHash(args.OrgId, args.UserId) + tracer := otel.Tracer("@agentuity/cli", trace.WithInstrumentationAttributes( + attribute.String("id", id), + attribute.String("@agentuity/orgId", args.OrgId), + attribute.String("@agentuity/userId", args.UserId), + attribute.Bool("@agentuity/devmode", true), + attribute.String("name", "@agentuity/cli"), + attribute.String("version", args.Version), + ), trace.WithInstrumentationVersion(args.Version)) + + pendingLogger := NewPendingLogger(args.LogLevel) + + ctx, cancel := context.WithCancel(args.Ctx) + + server := &Server{ + ID: id, + logger: pendingLogger, + ctx: ctx, + cancel: cancel, + apiurl: args.APIURL, + transportUrl: args.TransportURL, + apiKey: args.APIKey, + Project: args.Project, + orgId: args.OrgId, + userId: args.UserId, + tracer: tracer, + version: args.Version, + port: args.Port, + apiclient: util.NewAPIClient(context.Background(), pendingLogger, args.APIURL, args.APIKey), + pendingLogger: pendingLogger, + connected: make(chan string, 1), + serverAddr: args.ServerAddr, + } + + go server.connect(true) + go server.monitor() + + return server, nil +} diff --git a/internal/dev/tui.go b/internal/dev/tui.go new file mode 100644 index 00000000..4a529579 --- /dev/null +++ b/internal/dev/tui.go @@ -0,0 +1,502 @@ +package dev + +import ( + "context" + "fmt" + "os" + "strings" + "sync" + "time" + + "github.com/agentuity/go-common/tui" + "github.com/charmbracelet/bubbles/key" + "github.com/charmbracelet/bubbles/list" + "github.com/charmbracelet/bubbles/spinner" + "github.com/charmbracelet/bubbles/viewport" + tea "github.com/charmbracelet/bubbletea" + "github.com/charmbracelet/lipgloss" + "golang.org/x/term" +) + +var ( + resumeKey = key.NewBinding(key.WithKeys("r"), key.WithHelp("r", "resume"), key.WithDisabled()) + pauseKey = key.NewBinding(key.WithKeys("p"), key.WithHelp("p", "pause")) + helpKey = key.NewBinding(key.WithKeys("h"), key.WithHelp("h", "show help")) + agentsKey = key.NewBinding(key.WithKeys("a"), key.WithHelp("a", "show agents")) + logoColor = lipgloss.AdaptiveColor{Light: "#11c7b9", Dark: "#00FFFF"} + labelColor = lipgloss.AdaptiveColor{Light: "#999999", Dark: "#FFFFFF"} + runningColor = lipgloss.AdaptiveColor{Light: "#00FF00", Dark: "#009900"} + pausedColor = lipgloss.AdaptiveColor{Light: "#FFA500", Dark: "#FFA500"} + statusColor = lipgloss.AdaptiveColor{Light: "#750075", Dark: "#FF5CFF"} + runningStyle = lipgloss.NewStyle().Foreground(runningColor) + pausedStyle = lipgloss.NewStyle().Foreground(pausedColor).AlignHorizontal(lipgloss.Center) + labelStyle = lipgloss.NewStyle().Foreground(labelColor).Bold(true) + statusMsgStyle = lipgloss.NewStyle().Foreground(statusColor).Margin(0) + viewPortHelpStyle = lipgloss.NewStyle().Foreground(lipgloss.AdaptiveColor{Light: "#000000", Dark: "#999999"}).Background(lipgloss.AdaptiveColor{Light: "#999999", Dark: "#222222"}).AlignHorizontal(lipgloss.Left).MarginTop(1) + statusMsgHeight = 2 +) + +type model struct { + infoBox string + statusMessage string + logList list.Model + logItems []list.Item + windowSize tea.WindowSizeMsg + viewport *viewport.Model + paused bool + showhelp bool + showagents bool + agents []*Agent + selectedLog *logItem + spinner spinner.Model + spinning bool + devModeUrl string + publicUrl string + appUrl string +} + +type spinnerStartMsg struct{} +type spinnerStopMsg struct{} + +type logItem string + +func (i logItem) Title() string { return strings.ReplaceAll(string(i), "\n", " ") } +func (i logItem) Description() string { return "" } +func (i logItem) FilterValue() string { return string(i) } + +type tickMsg time.Time +type addLogMsg string +type statusMessageMsg string + +func tick() tea.Cmd { + return tea.Tick(time.Second, func(t time.Time) tea.Msg { + return tickMsg(t) + }) +} + +func initialModel(config DevModeConfig) *model { + + width, height, err := term.GetSize(int(os.Stdout.Fd())) + if err != nil { + fmt.Println("Error getting terminal size:", err) + } + + spinner := spinner.New(spinner.WithSpinner(spinner.Dot), spinner.WithStyle(statusMsgStyle.MarginLeft(1).MarginRight(0))) + + items := []list.Item{} + + listDelegate := list.NewDefaultDelegate() + listDelegate.ShowDescription = false + listDelegate.SetSpacing(0) + listDelegate.Styles.NormalTitle = listDelegate.Styles.NormalTitle.Padding(0, 1) + listDelegate.Styles.SelectedTitle = listDelegate.Styles.SelectedTitle.BorderLeft(false).Foreground(labelColor).Bold(true) + + l := list.New(items, listDelegate, width-2, 10) + l.SetShowTitle(false) + l.SetShowStatusBar(false) + l.SetShowPagination(true) + l.SetFilteringEnabled(true) + l.SetShowHelp(true) + l.SetStatusBarItemName("log", "logs") + l.Styles.NoItems = l.Styles.NoItems.MarginLeft(1) + l.Styles.HelpStyle = l.Styles.HelpStyle.AlignHorizontal(lipgloss.Center).Width(width) + + l.AdditionalShortHelpKeys = func() []key.Binding { + return []key.Binding{ + resumeKey, + pauseKey, + helpKey, + agentsKey, + } + } + l.AdditionalFullHelpKeys = func() []key.Binding { + return []key.Binding{ + resumeKey, + pauseKey, + helpKey, + agentsKey, + } + } + + m := &model{ + logList: l, + logItems: items, + spinner: spinner, + windowSize: tea.WindowSizeMsg{Width: width, Height: height}, + devModeUrl: config.DevModeUrl, + publicUrl: config.PublicUrl, + appUrl: config.AppUrl, + agents: config.Agents, + } + + m.infoBox = m.generateInfoBox() + + infoBoxHeight := lipgloss.Height(m.infoBox) + available := height - infoBoxHeight - statusMsgHeight + if available < 1 { + available = 1 + } + m.logList.SetHeight(available) + + return m +} + +func (m *model) Init() tea.Cmd { + return tick() +} + +func label(s string) string { + return labelStyle.Render(tui.PadRight(s, 10, " ")) +} + +func (m *model) generateInfoBox() string { + var statusStyle = runningStyle + if m.paused { + statusStyle = pausedStyle + } + var devmodeBox = lipgloss.NewStyle(). + Width(m.windowSize.Width-2). + Border(lipgloss.NormalBorder()). + BorderForeground(logoColor). + Padding(1, 2). + AlignVertical(lipgloss.Top). + AlignHorizontal(lipgloss.Left). + Foreground(labelColor) + + url := "loading..." + if m.publicUrl != "" { + url = tui.Link("%s", m.publicUrl) + " " + tui.Muted("(only accessible while running)") + } + + content := fmt.Sprintf(`%s + +%s %s +%s %s +%s %s`, + tui.Bold("⨺ Agentuity DevMode")+" "+statusStyle.Render(tui.PadLeft("⏺", m.windowSize.Width-25, " ")), + label("Dashboard"), tui.Link("%s", m.appUrl), + label("Local"), tui.Link("%s", m.devModeUrl), + label("Public"), url, + ) + return devmodeBox.Render(content) +} + +func (m *model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { + var cmd []tea.Cmd + + switch msg := msg.(type) { + case spinnerStartMsg: + m.spinning = true + break + case spinnerStopMsg: + m.spinning = false + break + case spinner.TickMsg: + sm, c := m.spinner.Update(msg) + m.spinner = sm + cmd = append(cmd, c) + break + case tea.KeyMsg: + if msg.Type == tea.KeyCtrlC { + cmd = append(cmd, tea.Quit) + break + } + if msg.Type == tea.KeyEscape { + if m.showhelp { + m.showhelp = false + return m, nil + } + if m.showagents { + m.showagents = false + m.viewport = nil + return m, nil + } + if m.selectedLog != nil { + m.selectedLog = nil + return m, nil + } + } + if msg.Type == tea.KeyEnter && m.selectedLog == nil { + if sel := m.logList.SelectedItem(); sel != nil { + if log, ok := sel.(logItem); ok { + m.selectedLog = &log + break + } + } + } + if msg.Type == tea.KeyRunes { + switch msg.String() { + case "p": + m.paused = true + resumeKey.SetEnabled(true) + pauseKey.SetEnabled(false) + case "r": + m.paused = false + resumeKey.SetEnabled(false) + pauseKey.SetEnabled(true) + case "h": + m.showhelp = true + case "a": + m.showagents = true + } + m.infoBox = m.generateInfoBox() + } + if m.viewport != nil { + vp, vpCmd := m.viewport.Update(msg) + m.viewport = &vp + cmd = append(cmd, vpCmd) + } + case tea.WindowSizeMsg: + m.windowSize = msg + // Calculate the height for the info box + infoBoxHeight := lipgloss.Height(m.infoBox) + available := msg.Height - infoBoxHeight - statusMsgHeight + if available < 1 { + available = 1 + } + m.logList.SetHeight(available) + m.logList.SetWidth(m.windowSize.Width - 2) + break + case tickMsg: + m.infoBox = m.generateInfoBox() + cmd = append(cmd, tick()) + break + case addLogMsg: + m.logItems = append([]list.Item{logItem(msg)}, m.logItems...) + if !m.paused { + if m.logList.FilterState() == list.Unfiltered { + m.logList.SetItems(m.logItems) + } + } + break + case statusMessageMsg: + m.statusMessage = string(msg) + break + } + + var lcmd tea.Cmd + m.logList, lcmd = m.logList.Update(msg) + cmd = append(cmd, lcmd) + return m, tea.Batch(cmd...) +} + +func (m *model) View() string { + + var showModal bool + var modalContent string + modalWidth := m.windowSize.Width / 2 + modalHeight := m.windowSize.Height / 2 + if modalWidth < 40 { + modalWidth = 40 + } + if modalHeight < 10 { + modalHeight = 10 + } + + if m.showhelp { + showModal = true + modalContent = lipgloss.JoinVertical( + lipgloss.Left, + tui.Bold("⨺ Agentuity DevMode"), + "", + tui.Secondary("When your project is running in DevMode, you can interact with it by sending messages to your local agents."), + "", "", + tui.Secondary("The best way to do this is to open the Agentuity console in your browser:"), + "", + tui.Link("%s", m.appUrl), + "", "", + tui.Secondary("You can also use curl or wget to send messages to the local agent."), + "", + tui.Secondary(fmt.Sprintf("To send a message to the local agent %s, use the following command:", m.agents[0].Name)), + "", "", + tui.Highlight(fmt.Sprintf("curl %s --json '{\"message\": \"Hello, world!\"}'", m.agents[0].LocalURL)), + "", + tui.Secondary(fmt.Sprintf("To send a message to the local agent %s from a remote machine, use the following command:", m.agents[0].Name)), + "", "", + tui.Highlight(fmt.Sprintf("curl %s --json '{\"message\": \"Hello, world!\"}'", m.agents[0].PublicURL)), + "", + tui.Muted("Note: The public URL is only accessible in devmode and has no authentication while in devmode. This this URL carefully."), + "", "", + tui.Warning("To get help or share your feedback, join our Discord community:"), + "", + tui.Link("https://discord.gg/vtn3hgUfuc"), + "", + ) + } else if m.selectedLog != nil { + showModal = true + modalContent = string(*m.selectedLog) + } else if m.showagents { + showModal = true + modalContent = "Agents" + var agentsContent string + modalWidth = int(float64(m.windowSize.Width) * 0.9) + for _, agent := range m.agents { + agentsContent += fmt.Sprintf("%s %s\n", tui.PadRight("ID", 10, " "), tui.Muted(agent.ID)) + agentsContent += fmt.Sprintf("%s %s\n", tui.PadRight("Agent", 10, " "), tui.Title(agent.Name)) + agentsContent += fmt.Sprintf("%s %s\n", tui.PadRight("Local", 10, " "), tui.Link("%s", agent.LocalURL)) + agentsContent += fmt.Sprintf("%s %s\n", tui.PadRight("Public", 10, " "), tui.Link("%s", agent.PublicURL)) + agentsContent += "\n" + } + modalContent = agentsContent + } + + if showModal { + modal := lipgloss.NewStyle().Padding(2) + if m.viewport == nil { + vp := viewport.New(m.windowSize.Width, m.windowSize.Height-1) + vp.SetYOffset(1) + m.viewport = &vp + } + m.viewport.SetContent(modal.Render(modalContent)) + m.viewport.Width = m.windowSize.Width + esc := "ESC to close" + pct := fmt.Sprintf("%3.f%%", m.viewport.ScrollPercent()*100) + spacer := m.windowSize.Width - lipgloss.Width(esc) - lipgloss.Width(pct) + 3 + right := lipgloss.NewStyle().AlignHorizontal(lipgloss.Right).Width(spacer).Render(pct) + return m.viewport.View() + "\n" + viewPortHelpStyle.Width(m.windowSize.Width).Render(lipgloss.JoinHorizontal(lipgloss.Left, esc, right)) + } + + var view string + + if m.spinning { + view = m.spinner.View() + " " + } else { + view = " " + } + + return fmt.Sprintf("%s\n%s\n%s", m.infoBox, view+statusMsgStyle.Render(m.statusMessage), m.logList.View()) +} + +type Agent struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + LocalURL string `json:"local_url,omitempty"` + PublicURL string `json:"public_url,omitempty"` +} + +type DevModeUI struct { + ctx context.Context + cancel context.CancelFunc + model *model + program *tea.Program + wg sync.WaitGroup + once sync.Once + + spinnerCtx context.Context + spinnerCancel context.CancelFunc + aborting bool +} + +type DevModeConfig struct { + DevModeUrl string + PublicUrl string + AppUrl string + Agents []*Agent +} + +func NewDevModeUI(ctx context.Context, config DevModeConfig) *DevModeUI { + ctx, cancel := context.WithCancel(ctx) + return &DevModeUI{ + ctx: ctx, + cancel: cancel, + model: initialModel(config), + } +} + +func (d *DevModeUI) SetPublicURL(url string) { + d.model.publicUrl = url +} + +// Done returns a channel that will be closed when the program is done +func (d *DevModeUI) Done() <-chan struct{} { + return d.ctx.Done() +} + +// Close the program which will stop the program and wait for it to exit +func (d *DevModeUI) Close(abort bool) { + d.once.Do(func() { + d.aborting = abort + d.program.Quit() + }) +} + +// Start the program +func (d *DevModeUI) Start() { + d.program = tea.NewProgram( + d.model, + tea.WithAltScreen(), + tea.WithoutSignalHandler(), + ) + d.wg.Add(1) + go func() { + defer func() { + d.cancel() + d.wg.Done() + if d.aborting { + for i := len(d.model.logItems) - 1; i >= 0; i-- { + fmt.Println(d.model.logItems[i]) + } + } + }() + _, err := d.program.Run() + if err != nil { + fmt.Printf("Error running program: %v\n", err) + } + }() +} + +// Add a log message to the log list +func (d *DevModeUI) AddLog(log string, args ...any) { + d.program.Send(addLogMsg(fmt.Sprintf(log, args...))) +} + +func (d *DevModeUI) SetStatusMessage(msg string, args ...any) { + val := fmt.Sprintf(msg, args...) + d.program.Send(statusMessageMsg(val)) + if val != "" { + go func() { + select { + case <-time.After(time.Second * 3): + if val == d.model.statusMessage { + d.program.Send(statusMessageMsg("")) + } + case <-d.ctx.Done(): + return + } + }() + } +} + +func (d *DevModeUI) ShowSpinner(msg string, fn func()) { + d.SetSpinner(true) + d.SetStatusMessage("%s", msg) + fn() + d.SetStatusMessage("") + d.SetSpinner(false) +} + +func (d *DevModeUI) SetSpinner(spinning bool) { + if spinning { + d.program.Send(spinnerStartMsg{}) + ctx, cancel := context.WithCancel(d.ctx) + d.spinnerCtx = ctx + d.spinnerCancel = cancel + go func() { + t := time.NewTicker(time.Millisecond * 200) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + d.program.Send(d.model.spinner.Tick()) + } + } + }() + } else { + d.spinnerCancel() + d.spinnerCtx = nil + d.program.Send(spinnerStopMsg{}) + } +} diff --git a/internal/dev/tui_logger.go b/internal/dev/tui_logger.go new file mode 100644 index 00000000..f0f52eeb --- /dev/null +++ b/internal/dev/tui_logger.go @@ -0,0 +1,131 @@ +package dev + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "regexp" + "strings" + + "github.com/agentuity/go-common/logger" + "github.com/agentuity/go-common/tui" +) + +type TuiLogger struct { + logLevel logger.LogLevel + ui *DevModeUI +} + +func NewTUILogger(logLevel logger.LogLevel, ui *DevModeUI) *TuiLogger { + return &TuiLogger{logLevel: logLevel, ui: ui} +} + +var _ logger.Logger = (*TuiLogger)(nil) +var _ io.Writer = (*TuiLogger)(nil) + +// With will return a new logger using metadata as the base context +func (l *TuiLogger) With(metadata map[string]interface{}) logger.Logger { + return l +} + +// WithPrefix will return a new logger with a prefix prepended to the message +func (l *TuiLogger) WithPrefix(prefix string) logger.Logger { + return l +} + +// WithContext will return a new logger with the given context +func (l *TuiLogger) WithContext(ctx context.Context) logger.Logger { + return l +} + +// Trace level logging +func (l *TuiLogger) Trace(msg string, args ...interface{}) { + if logger.LevelTrace < l.logLevel { + return + } + val := tui.Muted("[TRACE] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) +} + +// Debug level logging +func (l *TuiLogger) Debug(msg string, args ...interface{}) { + if logger.LevelDebug < l.logLevel { + return + } + val := tui.Muted("[TRACE] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) +} + +// Info level loggi ng +func (l *TuiLogger) Info(msg string, args ...interface{}) { + if logger.LevelInfo < l.logLevel { + return + } + val := tui.Text("[INFO] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) +} + +// Warning level logging +func (l *TuiLogger) Warn(msg string, args ...interface{}) { + if logger.LevelWarn < l.logLevel { + return + } + val := tui.Title("[WARN] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) +} + +// Error level logging +func (l *TuiLogger) Error(msg string, args ...interface{}) { + if logger.LevelError < l.logLevel { + return + } + val := tui.Bold("[ERROR] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) +} + +// Fatal level logging and exit with code 1 +func (l *TuiLogger) Fatal(msg string, args ...interface{}) { + val := tui.Bold("[FATAL] " + fmt.Sprintf(msg, args...)) + l.ui.AddLog("%s", val) + os.Exit(1) +} + +// Stack will return a new logger that logs to the given logger as well as the current logger +func (l *TuiLogger) Stack(next logger.Logger) logger.Logger { + return l +} + +var eol = []byte("\n") +var ansiColorStripper = regexp.MustCompile("\x1b\\[[0-9;]*[mK]") + +func (l *TuiLogger) Write(p []byte) (n int, err error) { + trimmed := bytes.Split(p, eol) + for _, line := range trimmed { + if len(line) == 0 { + continue + } + log := string(line) + if len(log) > 20 { + prefix := ansiColorStripper.ReplaceAllString(log[:20], "") + if logger.LevelTrace < l.logLevel && strings.HasPrefix(prefix, "[TRACE]") { + continue + } + if logger.LevelDebug < l.logLevel && strings.HasPrefix(prefix, "[DEBUG]") { + continue + } + if logger.LevelInfo < l.logLevel && strings.HasPrefix(prefix, "[INFO]") { + continue + } + if logger.LevelWarn < l.logLevel && strings.HasPrefix(prefix, "[WARN]") { + continue + } + if logger.LevelError < l.logLevel && strings.HasPrefix(prefix, "[ERROR]") { + continue + } + } + l.ui.AddLog("%s", log) + } + return len(p), nil +} diff --git a/internal/dev/websocket.go b/internal/dev/websocket.go deleted file mode 100644 index 5d739211..00000000 --- a/internal/dev/websocket.go +++ /dev/null @@ -1,670 +0,0 @@ -package dev - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "net" - "net/http" - "net/url" - "strings" - "time" - - "github.com/agentuity/cli/internal/errsystem" - "github.com/agentuity/cli/internal/project" - "github.com/agentuity/go-common/logger" - "github.com/agentuity/go-common/telemetry" - "github.com/cenkalti/backoff/v4" - "github.com/google/uuid" - "github.com/gorilla/websocket" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/trace" -) - -var propagator propagation.TraceContext - -type Websocket struct { - webSocketId string - conn *websocket.Conn - OtelToken string - OtelUrl string - Project project.ProjectContext - orgId string - done chan struct{} - apiKey string - websocketUrl string - maxRetries int - retryCount int - parentCtx context.Context - ctx context.Context - logger logger.Logger - cleanup func() - tracer trace.Tracer - version string - binaryProtocol bool -} - -type OutputPayload struct { - ContentType string `json:"contentType"` - Payload []byte `json:"payload"` - Trigger string `json:"trigger"` -} - -func isOutputPayload(message []byte) (*OutputPayload, error) { - var op OutputPayload - if err := json.Unmarshal(message, &op); err != nil { - return nil, err - } - return &op, nil -} - -func isContextCanceled(ctx context.Context, err error) bool { - if errors.Is(err, context.Canceled) { - return true - } - select { - case <-ctx.Done(): - return true - default: - return false - } -} - -func (c *Websocket) Done() <-chan struct{} { - return c.done -} - -const maxHealthCheckDuration = time.Second * 30 - -func isConnectionErrorRetryable(err error) bool { - if strings.Contains(err.Error(), "connection refused") { - return true - } - if strings.Contains(err.Error(), "connection reset by peer") { - return true - } - if strings.Contains(err.Error(), "No connection could be made because the target machine actively refused it") { // windows - return true - } - return false -} - -func (c *Websocket) getAgentProtocol(ctx context.Context, port int) (bool, error) { - url := fmt.Sprintf("http://127.0.0.1:%d/_health", port) - started := time.Now() - var i int - for time.Since(started) < maxHealthCheckDuration { - i++ - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return false, err - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - c.logger.Debug("healthcheck attempt #%d failed: %s", i, err) - if isConnectionErrorRetryable(err) { - time.Sleep(time.Millisecond * time.Duration(100*i+1)) - continue - } - return false, err - } - c.logger.Debug("healthcheck attempt #%d succeeded with status code %d", i, resp.StatusCode) - defer resp.Body.Close() - if resp.StatusCode == 200 { - return resp.Header.Get("x-agentuity-binary") == "true", nil - } - } - return false, fmt.Errorf("failed to inspect agents after %s", maxHealthCheckDuration) -} - -func (c *Websocket) getAgentWelcome(ctx context.Context, port int) (map[string]Welcome, error) { - url := fmt.Sprintf("http://127.0.0.1:%d/welcome", port) - for i := 0; i < 5; i++ { - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, err - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - if isConnectionErrorRetryable(err) { - time.Sleep(time.Millisecond * time.Duration(100*i+1)) - continue - } - return nil, err - } - defer resp.Body.Close() - if resp.StatusCode == 404 { - return nil, nil // this is ok, just means no agents have inspect - } - res := make(map[string]Welcome) - if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { - return nil, err - } - return res, nil - } - return nil, fmt.Errorf("failed to inspect agents after 5 attempts") -} - -func (c *Websocket) StartReadingMessages(ctx context.Context, logger logger.Logger, port int) { - var err error - c.binaryProtocol, err = c.getAgentProtocol(ctx, port) - if err != nil { - logger.Fatal("Your project failed to start. This typically happens when the project cannot compile or this is an underlying issue with starting the project.") - return - } - - go func() { - defer close(c.done) - for c.conn != nil { - _, m, err := c.conn.ReadMessage() - if err != nil { - if isContextCanceled(ctx, err) { - logger.Debug("shutdown in progress, exiting") - return - } - if errors.Is(err, websocket.ErrCloseSent) || errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) { - logger.Debug("connection closed") - if c.retryCount < c.maxRetries { - logger.Info("attempting to reconnect, retry %d of %d", c.retryCount+1, c.maxRetries) - if err := c.connect(logger, true); err != nil { - logger.Error("failed to reconnect: %s", err) - c.retryCount++ - continue - } - c.retryCount = 0 - continue - } - return - } - logger.Error("failed to read message: %s", err) - if c.retryCount < c.maxRetries { - logger.Info("attempting to reconnect, retry %d of %d", c.retryCount+1, c.maxRetries) - if err := c.connect(logger, true); err != nil { - logger.Error("failed to reconnect: %s", err) - c.retryCount++ - continue - } - c.retryCount = 0 - continue - } - return - } - // Reset retry count on successful message - c.retryCount = 0 - - logger.Trace("recv: %s", string(m)) - - var message Message - if err := json.Unmarshal(m, &message); err != nil { - logger.Error("failed to unmarshal agent message: %s", err) - return - } - - if message.Type == "input" { - var inputMsg InputMessage - if err := json.Unmarshal(m, &inputMsg); err != nil { - logger.Error("failed to unmarshal agent message: %s", err) - return - } - processInputMessage(logger, c, m, port) - } - if message.Type == "getAgents" { - agents := make([]Agent, 0) - for _, agent := range c.Project.Project.Agents { - agents = append(agents, Agent{ - Name: agent.Name, - ID: agent.ID, - Description: agent.Description, - }) - } - welcome, err := c.getAgentWelcome(ctx, port) - if err != nil { - logger.Error("failed to inspect agents: %s", err) - continue - } - for i, agent := range agents { - if val, ok := welcome[agent.ID]; ok { - agents[i].Welcome = &val - } - } - agentsMessage := NewAgentsMessage(c.webSocketId, c.Project.Project.ProjectId, AgentsPayload{ - ProjectID: c.Project.Project.ProjectId, - ProjectName: c.Project.Project.Name, - Agents: agents, - }) - - c.SendMessage(logger, agentsMessage) - } - } - }() -} - -func (c *Websocket) connect(logger logger.Logger, close bool) error { - if close { - // Close existing connection if it exists - if c.cleanup != nil { - c.cleanup() - } - if c.conn != nil { - c.conn.Close() - } - } - - u, err := url.Parse(c.websocketUrl) - if err != nil { - return fmt.Errorf("failed to parse url: %s", err) - } - u.Path = fmt.Sprintf("/websocket/devmode/%s", c.webSocketId) - u.RawQuery = url.Values{ - "from": []string{"cli"}, - "projectId": []string{c.Project.Project.ProjectId}, - }.Encode() - - if u.Scheme == "http" { - u.Scheme = "ws" - } else if u.Scheme == "https" { - u.Scheme = "wss" - } - - urlString := u.String() - headers := http.Header{} - headers.Set("Authorization", fmt.Sprintf("Bearer %s", c.apiKey)) - - // connect to the websocket - logger.Trace("connecting to %s", urlString) - var httpResponse *http.Response - c.conn, httpResponse, err = websocket.DefaultDialer.Dial(urlString, headers) - if err != nil { - if httpResponse != nil { - if httpResponse.StatusCode == 401 { - logger.Error("invalid api key") - } - } - return fmt.Errorf("failed to dial: %s", err) - } - - // get the otel token and url from the headers - c.OtelToken = httpResponse.Header.Get("X-AGENTUITY-OTLP-BEARER-TOKEN") - if c.OtelToken == "" { - return errsystem.New(errsystem.ErrAuthenticateOtelServer, nil, errsystem.WithUserMessage("Failed to authenticate with otel server")) - } - c.OtelUrl = httpResponse.Header.Get("X-AGENTUITY-OTLP-URL") - if c.OtelUrl == "" { - return errsystem.New(errsystem.ErrAuthenticateOtelServer, nil, errsystem.WithUserMessage("Failed to get otel server url")) - } - - c.ctx, c.logger, c.cleanup, err = telemetry.NewWithAPIKey(c.parentCtx, "@agentuity/cli", c.OtelUrl, c.OtelToken, logger) - if err != nil { - return fmt.Errorf("failed to create OTLP telemetry trace: %w", err) - } - - logger.Debug("successfully connected") - return nil -} - -type WebsocketArgs struct { - Ctx context.Context - Logger logger.Logger - WebsocketId string - WebsocketUrl string - APIKey string - Project project.ProjectContext - OrgId string - Version string -} - -func NewWebsocket(args WebsocketArgs) (*Websocket, error) { - tracer := otel.Tracer("@agentuity/cli", trace.WithInstrumentationVersion(args.Version)) - ws := Websocket{ - parentCtx: args.Ctx, - webSocketId: args.WebsocketId, - Project: args.Project, - done: make(chan struct{}), - apiKey: args.APIKey, - websocketUrl: args.WebsocketUrl, - maxRetries: 5, - retryCount: 0, - tracer: tracer, - orgId: args.OrgId, - version: args.Version, - } - u, err := url.Parse(args.WebsocketUrl) - if err != nil { - return nil, fmt.Errorf("failed to parse url: %s", err) - } - u.Path = fmt.Sprintf("/websocket/devmode/%s", args.WebsocketId) - u.RawQuery = url.Values{ - "from": []string{"cli"}, - }.Encode() - - if u.Scheme == "http" { - u.Scheme = "ws" - } else if u.Scheme == "https" { - u.Scheme = "wss" - } - - if err := ws.connect(args.Logger, false); err != nil { - return nil, err - } - - return &ws, nil -} - -// Update SendMessage to accept the MessageType interface -func (c *Websocket) SendMessage(logger logger.Logger, msg Message) error { - buf, err := json.Marshal(msg) - if err != nil { - return err - } - if c.conn == nil { - return nil - } - if err := c.conn.WriteMessage(websocket.TextMessage, buf); err != nil { - return err - } - return nil -} - -func (c *Websocket) Close() error { - c.SendMessage(c.logger, NewCloseMessage(uuid.New().String(), c.Project.Project.ProjectId)) - if c.conn != nil { - closeHandler := c.conn.CloseHandler() - if err := closeHandler(1000, "User requested shutdown"); err != nil { - c.logger.Error("failed to close connection: %s", err) - return err - } - } - if c.cleanup != nil { - c.cleanup() - c.cleanup = nil - } - return nil -} - -func (c *Websocket) WebURL(appUrl string) string { - return fmt.Sprintf("%s/devmode/%s", appUrl, c.webSocketId) -} - -type Message struct { - ID string `json:"id"` - Type string `json:"type"` - Payload map[string]any `json:"payload"` - ProjectId string `json:"projectId"` -} - -// messages send by server to CLI -type InputMessage struct { - ID string `json:"id"` - Type string `json:"type"` - From string `json:"from"` - Payload struct { - SessionID string `json:"sessionId"` - Trigger string `json:"trigger"` - AgentID string `json:"agentId"` - ContentType string `json:"contentType"` - Payload []byte `json:"payload"` - } `json:"payload"` -} - -// messages send by CLI to the server -func NewOutputMessage(id string, projectId string, payload struct { - ContentType string `json:"contentType"` - Payload []byte `json:"payload"` - Trigger string `json:"trigger"` -}) Message { - payloadMap := map[string]any{ - "contentType": payload.ContentType, - "payload": payload.Payload, - "trigger": payload.Trigger, - } - return Message{ - ID: id, - Type: "output", - Payload: payloadMap, - ProjectId: projectId, - } - -} - -func NewCloseMessage(id string, projectId string) Message { - payloadMap := map[string]any{} - return Message{ - ID: id, - Type: "close", - Payload: payloadMap, - ProjectId: projectId, - } -} - -type Welcome struct { - Message string `json:"welcome"` - Prompts []struct { - Data string `json:"data"` - ContentType string `json:"contentType"` - } `json:"prompts,omitempty"` -} - -type Agent struct { - Name string `json:"name"` - ID string `json:"id"` - Description string `json:"description"` - Welcome *Welcome `json:"welcome,omitempty"` -} - -type AgentsPayload struct { - Agents []Agent `json:"agents"` - ProjectID string `json:"projectId"` - ProjectName string `json:"projectName"` -} - -func NewAgentsMessage(id string, projectId string, payload AgentsPayload) Message { - payloadMap := map[string]any{ - "agents": payload.Agents, - "projectId": projectId, - "projectName": payload.ProjectName, - } - - return Message{ - ID: id, - Type: "agents", - ProjectId: projectId, - Payload: payloadMap, - } -} - -func processInputMessage(plogger logger.Logger, c *Websocket, m []byte, port int) { - started := time.Now() - ctx, logger, span := telemetry.StartSpan(c.ctx, plogger, c.tracer, "TriggerRun", - trace.WithAttributes( - attribute.String("@agentuity/devmode", "true"), - attribute.String("trigger", "manual"), - attribute.String("@agentuity/deploymentId", c.webSocketId), - ), - trace.WithSpanKind(trace.SpanKindConsumer), - ) - defer span.End() - - var inputMsg InputMessage - var outputMessage *Message - var err error - - defer func() { - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - msg := NewOutputMessage(inputMsg.ID, c.Project.Project.ProjectId, OutputPayload{ - ContentType: "text/plain", - Payload: []byte(err.Error()), - Trigger: "", - }) - outputMessage = &msg - } else { - span.SetStatus(codes.Ok, "") - } - if outputMessage != nil { - c.SendMessage(plogger, *outputMessage) - } - span.SetAttributes( - attribute.Int64("@agentuity/cpu_time", time.Since(started).Milliseconds()), - ) - plogger.Info("processed sess_%s in %s", span.SpanContext().TraceID(), time.Since(started)) - }() - - if lerr := json.Unmarshal(m, &inputMsg); lerr != nil { - logger.Error("failed to unmarshal agent message: %s", lerr) - err = lerr - return - } - - span.SetAttributes( - attribute.String("@agentuity/agentId", inputMsg.Payload.AgentID), - attribute.String("@agentuity/orgId", c.orgId), - attribute.String("@agentuity/projectId", c.Project.Project.ProjectId), - attribute.String("@agentuity/env", "development"), - ) - - spanContext := span.SpanContext() - traceState := spanContext.TraceState() - traceState, err = traceState.Insert("id", inputMsg.Payload.AgentID) - if err != nil { - logger.Error("failed to insert agent id into trace state: %s", err) - err = fmt.Errorf("failed to insert agent id into trace state: %w", err) - return - } - traceState, err = traceState.Insert("oid", c.orgId) - if err != nil { - logger.Error("failed to insert org id into trace state: %s", err) - err = fmt.Errorf("failed to insert org id into trace state: %w", err) - return - } - traceState, err = traceState.Insert("pid", c.Project.Project.ProjectId) - if err != nil { - logger.Error("failed to insert project id into trace state: %s", err) - err = fmt.Errorf("failed to insert project id into trace state: %w", err) - return - } - ctx = trace.ContextWithSpanContext(ctx, spanContext.WithTraceState(traceState)) - - c.Project.Logger.Debug("input message: %+v", inputMsg) - - if c.Project.Project.Development == nil { - logger.Error("development is not enabled for this project") - err = errors.New("development is not enabled for this project") - return - } - - var inputPayload []byte - url := fmt.Sprintf("http://127.0.0.1:%d/%s", port, inputMsg.Payload.AgentID) - contentType := "application/json" - - if !c.binaryProtocol { - // TODO: remove this once were all off the old protocol - // make a json object with the payload - payload := map[string]any{ - "contentType": inputMsg.Payload.ContentType, - "payload": inputMsg.Payload.Payload, - "trigger": "manual", - } - - var lerr error - inputPayload, lerr = json.Marshal(payload) - if lerr != nil { - logger.Error("failed to marshal payload: %s", lerr) - err = fmt.Errorf("failed to marshal payload: %w", lerr) - return - } - } else { - contentType = inputMsg.Payload.ContentType - inputPayload = inputMsg.Payload.Payload - } - logger.Debug("sending payload: %s to %s", string(inputPayload), url) - - req, lerr := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(inputPayload)) - if lerr != nil { - logger.Error("failed to create request: %s", lerr) - err = fmt.Errorf("failed to create HTTP request: %w", lerr) - return - } - req.Header.Set("Content-Type", contentType) - req.Header.Set("User-Agent", "Agentuity CLI/"+c.version) - propagator.Inject(ctx, propagation.HeaderCarrier(req.Header)) - - logger.Debug("sending request to %s with trace id: %s", url, spanContext.TraceID()) - - expBackoff := backoff.NewExponentialBackOff() - expBackoff.InitialInterval = 500 * time.Millisecond - expBackoff.MaxInterval = 5 * time.Second - expBackoff.MaxElapsedTime = 30 * time.Second // Max total time as requested - expBackoff.Multiplier = 2.0 - expBackoff.RandomizationFactor = 0.3 // Add jitter - - var resp *http.Response - operation := func() error { - var err error - resp, err = http.DefaultClient.Do(req) - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Timeout() { - logger.Warn("connection timeout to agent, retrying...") - return err - } - if strings.Contains(err.Error(), "connection refused") { - logger.Warn("connection refused to agent, retrying...") - return err - } - logger.Error("failed to post to agent: %s", err) - return backoff.Permanent(err) - } - return nil - } - - err = backoff.Retry(operation, expBackoff) - if err != nil { - logger.Error("all attempts to post to agent failed: %s", err) - err = fmt.Errorf("failed to post to agent: %w", err) - return - } - defer resp.Body.Close() - - body, lerr := io.ReadAll(resp.Body) - if lerr != nil { - logger.Error("failed to read response body: %s", lerr) - err = fmt.Errorf("failed to read response body: %w", lerr) - return - } - - if resp.StatusCode != 200 { - err = fmt.Errorf("the Agent produced an error: %s", string(body)) - return - } - - logger.Debug("response: %s (status code: %d)", string(body), resp.StatusCode) - - var trigger string - if c.binaryProtocol { - trigger = resp.Header.Get("x-agentuity-trigger") - contentType = resp.Header.Get("content-type") - } else { - // TODO: remove this once were all off the old protocol - output, lerr := isOutputPayload(body) - if lerr != nil { - err = fmt.Errorf("the Agent produced an error") - return - } - trigger = output.Trigger - contentType = output.ContentType - body = output.Payload - } - - msg := NewOutputMessage(inputMsg.ID, c.Project.Project.ProjectId, OutputPayload{ - ContentType: contentType, - Payload: body, - Trigger: trigger, - }) - outputMessage = &msg -}