diff --git a/api/cluster_manager.go b/api/cluster_manager.go index a0af9ffd4..4de473d6c 100644 --- a/api/cluster_manager.go +++ b/api/cluster_manager.go @@ -200,6 +200,13 @@ func clusterManagerPut(state state.State, r *http.Request) response.Response { } } + if args.ReverseTunnel != nil { + err = database.StoreClusterManagerConfig(state, r.Context(), name, database.ReverseTunnelKey, *args.ReverseTunnel) + if err != nil { + return response.SmartError(err) + } + } + return response.SyncResponse(true, nil) } diff --git a/api/types/cluster_manager.go b/api/types/cluster_manager.go index f59972aee..b89326272 100644 --- a/api/types/cluster_manager.go +++ b/api/types/cluster_manager.go @@ -1,7 +1,10 @@ package types import ( + "sync" "time" + + "github.com/gorilla/websocket" ) // ClusterManagersPost represents the cluster manager configuration when receiving a POST request in MicroCloud. @@ -55,6 +58,10 @@ type ClusterManagerPut struct { // Interval in seconds to send status messages to the cluster manager // Example: 60 UpdateInterval *string `json:"update_interval" yaml:"update_interval"` + + // Enables or disables the reverse tunnel to the cluster manager + // Example: true, false + ReverseTunnel *string `json:"reverse_tunnel" yaml:"reverse_tunnel"` } // StatusDistribution represents the distribution of items. @@ -85,3 +92,26 @@ type ClusterManagerJoin struct { ClusterCertificate string `json:"cluster_certificate" yaml:"cluster_certificate"` Token string `json:"token" yaml:"token"` } + +// ClusterManagerTunnel represents the tunnel connection the cluster manager. +type ClusterManagerTunnel struct { + Mu sync.RWMutex + WsConn *websocket.Conn +} + +// ClusterManagerTunnelRequest represents the request received through the tunnel. +type ClusterManagerTunnelRequest struct { + ID string `json:"id"` + Method string `json:"method"` + Path string `json:"path"` + Headers map[string]string `json:"headers"` + Body []byte `json:"body"` +} + +// ClusterManagerTunnelResponse represents the response sent through the tunnel. +type ClusterManagerTunnelResponse struct { + ID string `json:"id"` + Status int `json:"status"` + Headers map[string]string `json:"headers"` + Body []byte `json:"body"` +} diff --git a/client/cluster_manager_client.go b/client/cluster_manager_client.go index a39a89592..2991fde61 100644 --- a/client/cluster_manager_client.go +++ b/client/cluster_manager_client.go @@ -9,10 +9,12 @@ import ( "fmt" "io" "net/http" + "net/url" "strings" "github.com/canonical/lxd/shared" "github.com/canonical/lxd/shared/version" + "github.com/gorilla/websocket" "github.com/canonical/microcloud/microcloud/api/types" "github.com/canonical/microcloud/microcloud/database" @@ -83,6 +85,26 @@ func (c *ClusterManagerClient) Delete(clusterCert *shared.CertInfo) error { return err } +// ConnectTunnelWebsocket establishes a WebSocket connection to the cluster manager for reverse tunneling. +func (c *ClusterManagerClient) ConnectTunnelWebsocket(clusterCert *shared.CertInfo) (*websocket.Conn, error) { + tlsConfig, address, err := c.getTlsConfig(clusterCert) + if err != nil { + return nil, fmt.Errorf("Failed to get TLS config: %w", err) + } + + dialer := websocket.Dialer{ + TLSClientConfig: tlsConfig, + } + + u := url.URL{Scheme: "wss", Host: address, Path: "/1.0/remote-cluster/ws"} + conn, _, err := dialer.Dial(u.String(), nil) + if err != nil { + return nil, err + } + + return conn, nil +} + func (c *ClusterManagerClient) craftRequest(method string, path string, reqBody io.Reader) (*http.Request, error) { url := "https://remote" + path // remote is a placeholder, real address will be set in sendRequest req, err := http.NewRequest(method, url, reqBody) @@ -121,7 +143,19 @@ func (c *ClusterManagerClient) sendRequest(clusterCert *shared.CertInfo, req *ht func (c *ClusterManagerClient) getHTTPClient(clusterCert *shared.CertInfo) (*http.Client, string, error) { client := &http.Client{} + tlsConfig, address, err := c.getTlsConfig(clusterCert) + if err != nil { + return nil, "", fmt.Errorf("Failed to get TLS config: %w", err) + } + client.Transport = &http.Transport{ + TLSClientConfig: tlsConfig, + } + + return client, address, nil +} + +func (c *ClusterManagerClient) getTlsConfig(clusterCert *shared.CertInfo) (*tls.Config, string, error) { var address string var remoteCert *x509.Certificate var err error @@ -170,9 +204,5 @@ func (c *ClusterManagerClient) getHTTPClient(clusterCert *shared.CertInfo) (*htt return &cert, nil } - client.Transport = &http.Transport{ - TLSClientConfig: tlsConfig, - } - - return client, address, nil + return tlsConfig, address, nil } diff --git a/cmd/microcloud/cluster_manager.go b/cmd/microcloud/cluster_manager.go index 22b752489..a6d3607c1 100644 --- a/cmd/microcloud/cluster_manager.go +++ b/cmd/microcloud/cluster_manager.go @@ -239,6 +239,12 @@ func (c *cmdClusterManagerGet) run(_ *cobra.Command, args []string) error { fmt.Printf("%s\n", clusterManager.StatusLastErrorTime) case "status-last-error-response": fmt.Printf("%s\n", clusterManager.StatusLastErrorResponse) + case "reverse-tunnel": + value, ok := clusterManager.Config[database.ReverseTunnelKey] + if ok { + fmt.Printf("%s\n", value) + } + default: return errors.New("Invalid key") } @@ -258,7 +264,8 @@ func (c *cmdClusterManagerSet) command() *cobra.Command { cmd.Short = "Set specific cluster manager configuration key." cmd.Example = cli.FormatSection("", `microcloud cluster-manager set addresses example.com:8443 microcloud cluster-manager set certificate-fingerprint abababababababababababababababababababababababababababababababab -microcloud cluster-manager set update-interval-seconds 50`) +microcloud cluster-manager set update-interval-seconds 50 +microcloud cluster-manager set reverse-tunnel true`) cmd.RunE = c.run @@ -287,6 +294,12 @@ func (c *cmdClusterManagerSet) run(_ *cobra.Command, args []string) error { payload.CertificateFingerprint = &value case "update-interval-seconds": payload.UpdateInterval = &value + case "reverse-tunnel": + if value != "true" && value != "false" { + return errors.New("Invalid value for reverse-tunnel, expected 'true' or 'false'") + } + + payload.ReverseTunnel = &value default: return errors.New("Invalid key") } diff --git a/cmd/microcloudd/cluster_manager_task.go b/cmd/microcloudd/cluster_manager_task.go index 529a3cd05..7a40fa8e2 100644 --- a/cmd/microcloudd/cluster_manager_task.go +++ b/cmd/microcloudd/cluster_manager_task.go @@ -1,11 +1,19 @@ package main import ( + "bytes" "context" + "crypto/tls" "fmt" + "io" + "net/http" + "os" + "os/signal" + "sync" "time" lxd "github.com/canonical/lxd/client" + "github.com/canonical/lxd/shared" "github.com/canonical/lxd/shared/api" "github.com/canonical/lxd/shared/logger" "github.com/canonical/microcluster/v2/state" @@ -19,13 +27,18 @@ import ( // SendClusterManagerStatusMessageTask starts a go routine, that sends periodic status messages to cluster manager. func SendClusterManagerStatusMessageTask(ctx context.Context, sh *service.Handler, s state.State) { go func(ctx context.Context, sh *service.Handler, s state.State) { + tunnel := &types.ClusterManagerTunnel{ + WsConn: nil, // This will be set when the websocket connection is established + Mu: sync.RWMutex{}, + } + ticker := time.NewTicker(database.UpdateIntervalDefaultSeconds * time.Second) defer ticker.Stop() for { select { case <-ticker.C: - newUpdateTime := sendClusterManagerStatusMessage(ctx, sh, s) + newUpdateTime := sendClusterManagerStatusMessage(ctx, sh, s, tunnel) if newUpdateTime > 0 { ticker.Reset(newUpdateTime) } @@ -37,7 +50,7 @@ func SendClusterManagerStatusMessageTask(ctx context.Context, sh *service.Handle }(ctx, sh, s) } -func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s state.State) time.Duration { +func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s state.State, tunnel *types.ClusterManagerTunnel) time.Duration { logger.Debug("Starting sendClusterManagerStatusMessage") var nextUpdate time.Duration = 0 @@ -64,6 +77,7 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s return nextUpdate } + hasReverseTunnel := false for _, config := range clusterManagerConfig { if config.Key == database.UpdateIntervalSecondsKey { interval, err := time.ParseDuration(config.Value + "s") @@ -73,7 +87,10 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s } nextUpdate = interval - break + } + + if config.Key == database.ReverseTunnelKey { + hasReverseTunnel = config.Value == "true" } } @@ -94,6 +111,8 @@ func sendClusterManagerStatusMessage(ctx context.Context, sh *service.Handler, s return nextUpdate } + ensureTunnel(ctx, sh, s, tunnel, hasReverseTunnel) + payload := types.ClusterManagerPostStatus{} lxdService := sh.Services[types.LXD].(*service.LXDService) @@ -233,3 +252,165 @@ func enrichClusterMemberMetrics(lxdClient lxd.InstanceServer, result *types.Clus return nil } + +func ensureTunnel(ctx context.Context, sh *service.Handler, s state.State, tunnel *types.ClusterManagerTunnel, hasTunnel bool) { + if hasTunnel && tunnel.WsConn != nil { + logger.Debug("Websocket already connected, skipping reconnection") + return + } + + if hasTunnel && tunnel.WsConn == nil { + logger.Debug("Websocket not connected, establishing connection in new goroutine") + go openTunnel(ctx, sh, s, tunnel) + return + } + + if !hasTunnel && tunnel.WsConn != nil { + logger.Debug("Websocket connected but reverse tunnel is disabled, closing connection") + tunnel.Mu.Lock() + if tunnel.WsConn != nil { + tunnel.WsConn.Close() + tunnel.WsConn = nil + } + + tunnel.Mu.Unlock() + return + } + + logger.Debug("Reverse tunnel is disabled, not opening websocket connection") +} + +func openTunnel(ctx context.Context, sh *service.Handler, s state.State, tunnel *types.ClusterManagerTunnel) { + logger.Error("Connecting ws") + + clusterManager, _, err := database.LoadClusterManager(s, ctx, database.ClusterManagerDefaultName) + if err != nil { + logger.Error("Failed to load cluster manager config", logger.Ctx{"err": err}) + return + } + + cloud := sh.Services[types.MicroCloud].(*service.CloudService) + clusterCert, err := cloud.ClusterCert() + if err != nil { + logger.Error("Failed to get cluster certificate", logger.Ctx{"err": err}) + return + } + + clusterManagerClient := client.NewClusterManagerClient(clusterManager) + conn, err := clusterManagerClient.ConnectTunnelWebsocket(clusterCert) + if err != nil { + logger.Error("Failed to connect to cluster manager websocket", logger.Ctx{"err": err}) + return + } + + tunnel.Mu.Lock() + tunnel.WsConn = conn + tunnel.Mu.Unlock() + + defer conn.Close() + + // Handle CTRL+C to gracefully close + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + done := make(chan struct{}) + + logger.Error("Connected to cluster manager websocket") + + defer close(done) + for { + var req types.ClusterManagerTunnelRequest + err = conn.ReadJSON(&req) + if err != nil { + logger.Error("Read error:", logger.Ctx{"err": err}) + tunnel.Mu.Lock() + tunnel.WsConn = nil + tunnel.Mu.Unlock() + return + } + + logger.Error("Received request:", logger.Ctx{"path": req.Path}) + resp := handleTunnelRequest(req, sh) + + // Send back the response + err = conn.WriteJSON(resp) + if err != nil { + logger.Error("Write error:", logger.Ctx{"err": err}) + tunnel.Mu.Lock() + tunnel.WsConn = nil + tunnel.Mu.Unlock() + return + } + } +} + +func handleTunnelRequest(req types.ClusterManagerTunnelRequest, sh *service.Handler) types.ClusterManagerTunnelResponse { + // todo use the lxd service instead of raw connection + reqUrl := "https://localhost:8443" + req.Path + httpReq, err := http.NewRequest(req.Method, reqUrl, bytes.NewReader(req.Body)) + if err != nil { + logger.Error("Request build error", logger.Ctx{"err": err, "path": req.Path, "method": req.Method}) + return types.ClusterManagerTunnelResponse{ID: req.ID, Status: http.StatusInternalServerError} + } + + // Copy headers + for k, v := range req.Headers { + httpReq.Header.Set(k, v) + } + + tlsConfig := shared.InitTLSConfig() + tlsConfig.InsecureSkipVerify = true // todo For testing purposes, skip verification of the server's certificate + + cloud := sh.Services[types.MicroCloud].(*service.CloudService) + clusterCert, err := cloud.ClusterCert() + if err != nil { + logger.Error("Failed to get cluster certificate", logger.Ctx{"err": err}) + return types.ClusterManagerTunnelResponse{ID: req.ID, Status: http.StatusInternalServerError} + } + + // todo we are using the cluster certificate to authenticate the request. This must be avoided and replaced with the client being authenticated instead. + cert := clusterCert.KeyPair() + tlsConfig.GetClientCertificate = func(info *tls.CertificateRequestInfo) (*tls.Certificate, error) { + // GetClientCertificate is called if not nil instead of performing the default selection of an appropriate + // certificate from the `Certificates` list. We only have one-key pair to send, and we always want to send it + // because this is what uniquely identifies the caller to the server. + return &cert, nil + } + + transport := &http.Transport{ + TLSClientConfig: tlsConfig, + } + + // Send to internal HTTP handler + httpClient := &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + // Return an error to prevent following redirects + return http.ErrUseLastResponse + }, + } + + httpClient.Transport = transport + httpResp, err := httpClient.Do(httpReq) + if err != nil { + logger.Error("Internal request failed: %v", logger.Ctx{"err": err, "reqUrl": reqUrl, "method": req.Method}) + return types.ClusterManagerTunnelResponse{ID: req.ID, Status: http.StatusBadGateway} + } + + defer httpResp.Body.Close() + + body, _ := io.ReadAll(httpResp.Body) + + headers := make(map[string]string) + for k, v := range httpResp.Header { + if len(v) > 0 { + headers[k] = v[0] + } + } + + return types.ClusterManagerTunnelResponse{ + ID: req.ID, + Status: httpResp.StatusCode, + Headers: headers, + Body: body, + } +} diff --git a/database/cluster_manager_config.go b/database/cluster_manager_config.go index 9dbdeec04..ef9e6e0fc 100644 --- a/database/cluster_manager_config.go +++ b/database/cluster_manager_config.go @@ -16,6 +16,9 @@ const ClusterManagerDefaultName = "default" // UpdateIntervalSecondsKey is the key for the update interval configuration. const UpdateIntervalSecondsKey = "update-interval-seconds" +// ReverseTunnelKey is the key for enabling or disabling the websocket in configuration. +const ReverseTunnelKey = "reverse-tunnel" + // UpdateIntervalDefaultSeconds is the interval for the status update task if none is defined in the database. const UpdateIntervalDefaultSeconds = 60