From 2f8eacb1a63d59894520f5d4b4531d85455b9c45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:15:38 +0100 Subject: [PATCH 01/22] microcluster/types: Add client Client and Connector interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Client is an actual client to a cluster member whereas Connector is a more general concept allowing to reach out to specific members, the cluster leader or multiple members of a cluster at once. Signed-off-by: Julian Pelizäus --- microcluster/types/client.go | 40 ++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 microcluster/types/client.go diff --git a/microcluster/types/client.go b/microcluster/types/client.go new file mode 100644 index 00000000..c8eea1a8 --- /dev/null +++ b/microcluster/types/client.go @@ -0,0 +1,40 @@ +package types + +import ( + "context" + "crypto/x509" + "net/http" + "net/url" + + "github.com/gorilla/websocket" +) + +// Client represents a client allowing to communicate with a specific cluster member. +type Client interface { + // Generic functions for the client interface. + // They allow adding custom client functions. + URL() *url.URL + HTTP() *http.Client + Query(ctx context.Context, method string, prefix EndpointPrefix, path *url.URL, in any, out any) error + QueryRaw(ctx context.Context, method string, prefix EndpointPrefix, path *url.URL, in any) (*http.Response, error) + Websocket(ctx context.Context, prefix EndpointPrefix, path *url.URL) (*websocket.Conn, error) + + // Microcluster specific client functions. + SetClusterNotification() + UseTarget(name string) Client +} + +// Clients represents a list of clients allowing to communicate with multiple cluster members. +type Clients []Client + +// Connector represents various entry points to communicate with specific or multiple cluster members. +type Connector interface { + // Cluster returns a client to every cluster member according to dqlite. + Cluster(isNotification bool) (Clients, error) + + // Leader returns a client to the dqlite cluster leader. + Leader(isNotification bool) (Client, error) + + // Member returns a client to the specified member. + Member(url *url.URL, isNotification bool, cert *x509.Certificate) (Client, error) +} From b9f9ec437ba6ded796e25c8b11e14417ac24fe73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:21:49 +0100 Subject: [PATCH 02/22] microcluster/types: Move SelectRandom and Query to the respective package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Also switch the concurrent Query implementation to use errgroup instead of waitgroup. Signed-off-by: Julian Pelizäus --- client/cluster.go | 67 ------------------------------------ microcluster/types/client.go | 44 +++++++++++++++++++++++ 2 files changed, 44 insertions(+), 67 deletions(-) delete mode 100644 client/cluster.go diff --git a/client/cluster.go b/client/cluster.go deleted file mode 100644 index ec49a4d4..00000000 --- a/client/cluster.go +++ /dev/null @@ -1,67 +0,0 @@ -package client - -import ( - "context" - "fmt" - "math/rand" - "sync" -) - -// Cluster is a list of clients belonging to a cluster. -type Cluster []Client - -// SelectRandom returns a randomly selected client. -func (c Cluster) SelectRandom() (*Client, error) { - switch len(c) { - case 0: - // Returns an error if the cluster is uninitialized (not bootstrapped, not joined). - return nil, fmt.Errorf("Cluster is uninitialized or has no members") - case 1: - // Returns the only available client if cluster size is 1. - return &c[0], nil - default: - // Returns a randomly selected client for clusters with multiple members. - return &c[rand.Intn(len(c))], nil - } -} - -// Query executes the given hook across all members of the cluster. -func (c Cluster) Query(ctx context.Context, concurrent bool, query func(context.Context, *Client) error) error { - if !concurrent { - for _, client := range c { - err := query(ctx, &client) - if err != nil { - return err - } - } - - return nil - } - - errors := make([]error, 0, len(c)) - mut := sync.Mutex{} - wg := sync.WaitGroup{} - for _, client := range c { - wg.Add(1) - go func(client Client) { - defer wg.Done() - err := query(ctx, &client) - if err != nil { - mut.Lock() - errors = append(errors, err) - mut.Unlock() - return - } - }(client) - } - - // Wait for all queries to complete and check for any errors. - wg.Wait() - for _, err := range errors { - if err != nil { - return err - } - } - - return nil -} diff --git a/microcluster/types/client.go b/microcluster/types/client.go index c8eea1a8..b0bb0eec 100644 --- a/microcluster/types/client.go +++ b/microcluster/types/client.go @@ -3,10 +3,13 @@ package types import ( "context" "crypto/x509" + "fmt" + "math/rand" "net/http" "net/url" "github.com/gorilla/websocket" + "golang.org/x/sync/errgroup" ) // Client represents a client allowing to communicate with a specific cluster member. @@ -38,3 +41,44 @@ type Connector interface { // Member returns a client to the specified member. Member(url *url.URL, isNotification bool, cert *x509.Certificate) (Client, error) } + +// SelectRandom returns a randomly selected client. +func (c Clients) SelectRandom() (*Client, error) { + switch len(c) { + case 0: + // Returns an error if the cluster is uninitialized (not bootstrapped, not joined). + return nil, fmt.Errorf("Cluster is uninitialized or has no members") + case 1: + // Returns the only available client if cluster size is 1. + return &c[0], nil + default: + // Returns a randomly selected client for clusters with multiple members. + return &c[rand.Intn(len(c))], nil + } +} + +// Query executes the given hook across all members of the cluster. +func (c Clients) Query(ctx context.Context, concurrent bool, query func(context.Context, Client) error) error { + if !concurrent { + for _, client := range c { + err := query(ctx, client) + if err != nil { + return err + } + } + + return nil + } + + g, ctx := errgroup.WithContext(ctx) + + for _, client := range c { + g.Go(func() error { + return query(ctx, client) + }) + } + + // Wait for all queries to complete and check for any errors. + // The first observed error will be returned. + return g.Wait() +} From 78904b158295f0e6039ef5b35e430c62111db924 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:26:35 +0100 Subject: [PATCH 03/22] microcluster/types: Move IsNotification to the client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- client/client.go | 5 ----- internal/rest/client/client.go | 9 ++------- microcluster/types/client.go | 10 ++++++++++ 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/client/client.go b/client/client.go index c2c1f9ef..816ee84c 100644 --- a/client/client.go +++ b/client/client.go @@ -16,11 +16,6 @@ type Client struct { client.Client } -// IsNotification determines if this request is to be considered a cluster-wide notification. -func IsNotification(r *http.Request) bool { - return r.Header.Get("User-Agent") == client.UserAgentNotifier -} - // Query is a helper for initiating a request on any endpoints defined external to microcluster. This function should be used for all client // methods defined externally from microcluster. func (c *Client) Query(ctx context.Context, method string, prefix types.EndpointPrefix, path *url.URL, in any, out any) error { diff --git a/internal/rest/client/client.go b/internal/rest/client/client.go index 7d12b4e8..ff09699d 100644 --- a/internal/rest/client/client.go +++ b/internal/rest/client/client.go @@ -39,11 +39,6 @@ const ( CtxAccess CtxKey = "access" ) -// UserAgentNotifier is the user agent used for cluster wide notifications. -// It's using the "lxd-" prefix for backwards compatibility with older cluster members -// as originally the constant from LXD's client package was used. -const UserAgentNotifier = "lxd-cluster-notifier" - // New returns a new client configured with the given url and certificates. func New(url api.URL, clientCert *shared.CertInfo, remoteCert *x509.Certificate, forwarding bool) (*Client, error) { var err error @@ -179,14 +174,14 @@ func (c *Client) SetClusterNotification() { } func forwardingProxy(r *http.Request) (*url.URL, error) { - r.Header.Set("User-Agent", UserAgentNotifier) + r.Header.Set("User-Agent", types.UserAgentNotifier) return shared.ProxyFromEnvironment(r) } // IsForwardedRequest determines if this request has been forwarded from another cluster member. func IsForwardedRequest(r *http.Request) bool { - return r.Header.Get("User-Agent") == UserAgentNotifier + return r.Header.Get("User-Agent") == types.UserAgentNotifier } func (c *Client) rawQuery(ctx context.Context, method string, url *url.URL, data any) (*http.Response, error) { diff --git a/microcluster/types/client.go b/microcluster/types/client.go index b0bb0eec..3f6ca57b 100644 --- a/microcluster/types/client.go +++ b/microcluster/types/client.go @@ -57,6 +57,11 @@ func (c Clients) SelectRandom() (*Client, error) { } } +// UserAgentNotifier is the user agent used for cluster wide notifications. +// It's using the "lxd-" prefix for backwards compatibility with older cluster members +// as originally the constant from LXD's client package was used. +const UserAgentNotifier = "lxd-cluster-notifier" + // Query executes the given hook across all members of the cluster. func (c Clients) Query(ctx context.Context, concurrent bool, query func(context.Context, Client) error) error { if !concurrent { @@ -82,3 +87,8 @@ func (c Clients) Query(ctx context.Context, concurrent bool, query func(context. // The first observed error will be returned. return g.Wait() } + +// IsNotification determines if this request is to be considered a cluster-wide notification. +func IsNotification(r *http.Request) bool { + return r.Header.Get("User-Agent") == UserAgentNotifier +} From 342339616b79df061dc40c3f18ef06328f8d588d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:32:38 +0100 Subject: [PATCH 04/22] microcluster/types/addport: Drop SelectRandom as it's unused MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- microcluster/types/addrport.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/microcluster/types/addrport.go b/microcluster/types/addrport.go index f0e707da..debe3e00 100644 --- a/microcluster/types/addrport.go +++ b/microcluster/types/addrport.go @@ -2,7 +2,6 @@ package types import ( "encoding/json" - "math/rand" "net/netip" ) @@ -112,8 +111,3 @@ func (a AddrPorts) Strings() []string { return addrPortStrs } - -// SelectRandom returns a randomly selected AddrPort from AddrPorts. -func (a AddrPorts) SelectRandom() AddrPort { - return a[rand.Intn(len(a))] -} From c8e28e93109f01e646db1619fee0ba6162d5781d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:36:50 +0100 Subject: [PATCH 05/22] internal/rest/client: Move public facing API funcs to internal and implement Client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- client/client.go | 51 ---------------------------------- internal/rest/client/client.go | 33 ++++++++++++++++++---- 2 files changed, 28 insertions(+), 56 deletions(-) delete mode 100644 client/client.go diff --git a/client/client.go b/client/client.go deleted file mode 100644 index 816ee84c..00000000 --- a/client/client.go +++ /dev/null @@ -1,51 +0,0 @@ -package client - -import ( - "context" - "net/http" - "net/url" - - "github.com/gorilla/websocket" - - "github.com/canonical/microcluster/v3/internal/rest/client" - "github.com/canonical/microcluster/v3/microcluster/types" -) - -// Client is a rest client for the microcluster daemon. -type Client struct { - client.Client -} - -// Query is a helper for initiating a request on any endpoints defined external to microcluster. This function should be used for all client -// methods defined externally from microcluster. -func (c *Client) Query(ctx context.Context, method string, prefix types.EndpointPrefix, path *url.URL, in any, out any) error { - queryCtx, cancel := context.WithCancel(ctx) - defer cancel() - - return c.QueryStruct(queryCtx, method, prefix, path, in, &out) -} - -// QueryRaw is a helper for initiating a request on any endpoints defined external to microcluster. -// Unlike Query it returns the raw HTTP response. -func (c *Client) QueryRaw(ctx context.Context, method string, prefix types.EndpointPrefix, path *url.URL, in any) (*http.Response, error) { - queryCtx, cancel := context.WithCancel(ctx) - defer cancel() - - return c.QueryStructRaw(queryCtx, method, prefix, path, in) -} - -// Websocket is a helper for upgrading a request to websocket on any endpoints defined external to microcluster. -// This function should be used for all client methods defined externally from microcluster. -func (c *Client) Websocket(ctx context.Context, prefix types.EndpointPrefix, path *url.URL) (*websocket.Conn, error) { - websocketCtx, cancel := context.WithCancel(ctx) - defer cancel() - - return c.RawWebsocket(websocketCtx, prefix, path) -} - -// UseTarget returns a new client with the query "?target=name" set. -func (c *Client) UseTarget(name string) *Client { - newClient := c.Client.UseTarget(name) - - return &Client{Client: *newClient} -} diff --git a/internal/rest/client/client.go b/internal/rest/client/client.go index ff09699d..fa12727b 100644 --- a/internal/rest/client/client.go +++ b/internal/rest/client/client.go @@ -168,6 +168,11 @@ func tlsHTTPClient(clientCert *shared.CertInfo, remoteCert *x509.Certificate, pr return client, nil } +// HTTP returns the underlying HTTP client to allow direct modification. +func (c *Client) HTTP() *http.Client { + return c.Client +} + // SetClusterNotification sets the client's proxy to apply the forwarding headers to a request. func (c *Client) SetClusterNotification() { c.Transport.(*http.Transport).Proxy = forwardingProxy @@ -284,11 +289,11 @@ func (c *Client) mergeURL(endpointType types.EndpointPrefix, endpoint *url.URL) return localURL } -// QueryStruct sends a request of the specified method to the provided endpoint (optional) on the API matching the endpointType. +// Query sends a request of the specified method to the provided endpoint (optional) on the API matching the endpointType. // The response gets unpacked into the target struct. POST requests can optionally provide raw data to be sent through. // // The final URL is that provided as the endpoint combined with the applicable prefix for the endpointType and the scheme and host from the client. -func (c *Client) QueryStruct(ctx context.Context, method string, endpointType types.EndpointPrefix, endpoint *url.URL, data any, target any) error { +func (c *Client) Query(ctx context.Context, method string, endpointType types.EndpointPrefix, endpoint *url.URL, data any, target any) error { resp, err := c.QueryStructRaw(ctx, method, endpointType, endpoint, data) if err != nil { return err @@ -308,6 +313,24 @@ func (c *Client) QueryStruct(ctx context.Context, method string, endpointType ty return nil } +// QueryRaw is a helper for initiating a request on any endpoints defined external to microcluster. +// Unlike Query it returns the raw HTTP response. +func (c *Client) QueryRaw(ctx context.Context, method string, prefix types.EndpointPrefix, path *url.URL, in any) (*http.Response, error) { + queryCtx, cancel := context.WithCancel(ctx) + defer cancel() + + return c.QueryStructRaw(queryCtx, method, prefix, path, in) +} + +// Websocket is a helper for upgrading a request to websocket on any endpoints defined external to microcluster. +// This function should be used for all client methods defined externally from microcluster. +func (c *Client) Websocket(ctx context.Context, prefix types.EndpointPrefix, path *url.URL) (*websocket.Conn, error) { + websocketCtx, cancel := context.WithCancel(ctx) + defer cancel() + + return c.RawWebsocket(websocketCtx, prefix, path) +} + // QueryStructRaw sends a request of the specified method to the provided endpoint (optional) on the API matching the endpointType. // The raw response is returned. POST requests can optionally provide raw data to be sent through. // @@ -379,12 +402,12 @@ func (c *Client) RawWebsocket(ctx context.Context, endpointType types.EndpointPr } // URL returns the address used for the client. -func (c *Client) URL() api.URL { - return c.url +func (c *Client) URL() *url.URL { + return &c.url.URL } // UseTarget returns a new client with the query "?target=name" set. -func (c *Client) UseTarget(name string) *Client { +func (c *Client) UseTarget(name string) types.Client { localURL := api.NewURL() localURL.URL.Host = c.url.URL.Host localURL.URL.Scheme = c.url.URL.Scheme From ba7f6d2de8a66f7cecf816d663cdde56ede953e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:41:18 +0100 Subject: [PATCH 06/22] internal/daemon: Use new client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- internal/daemon/daemon.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index f490ef38..6cb39c33 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -22,7 +22,6 @@ import ( "github.com/gorilla/mux" "github.com/mattn/go-sqlite3" - "github.com/canonical/microcluster/v3/client" "github.com/canonical/microcluster/v3/internal/cluster" internalConfig "github.com/canonical/microcluster/v3/internal/config" "github.com/canonical/microcluster/v3/internal/db" @@ -636,7 +635,7 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st return err } - cluster, err := d.trustStore.Remotes().Cluster(false, d.ServerCert(), publicKey) + clients, err := d.trustStore.Remotes().Cluster(false, d.ServerCert(), publicKey) if err != nil { return err } @@ -654,14 +653,14 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st if len(joinAddresses) > 0 { var lastErr error var clusterConfirmation bool - err = cluster.Query(d.shutdownCtx, true, func(ctx context.Context, c *client.Client) error { + err = clients.Query(d.shutdownCtx, true, func(ctx context.Context, c types.Client) error { // No need to send a request to ourselves. - if d.Address().URL.Host == c.URL().URL.Host { + if d.Address().URL.Host == c.URL().Host { return nil } // Propagate trust to all reachable cluster members for fault tolerance. - err := internalClient.AddTrustStoreEntry(ctx, &c.Client, localMemberInfo) + err := internalClient.AddTrustStoreEntry(ctx, c, localMemberInfo) if err != nil { lastErr = err // Continue trying other nodes even if this one fails @@ -678,7 +677,7 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st } if !clusterConfirmation { - return fmt.Errorf("Failed to confirm new member %q on any existing system (%d): %w", localMemberInfo.Name, len(cluster)-1, lastErr) + return fmt.Errorf("Failed to confirm new member %q on any existing system (%d): %w", localMemberInfo.Name, len(clients)-1, lastErr) } } @@ -692,11 +691,11 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st var successCount, attemptCount int32 var counterMu sync.Mutex - err = cluster.Query(d.shutdownCtx, true, func(ctx context.Context, c *client.Client) error { + err = clients.Query(d.shutdownCtx, true, func(ctx context.Context, c types.Client) error { c.SetClusterNotification() // No need to send a request to ourselves. - if d.Address().URL.Host == c.URL().URL.Host { + if d.Address().URL.Host == c.URL().Host { return nil } @@ -712,21 +711,21 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st // If this was a join request, instruct all peers to run their OnNewMember hook. if len(joinAddresses) > 0 { - addrPort, err := types.ParseAddrPort(c.URL().URL.Host) + addrPort, err := types.ParseAddrPort(c.URL().Host) if err != nil { return err } remote := remotes.RemoteByAddress(addrPort) if remote == nil { - return fmt.Errorf("No remote found at address %q run the post-remove hook", c.URL().URL.Host) + return fmt.Errorf("No remote found at address %q to run the post-remove hook", c.URL().Host) } // Run the OnNewMember hook, and skip errors on any nodes that are still in the process of joining. - err = internalClient.RunNewMemberHook(ctx, c.Client.UseTarget(remote.Name), types.HookNewMemberOptions{NewMember: localMemberInfo}) + err = internalClient.RunNewMemberHook(ctx, c.UseTarget(remote.Name), types.HookNewMemberOptions{NewMember: localMemberInfo}) if err != nil && !api.StatusErrorCheck(err, http.StatusServiceUnavailable) { // log error but continue with other nodes - d.log().Warn("Failed running OnNewMember hook on node", slog.String("node", c.URL().URL.Host), slog.String("error", err.Error())) + d.log().Warn("Failed running OnNewMember hook on node", slog.String("node", c.URL().Host), slog.String("error", err.Error())) return nil } } @@ -957,11 +956,10 @@ func (d *Daemon) addExtensionServers(preInit bool, fallbackCert *shared.CertInfo return nil } -func (d *Daemon) sendUpgradeNotification(ctx context.Context, c *client.Client) error { - path := c.URL() +func (d *Daemon) sendUpgradeNotification(ctx context.Context, c types.Client) error { parts := strings.Split(string(types.InternalEndpoint), "/") parts = append(parts, "database") - path = *path.Path(parts...) + path := c.URL().JoinPath(parts...) upgradeRequest, err := http.NewRequest("PATCH", path.String(), nil) if err != nil { return err @@ -970,7 +968,7 @@ func (d *Daemon) sendUpgradeNotification(ctx context.Context, c *client.Client) upgradeRequest.Header.Set("X-Dqlite-Version", fmt.Sprintf("%d", 1)) upgradeRequest = upgradeRequest.WithContext(ctx) - resp, err := c.Do(upgradeRequest) + resp, err := c.HTTP().Do(upgradeRequest) if err != nil { d.log().Error("Failed to send database upgrade request", slog.String("error", err.Error())) return nil From 209225c212f87fd4540f11b8863497b58c4643af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:41:45 +0100 Subject: [PATCH 07/22] internal/recover: Use new client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- internal/recover/recover.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/recover/recover.go b/internal/recover/recover.go index f97d6d6a..4a9712f9 100644 --- a/internal/recover/recover.go +++ b/internal/recover/recover.go @@ -21,7 +21,6 @@ import ( "github.com/canonical/lxd/shared/api" "gopkg.in/yaml.v3" - "github.com/canonical/microcluster/v3/client" "github.com/canonical/microcluster/v3/internal/config" "github.com/canonical/microcluster/v3/internal/log" "github.com/canonical/microcluster/v3/internal/trust" @@ -112,13 +111,13 @@ func RecoverFromQuorumLoss(ctx context.Context, filesystem types.OS, members []t return "", err } - cluster, err := remotes.Cluster(false, serverCert, clusterKey) + clients, err := remotes.Cluster(false, serverCert, clusterKey) if err != nil { return "", err } cancelCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) - err = cluster.Query(cancelCtx, true, func(ctx context.Context, client *client.Client) error { + err = clients.Query(cancelCtx, true, func(ctx context.Context, client types.Client) error { var rslt types.Server err := client.Query(ctx, "GET", types.PublicEndpoint, &api.NewURL().URL, nil, &rslt) if err == nil { From 00a9204cd6ecbc5f61a849fe1e118c798a4aa30d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:43:16 +0100 Subject: [PATCH 08/22] internal/rest/client: Use new client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- internal/rest/client/cluster.go | 36 +++++++++++++++--------------- internal/rest/client/control.go | 4 ++-- internal/rest/client/daemon.go | 2 +- internal/rest/client/hooks.go | 16 ++++++------- internal/rest/client/ready.go | 4 ++-- internal/rest/client/shutdown.go | 4 ++-- internal/rest/client/sql.go | 8 +++---- internal/rest/client/tokens.go | 12 +++++----- internal/rest/client/truststore.go | 8 +++---- 9 files changed, 47 insertions(+), 47 deletions(-) diff --git a/internal/rest/client/cluster.go b/internal/rest/client/cluster.go index 976c980d..9fcbddb4 100644 --- a/internal/rest/client/cluster.go +++ b/internal/rest/client/cluster.go @@ -20,12 +20,12 @@ func withTimeoutIfUnset(ctx context.Context) (context.Context, context.CancelFun } // AddClusterMember records a new cluster member in the trust store of each current cluster member. -func AddClusterMember(ctx context.Context, c *Client, args types.ClusterMember) (*types.TokenResponse, error) { +func AddClusterMember(ctx context.Context, c types.Client, args types.ClusterMember) (*types.TokenResponse, error) { queryCtx, cancel := withTimeoutIfUnset(ctx) defer cancel() tokenResponse := types.TokenResponse{} - err := c.QueryStruct(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("cluster").URL, args, &tokenResponse) + err := c.Query(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("cluster").URL, args, &tokenResponse) if err != nil { return nil, err } @@ -33,32 +33,32 @@ func AddClusterMember(ctx context.Context, c *Client, args types.ClusterMember) return &tokenResponse, nil } -// ResetClusterMember clears the state directory of the cluster member, and re-execs its daemon. -func ResetClusterMember(ctx context.Context, c *Client, name string, force bool) error { +// GetClusterMembers returns the database record of cluster members. +func GetClusterMembers(ctx context.Context, c types.Client) ([]types.ClusterMember, error) { queryCtx, cancel := withTimeoutIfUnset(ctx) defer cancel() - endpoint := api.NewURL().Path("cluster", name) - if force { - endpoint = endpoint.WithQuery("force", "1") - } + clusterMembers := []types.ClusterMember{} + err := c.Query(queryCtx, "GET", types.PublicEndpoint, &api.NewURL().Path("cluster").URL, nil, &clusterMembers) - return c.QueryStruct(queryCtx, "PUT", types.InternalEndpoint, &endpoint.URL, nil, nil) + return clusterMembers, err } -// GetClusterMembers returns the database record of cluster members. -func (c *Client) GetClusterMembers(ctx context.Context) ([]types.ClusterMember, error) { +// ResetClusterMember clears the state directory of the cluster member, and re-execs its daemon. +func ResetClusterMember(ctx context.Context, c types.Client, name string, force bool) error { queryCtx, cancel := withTimeoutIfUnset(ctx) defer cancel() - clusterMembers := []types.ClusterMember{} - err := c.QueryStruct(queryCtx, "GET", types.PublicEndpoint, &api.NewURL().Path("cluster").URL, nil, &clusterMembers) + endpoint := api.NewURL().Path("cluster", name) + if force { + endpoint = endpoint.WithQuery("force", "1") + } - return clusterMembers, err + return c.Query(queryCtx, "PUT", types.InternalEndpoint, &endpoint.URL, nil, nil) } // DeleteClusterMember deletes the cluster member with the given name. -func (c *Client) DeleteClusterMember(ctx context.Context, name string, force bool) error { +func DeleteClusterMember(ctx context.Context, c types.Client, name string, force bool) error { queryCtx, cancel := withTimeoutIfUnset(ctx) defer cancel() @@ -67,14 +67,14 @@ func (c *Client) DeleteClusterMember(ctx context.Context, name string, force boo endpoint = endpoint.WithQuery("force", "1") } - return c.QueryStruct(queryCtx, "DELETE", types.PublicEndpoint, &endpoint.URL, nil, nil) + return c.Query(queryCtx, "DELETE", types.PublicEndpoint, &endpoint.URL, nil, nil) } // UpdateCertificate sets a new keypair and CA. -func (c *Client) UpdateCertificate(ctx context.Context, name types.CertificateName, args types.KeyPair) error { +func UpdateCertificate(ctx context.Context, c types.Client, name types.CertificateName, args types.KeyPair) error { queryCtx, cancel := withTimeoutIfUnset(ctx) defer cancel() endpoint := api.NewURL().Path("cluster", "certificates", string(name)) - return c.QueryStruct(queryCtx, "PUT", types.PublicEndpoint, &endpoint.URL, args, nil) + return c.Query(queryCtx, "PUT", types.PublicEndpoint, &endpoint.URL, args, nil) } diff --git a/internal/rest/client/control.go b/internal/rest/client/control.go index 5bed31ff..a48577d8 100644 --- a/internal/rest/client/control.go +++ b/internal/rest/client/control.go @@ -7,6 +7,6 @@ import ( ) // ControlDaemon posts control data to the daemon. -func (c *Client) ControlDaemon(ctx context.Context, args types.Control) error { - return c.QueryStruct(ctx, "POST", types.ControlEndpoint, nil, args, nil) +func ControlDaemon(ctx context.Context, c types.Client, args types.Control) error { + return c.Query(ctx, "POST", types.ControlEndpoint, nil, args, nil) } diff --git a/internal/rest/client/daemon.go b/internal/rest/client/daemon.go index 74989462..0806ffa3 100644 --- a/internal/rest/client/daemon.go +++ b/internal/rest/client/daemon.go @@ -15,5 +15,5 @@ func (c *Client) UpdateServers(ctx context.Context, config map[string]types.Serv defer cancel() endpoint := api.NewURL().Path("daemon", "servers") - return c.QueryStruct(queryCtx, "PUT", types.PublicEndpoint, &endpoint.URL, config, nil) + return c.Query(queryCtx, "PUT", types.PublicEndpoint, &endpoint.URL, config, nil) } diff --git a/internal/rest/client/hooks.go b/internal/rest/client/hooks.go index 18a651fb..9dcc91e8 100644 --- a/internal/rest/client/hooks.go +++ b/internal/rest/client/hooks.go @@ -10,33 +10,33 @@ import ( ) // RunPreRemoveHook executes the PreRemove hook with the given configuration on the cluster member targeted by this client. -func RunPreRemoveHook(ctx context.Context, c *Client, config types.HookRemoveMemberOptions) error { +func RunPreRemoveHook(ctx context.Context, c types.Client, config types.HookRemoveMemberOptions) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - return c.QueryStruct(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("hooks", string(types.PreRemove)).URL, config, nil) + return c.Query(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("hooks", string(types.PreRemove)).URL, config, nil) } // RunPostRemoveHook executes the PostRemove hook with the given configuration on the cluster member targeted by this client. -func RunPostRemoveHook(ctx context.Context, c *Client, config types.HookRemoveMemberOptions) error { +func RunPostRemoveHook(ctx context.Context, c types.Client, config types.HookRemoveMemberOptions) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - return c.QueryStruct(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("hooks", string(types.PostRemove)).URL, config, nil) + return c.Query(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("hooks", string(types.PostRemove)).URL, config, nil) } // RunNewMemberHook executes the OnNewMember hook with the given configuration on the cluster member targeted by this client. -func RunNewMemberHook(ctx context.Context, c *Client, config types.HookNewMemberOptions) error { +func RunNewMemberHook(ctx context.Context, c types.Client, config types.HookNewMemberOptions) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - return c.QueryStruct(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("hooks", string(types.OnNewMember)).URL, config, nil) + return c.Query(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("hooks", string(types.OnNewMember)).URL, config, nil) } // RunOnDaemonConfigUpdateHook executes the OnDaemonConfigUpdate hook with the given configuration on the cluster member targeted by this client. -func RunOnDaemonConfigUpdateHook(ctx context.Context, c *Client, config *types.DaemonConfig) error { +func RunOnDaemonConfigUpdateHook(ctx context.Context, c types.Client, config *types.DaemonConfig) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - return c.QueryStruct(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("hooks", string(types.OnDaemonConfigUpdate)).URL, config, nil) + return c.Query(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("hooks", string(types.OnDaemonConfigUpdate)).URL, config, nil) } diff --git a/internal/rest/client/ready.go b/internal/rest/client/ready.go index e79d44b2..742ab28f 100644 --- a/internal/rest/client/ready.go +++ b/internal/rest/client/ready.go @@ -10,11 +10,11 @@ import ( ) // CheckReady returns once the daemon has signalled to the ready channel that it is done setting up. -func (c *Client) CheckReady(ctx context.Context) error { +func CheckReady(ctx context.Context, c types.Client) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - err := c.QueryStruct(queryCtx, "GET", types.PublicEndpoint, &api.NewURL().Path("ready").URL, nil, nil) + err := c.Query(queryCtx, "GET", types.PublicEndpoint, &api.NewURL().Path("ready").URL, nil, nil) return err } diff --git a/internal/rest/client/shutdown.go b/internal/rest/client/shutdown.go index 5df9466a..ea386232 100644 --- a/internal/rest/client/shutdown.go +++ b/internal/rest/client/shutdown.go @@ -10,9 +10,9 @@ import ( ) // ShutdownDaemon begins the daemon shutdown sequence. -func (c *Client) ShutdownDaemon(ctx context.Context) error { +func ShutdownDaemon(ctx context.Context, c types.Client) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - return c.QueryStruct(queryCtx, "POST", types.ControlEndpoint, &api.NewURL().Path("shutdown").URL, nil, nil) + return c.Query(queryCtx, "POST", types.ControlEndpoint, &api.NewURL().Path("shutdown").URL, nil, nil) } diff --git a/internal/rest/client/sql.go b/internal/rest/client/sql.go index 6398368e..d1a14760 100644 --- a/internal/rest/client/sql.go +++ b/internal/rest/client/sql.go @@ -10,7 +10,7 @@ import ( ) // GetSQL gets a SQL dump of the database. -func GetSQL(ctx context.Context, c *Client, schema bool) (*types.SQLDump, error) { +func GetSQL(ctx context.Context, c types.Client, schema bool) (*types.SQLDump, error) { reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -21,7 +21,7 @@ func GetSQL(ctx context.Context, c *Client, schema bool) (*types.SQLDump, error) endpoint.WithQuery("schema", "1") } - err := c.QueryStruct(reqCtx, "GET", types.InternalEndpoint, &endpoint.URL, nil, dump) + err := c.Query(reqCtx, "GET", types.InternalEndpoint, &endpoint.URL, nil, dump) if err != nil { return nil, err } @@ -30,12 +30,12 @@ func GetSQL(ctx context.Context, c *Client, schema bool) (*types.SQLDump, error) } // PostSQL executes a SQL query against the database. -func PostSQL(ctx context.Context, c *Client, query types.SQLQuery) (*types.SQLBatch, error) { +func PostSQL(ctx context.Context, c types.Client, query types.SQLQuery) (*types.SQLBatch, error) { reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() batch := &types.SQLBatch{} - err := c.QueryStruct(reqCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("sql").URL, query, batch) + err := c.Query(reqCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("sql").URL, query, batch) if err != nil { return nil, err } diff --git a/internal/rest/client/tokens.go b/internal/rest/client/tokens.go index 85e3add3..536f1c91 100644 --- a/internal/rest/client/tokens.go +++ b/internal/rest/client/tokens.go @@ -10,34 +10,34 @@ import ( ) // RequestToken requests a join token with the given name. -func (c *Client) RequestToken(ctx context.Context, name string, expireAfter time.Duration) (string, error) { +func RequestToken(ctx context.Context, c types.Client, name string, expireAfter time.Duration) (string, error) { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() var token string tokenRecord := types.TokenRequest{Name: name, ExpireAfter: expireAfter} - err := c.QueryStruct(queryCtx, "POST", types.ControlEndpoint, &api.NewURL().Path("tokens").URL, tokenRecord, &token) + err := c.Query(queryCtx, "POST", types.ControlEndpoint, &api.NewURL().Path("tokens").URL, tokenRecord, &token) return token, err } // DeleteTokenRecord deletes the toekn record. -func (c *Client) DeleteTokenRecord(ctx context.Context, name string) error { +func DeleteTokenRecord(ctx context.Context, c types.Client, name string) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - err := c.QueryStruct(queryCtx, "DELETE", types.PublicEndpoint, &api.NewURL().Path("tokens", name).URL, nil, nil) + err := c.Query(queryCtx, "DELETE", types.PublicEndpoint, &api.NewURL().Path("tokens", name).URL, nil, nil) return err } // GetTokenRecords returns the token records. -func (c *Client) GetTokenRecords(ctx context.Context) ([]types.TokenRecord, error) { +func GetTokenRecords(ctx context.Context, c types.Client) ([]types.TokenRecord, error) { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() tokenRecords := []types.TokenRecord{} - err := c.QueryStruct(queryCtx, "GET", types.ControlEndpoint, &api.NewURL().Path("tokens").URL, nil, &tokenRecords) + err := c.Query(queryCtx, "GET", types.ControlEndpoint, &api.NewURL().Path("tokens").URL, nil, &tokenRecords) return tokenRecords, err } diff --git a/internal/rest/client/truststore.go b/internal/rest/client/truststore.go index 7a893ca4..acfcb6ac 100644 --- a/internal/rest/client/truststore.go +++ b/internal/rest/client/truststore.go @@ -10,17 +10,17 @@ import ( ) // AddTrustStoreEntry adds a new record to the truststore on all cluster members. -func AddTrustStoreEntry(ctx context.Context, c *Client, args types.ClusterMemberLocal) error { +func AddTrustStoreEntry(ctx context.Context, c types.Client, args types.ClusterMemberLocal) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - return c.QueryStruct(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("truststore").URL, args, nil) + return c.Query(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("truststore").URL, args, nil) } // DeleteTrustStoreEntry deletes the record corresponding to the given cluster member from the trust store. -func DeleteTrustStoreEntry(ctx context.Context, c *Client, name string) error { +func DeleteTrustStoreEntry(ctx context.Context, c types.Client, name string) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - return c.QueryStruct(queryCtx, "DELETE", types.InternalEndpoint, &api.NewURL().Path("truststore", name).URL, nil, nil) + return c.Query(queryCtx, "DELETE", types.InternalEndpoint, &api.NewURL().Path("truststore", name).URL, nil, nil) } From 86b880b43fe2c65171a1338892acd3ed3b381abd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:45:54 +0100 Subject: [PATCH 09/22] internal/rest/resources: Use new client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- internal/rest/resources/certificates.go | 10 +++--- internal/rest/resources/cluster.go | 41 +++++++++++++------------ internal/rest/resources/control.go | 8 ++--- internal/rest/resources/daemon.go | 11 +++---- internal/rest/resources/heartbeat.go | 9 +++--- internal/rest/resources/truststore.go | 23 +++++++------- 6 files changed, 50 insertions(+), 52 deletions(-) diff --git a/internal/rest/resources/certificates.go b/internal/rest/resources/certificates.go index 0afaf3f6..b9e73258 100644 --- a/internal/rest/resources/certificates.go +++ b/internal/rest/resources/certificates.go @@ -14,9 +14,9 @@ import ( "github.com/gorilla/mux" - "github.com/canonical/microcluster/v3/client" "github.com/canonical/microcluster/v3/internal/log" "github.com/canonical/microcluster/v3/internal/rest/access" + internalClient "github.com/canonical/microcluster/v3/internal/rest/client" internalState "github.com/canonical/microcluster/v3/internal/state" "github.com/canonical/microcluster/v3/microcluster/rest" "github.com/canonical/microcluster/v3/microcluster/rest/response" @@ -56,14 +56,14 @@ func clusterCertificatesPut(s state.State, r *http.Request) response.Response { } // Forward the request to all other nodes if we are the first. - if !client.IsNotification(r) && err == nil { - cluster, err := s.Cluster(true) + if !types.IsNotification(r) && err == nil { + clients, err := s.Connect().Cluster(true) if err != nil { return response.SmartError(err) } - err = cluster.Query(r.Context(), true, func(ctx context.Context, c *client.Client) error { - return c.UpdateCertificate(ctx, types.CertificateName(certificateName), req) + err = clients.Query(r.Context(), true, func(ctx context.Context, c types.Client) error { + return internalClient.UpdateCertificate(ctx, c, types.CertificateName(certificateName), req) }) if err != nil { return response.SmartError(fmt.Errorf("Failed to update %q certificate on peers: %w", certificateName, err)) diff --git a/internal/rest/resources/cluster.go b/internal/rest/resources/cluster.go index dce31bb3..423ad9c6 100644 --- a/internal/rest/resources/cluster.go +++ b/internal/rest/resources/cluster.go @@ -23,7 +23,6 @@ import ( "github.com/gorilla/mux" "golang.org/x/sys/unix" - "github.com/canonical/microcluster/v3/client" "github.com/canonical/microcluster/v3/internal/cluster" "github.com/canonical/microcluster/v3/internal/log" "github.com/canonical/microcluster/v3/internal/rest/access" @@ -103,12 +102,12 @@ func clusterPost(s state.State, r *http.Request) response.Response { // Forward request to leader. if leaderInfo.Address != s.Address().URL.Host { - client, err := s.Leader() + client, err := s.Connect().Leader(false) if err != nil { return response.SmartError(err) } - tokenResponse, err := internalClient.AddClusterMember(r.Context(), &client.Client, req) + tokenResponse, err := internalClient.AddClusterMember(r.Context(), client, req) if err != nil { return response.SmartError(err) } @@ -302,7 +301,7 @@ func clusterGet(s state.State, r *http.Request) response.Response { return response.SmartError(fmt.Errorf("Failed to create HTTPS client for cluster member with address %q: %w", addr.String(), err)) } - err = d.CheckReady(r.Context()) + err = internalClient.CheckReady(r.Context(), d) if err == nil { apiClusterMembers[i].Status = types.MemberOnline } else { @@ -460,12 +459,12 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response { }() } - client, err := s.Leader() + client, err := s.Connect().Leader(false) if err != nil { return response.SmartError(err) } - err = client.DeleteClusterMember(r.Context(), name, force) + err = internalClient.DeleteClusterMember(r.Context(), client, name, force) if err != nil { return response.SmartError(err) } @@ -568,7 +567,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response { return response.SmartError(err) } - client, err := s.Leader() + client, err := s.Connect().Leader(false) if err != nil { return response.SmartError(err) } @@ -588,7 +587,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response { clusterDisableMu.Unlock() }() - err = client.DeleteClusterMember(r.Context(), name, force) + err = internalClient.DeleteClusterMember(r.Context(), client, name, force) if err != nil { return response.SmartError(err) } @@ -646,7 +645,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response { url := api.NewURL() url.URL = *s.FileSystem().ControlSocket() - localClient, err := internalClient.New(*url, nil, nil, false) + localClient, err := s.Connect().Member(&url.URL, false, nil) if err != nil { return response.SmartError(err) } @@ -656,21 +655,18 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response { return response.SmartError(err) } - c, err = internalClient.New(remote.URL(), s.ServerCert(), publicKey, false) + remoteURL := remote.URL() + + client, err := s.Connect().Member(&remoteURL.URL, false, publicKey) if err != nil { return response.SmartError(err) } - err = internalClient.ResetClusterMember(r.Context(), c, name, force) + err = internalClient.ResetClusterMember(r.Context(), client, name, force) if err != nil && !force { return response.SmartError(err) } - cluster, err := s.Cluster(false) - if err != nil { - return response.SmartError(err) - } - intState, err := internalState.ToInternal(s) if err != nil { return response.SmartError(err) @@ -684,21 +680,26 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response { return response.SmartError(err) } + clients, err := s.Connect().Cluster(false) + if err != nil { + return response.SmartError(err) + } + // Run the PostRemove hook on all other members. remotes := s.Remotes() - err = cluster.Query(r.Context(), true, func(ctx context.Context, c *client.Client) error { + err = clients.Query(r.Context(), true, func(ctx context.Context, c types.Client) error { c.SetClusterNotification() - addrPort, err := types.ParseAddrPort(c.URL().URL.Host) + addrPort, err := types.ParseAddrPort(c.URL().Host) if err != nil { return err } remote := remotes.RemoteByAddress(addrPort) if remote == nil { - return fmt.Errorf("No remote found at address %q run the post-remove hook", c.URL().URL.Host) + return fmt.Errorf("No remote found at address %q to run the post-remove hook", c.URL().Host) } - return internalClient.RunPostRemoveHook(ctx, c.Client.UseTarget(remote.Name), types.HookRemoveMemberOptions{Force: force}) + return internalClient.RunPostRemoveHook(ctx, c.UseTarget(remote.Name), types.HookRemoveMemberOptions{Force: force}) }) if err != nil { return response.SmartError(err) diff --git a/internal/rest/resources/control.go b/internal/rest/resources/control.go index 108b4fe3..262679ba 100644 --- a/internal/rest/resources/control.go +++ b/internal/rest/resources/control.go @@ -122,7 +122,7 @@ func controlPost(state state.State, r *http.Request) response.Response { return } - client, err := internalClient.New(*url, state.ServerCert(), cert, false) + client, err := state.Connect().Member(&url.URL, false, cert) if err != nil { return } @@ -132,7 +132,7 @@ func controlPost(state state.State, r *http.Request) response.Response { <-r.Context().Done() // Use `force=1` to ensure the node is fully removed, in case its listener hasn't been set up. - err = client.DeleteClusterMember(context.Background(), req.Name, true) + err = internalClient.DeleteClusterMember(context.Background(), client, req.Name, true) if err != nil { logger.Error("Failed to clean up cluster state after join failure", slog.String("error", err.Error())) } @@ -247,12 +247,12 @@ func joinWithToken(state state.State, r *http.Request, req *types.Control) (*typ return nil, nil, fmt.Errorf("Cluster certificate token does not match that of cluster member. Expected: %q, actual: %q", fingerprint, token.Fingerprint) } - d, err := internalClient.New(*url, state.ServerCert(), cert, false) + client, err := state.Connect().Member(&url.URL, false, cert) if err != nil { return nil, nil, err } - joinInfo, err = internalClient.AddClusterMember(context.Background(), d, newClusterMember) + joinInfo, err = internalClient.AddClusterMember(context.Background(), client, newClusterMember) if err == nil { break } diff --git a/internal/rest/resources/daemon.go b/internal/rest/resources/daemon.go index 6946cd19..edccb2b9 100644 --- a/internal/rest/resources/daemon.go +++ b/internal/rest/resources/daemon.go @@ -7,7 +7,6 @@ import ( "net/http" "slices" - "github.com/canonical/microcluster/v3/client" "github.com/canonical/microcluster/v3/internal/rest/access" internalClient "github.com/canonical/microcluster/v3/internal/rest/client" internalState "github.com/canonical/microcluster/v3/internal/state" @@ -89,26 +88,26 @@ func daemonServersPut(s state.State, r *http.Request) response.Response { return response.SmartError(err) } - cluster, err := s.Cluster(false) + clients, err := s.Connect().Cluster(false) if err != nil { return response.SmartError(err) } // Run the OnDaemonConfigUpdate hook on all other members. remotes := s.Remotes() - err = cluster.Query(r.Context(), true, func(ctx context.Context, c *client.Client) error { + err = clients.Query(r.Context(), true, func(ctx context.Context, c types.Client) error { c.SetClusterNotification() - addrPort, err := types.ParseAddrPort(c.URL().URL.Host) + addrPort, err := types.ParseAddrPort(c.URL().Host) if err != nil { return err } remote := remotes.RemoteByAddress(addrPort) if remote == nil { - return fmt.Errorf("No remote found at address %q to run the %q hook", c.URL().URL.Host, types.OnDaemonConfigUpdate) + return fmt.Errorf("No remote found at address %q to run the %q hook", c.URL().Host, types.OnDaemonConfigUpdate) } - return internalClient.RunOnDaemonConfigUpdateHook(ctx, c.Client.UseTarget(remote.Name), daemonConfig.Dump()) + return internalClient.RunOnDaemonConfigUpdateHook(ctx, c.UseTarget(remote.Name), daemonConfig.Dump()) }) if err != nil { return response.SmartError(err) diff --git a/internal/rest/resources/heartbeat.go b/internal/rest/resources/heartbeat.go index 7686904b..0dd3b7a1 100644 --- a/internal/rest/resources/heartbeat.go +++ b/internal/rest/resources/heartbeat.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/canonical/microcluster/v3/client" "github.com/canonical/microcluster/v3/internal/cluster" "github.com/canonical/microcluster/v3/internal/log" internalState "github.com/canonical/microcluster/v3/internal/state" @@ -188,7 +187,7 @@ func beginHeartbeat(ctx context.Context, s state.State, hbReq types.HeartbeatInf } } - clusterClients, err := s.Cluster(false) + clusterClients, err := s.Connect().Cluster(false) if err != nil { return response.SmartError(err) } @@ -197,8 +196,8 @@ func beginHeartbeat(ctx context.Context, s state.State, hbReq types.HeartbeatInf mapLock := sync.RWMutex{} // Send heartbeat to non-leader members, updating their local member cache and updating the node. // If we sent a heartbeat to this node within double the request timeout, then we can skip the node this round. - err = clusterClients.Query(ctx, true, func(ctx context.Context, c *client.Client) error { - addr := c.URL().URL.Host + err = clusterClients.Query(ctx, true, func(ctx context.Context, c types.Client) error { + addr := c.URL().Host mapLock.RLock() currentMember, ok := hbInfo.ClusterMembers[addr] @@ -214,7 +213,7 @@ func beginHeartbeat(ctx context.Context, s state.State, hbReq types.HeartbeatInf return nil } - err := intState.InternalDatabase.SendHeartbeat(ctx, &c.Client, hbInfo) + err := intState.InternalDatabase.SendHeartbeat(ctx, c, hbInfo) if err != nil { logger.Error("Received error sending heartbeat to cluster member", slog.String("target", addr), slog.String("error", err.Error())) return nil diff --git a/internal/rest/resources/truststore.go b/internal/rest/resources/truststore.go index bb764e4b..b578f952 100644 --- a/internal/rest/resources/truststore.go +++ b/internal/rest/resources/truststore.go @@ -12,7 +12,6 @@ import ( "github.com/gorilla/mux" - "github.com/canonical/microcluster/v3/client" "github.com/canonical/microcluster/v3/internal/log" "github.com/canonical/microcluster/v3/internal/rest/access" internalClient "github.com/canonical/microcluster/v3/internal/rest/client" @@ -54,8 +53,8 @@ func trustPost(s state.State, r *http.Request) response.Response { ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) defer cancel() - if !client.IsNotification(r) { - cluster, err := s.Cluster(true) + if !types.IsNotification(r) { + clients, err := s.Connect().Cluster(true) if err != nil { return response.SmartError(err) } @@ -71,9 +70,9 @@ func trustPost(s state.State, r *http.Request) response.Response { // Try to add the truststore entry to all other nodes in the cluster. // We don't fail the entire operation if some nodes are unreachable. - err = cluster.Query(ctx, true, func(ctx context.Context, c *client.Client) error { + err = clients.Query(ctx, true, func(ctx context.Context, c types.Client) error { // No need to send a request to ourselves, or to the node we are adding. - if s.Address().URL.Host == c.URL().URL.Host || req.Address.String() == c.URL().URL.Host { + if s.Address().URL.Host == c.URL().Host || req.Address.String() == c.URL().Host { return nil } @@ -81,10 +80,10 @@ func trustPost(s state.State, r *http.Request) response.Response { attemptCount++ counterMu.Unlock() - err := internalClient.AddTrustStoreEntry(ctx, &c.Client, req) + err := internalClient.AddTrustStoreEntry(ctx, c, req) if err != nil { // log error but continue with other nodes - logger.Warn("Failed adding truststore entry to node", slog.String("node", c.URL().URL.Host), slog.String("error", err.Error())) + logger.Warn("Failed adding truststore entry to node", slog.String("node", c.URL().Host), slog.String("error", err.Error())) return nil } @@ -131,19 +130,19 @@ func trustDelete(s state.State, r *http.Request) response.Response { return response.SmartError(fmt.Errorf("No truststore entry found for node with name %q", name)) } - if !client.IsNotification(r) { - cluster, err := s.Cluster(true) + if !types.IsNotification(r) { + clients, err := s.Connect().Cluster(true) if err != nil { return response.SmartError(err) } - err = cluster.Query(ctx, true, func(ctx context.Context, c *client.Client) error { + err = clients.Query(ctx, true, func(ctx context.Context, c types.Client) error { // No need to send a request to ourselves, or to the node we are adding. - if s.Address().URL.Host == c.URL().URL.Host || nodeToRemove.URL().URL.Host == c.URL().URL.Host { + if s.Address().URL.Host == c.URL().Host || nodeToRemove.URL().URL.Host == c.URL().Host { return nil } - return internalClient.DeleteTrustStoreEntry(ctx, &c.Client, name) + return internalClient.DeleteTrustStoreEntry(ctx, c, name) }) if err != nil { return response.SmartError(err) From 1ff0b38f1201fec0948e1c2fd2bdf862337592e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:46:08 +0100 Subject: [PATCH 10/22] internal/db: Use new client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- internal/db/dqlite.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/db/dqlite.go b/internal/db/dqlite.go index c0c420d7..0d9c277b 100644 --- a/internal/db/dqlite.go +++ b/internal/db/dqlite.go @@ -366,13 +366,13 @@ func (db *DqliteDB) GetHeartbeatInterval() time.Duration { } // SendHeartbeat initiates a new heartbeat sequence if this is a leader node. -func (db *DqliteDB) SendHeartbeat(ctx context.Context, c *internalClient.Client, hbInfo types.HeartbeatInfo) error { +func (db *DqliteDB) SendHeartbeat(ctx context.Context, c types.Client, hbInfo types.HeartbeatInfo) error { // set the heartbeat timeout to twice the heartbeat interval. heartbeatTimeout := db.heartbeatInterval * 2 queryCtx, cancel := context.WithTimeout(ctx, heartbeatTimeout) defer cancel() - return c.QueryStruct(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("heartbeat").URL, hbInfo, nil) + return c.Query(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("heartbeat").URL, hbInfo, nil) } func (db *DqliteDB) heartbeat(leaderInfo dqliteClient.NodeInfo, servers []dqliteClient.NodeInfo) error { From 12cb0d503b4b61ac3dcb957cdc553022409770c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:48:01 +0100 Subject: [PATCH 11/22] internal/state: Add member connection option to state interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This allows easy access for cluster internal connections through the state interface without having to access connection functions from another client package wich should provide a more smooth user experience. Signed-off-by: Julian Pelizäus --- internal/state/state.go | 50 +++++++++++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/internal/state/state.go b/internal/state/state.go index 8ad49e77..0dea7142 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -2,13 +2,14 @@ package state import ( "context" + "crypto/x509" "fmt" + "net/url" "time" "github.com/canonical/lxd/shared" "github.com/canonical/lxd/shared/api" - "github.com/canonical/microcluster/v3/client" internalConfig "github.com/canonical/microcluster/v3/internal/config" "github.com/canonical/microcluster/v3/internal/db" "github.com/canonical/microcluster/v3/internal/endpoints" @@ -44,11 +45,8 @@ type State interface { // Local truststore access. Remotes() *trust.Remotes - // Cluster returns a client to every cluster member according to dqlite. - Cluster(isNotification bool) (client.Cluster, error) - - // Leader returns a client to the dqlite cluster leader. - Leader() (*client.Client, error) + // Returns a connector for interconnection with the cluster. + Connect() types.Connector // HasExtension returns whether the given API extension is supported. HasExtension(ext string) bool @@ -160,13 +158,17 @@ func (s *InternalState) HasExtension(ext string) bool { return s.Extensions.HasExtension(ext) } +func (s *InternalState) Connect() types.Connector { + return s +} + // Cluster returns a client for every member of a cluster, except // this one. // All requests made by the client will have the UserAgentNotifier header set // if isNotification is true. // Uses the trust store instead of database for better fault tolerance - // trust store is updated on heartbeats and shouldn't contain crashed nodes. -func (s *InternalState) Cluster(isNotification bool) (client.Cluster, error) { +func (s *InternalState) Cluster(isNotification bool) (types.Clients, error) { publicKey, err := s.ClusterCert().PublicKeyX509() if err != nil { return nil, err @@ -181,9 +183,9 @@ func (s *InternalState) Cluster(isNotification bool) (client.Cluster, error) { } // Filter out ourselves from the client list - clients := make(client.Cluster, 0, len(allClients)-1) + clients := make(types.Clients, 0, len(allClients)-1) for _, client := range allClients { - if s.Address().URL.Host != client.URL().URL.Host { + if s.Address().URL.Host != client.URL().Host { clients = append(clients, client) } } @@ -197,7 +199,7 @@ func (s *InternalState) Cluster(isNotification bool) (client.Cluster, error) { } // Leader returns a client connected to the dqlite leader. -func (s *InternalState) Leader() (*client.Client, error) { +func (s *InternalState) Leader(isNotification bool) (types.Client, error) { ctx, cancel := context.WithTimeout(s.Context, time.Second*30) defer cancel() @@ -217,12 +219,36 @@ func (s *InternalState) Leader() (*client.Client, error) { } url := api.NewURL().Scheme("https").Host(leaderInfo.Address) - c, err := internalClient.New(*url, s.ServerCert(), publicKey, false) + c, err := internalClient.New(*url, s.ServerCert(), publicKey, isNotification) + if err != nil { + return nil, err + } + + return c, nil +} + +// Member returns a client to a specific cluster member based on the given url. +// An additional certificate can be provided to verify the remote endpoint. +func (s *InternalState) Member(url *url.URL, isNotification bool, cert *x509.Certificate) (types.Client, error) { + // If no certificate was provided fallback to the cluster cert. + if cert == nil { + var err error + + cert, err = s.ClusterCert().PublicKeyX509() + if err != nil { + return nil, err + } + } + + apiURL := api.NewURL() + apiURL.URL = *url + + c, err := internalClient.New(*apiURL, s.ServerCert(), cert, isNotification) if err != nil { return nil, err } - return &client.Client{Client: *c}, nil + return c, nil } // ToInternal returns the underlying InternalState from the exposed State interface. From 018aa6d2a5c894e261c4241ccc5326ee083b1622 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:48:31 +0100 Subject: [PATCH 12/22] internal/trust: Drop unused SelectRandom func MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- internal/trust/remotes.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/internal/trust/remotes.go b/internal/trust/remotes.go index d6c0ecbc..6deaa2ed 100644 --- a/internal/trust/remotes.go +++ b/internal/trust/remotes.go @@ -4,7 +4,6 @@ import ( "crypto/x509" "errors" "fmt" - "math/rand" "os" "path/filepath" "strings" @@ -15,7 +14,6 @@ import ( "github.com/google/renameio" "gopkg.in/yaml.v3" - "github.com/canonical/microcluster/v3/client" internalClient "github.com/canonical/microcluster/v3/internal/rest/client" "github.com/canonical/microcluster/v3/microcluster/types" ) @@ -237,19 +235,6 @@ func (r *Remotes) Replace(dir string, newRemotes ...types.ClusterMember) error { return nil } -// SelectRandom returns a random remote. -func (r *Remotes) SelectRandom() *Remote { - r.updateMu.RLock() - defer r.updateMu.RUnlock() - - allRemotes := make([]Remote, 0, len(r.data)) - for _, r := range r.data { - allRemotes = append(allRemotes, r) - } - - return &allRemotes[rand.Intn(len(allRemotes))] -} - // Addresses returns just the host:port addresses of the remotes. func (r *Remotes) Addresses() map[string]types.AddrPort { r.updateMu.RLock() From 7139e0d2d4d1419fc5805edb4aa3bbb0156f7623 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:48:49 +0100 Subject: [PATCH 13/22] internal/trust: Use new client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- internal/trust/remotes.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/trust/remotes.go b/internal/trust/remotes.go index 6deaa2ed..3dd01c07 100644 --- a/internal/trust/remotes.go +++ b/internal/trust/remotes.go @@ -249,8 +249,8 @@ func (r *Remotes) Addresses() map[string]types.AddrPort { } // Cluster returns a set of clients for every remote, which can be concurrently queried. -func (r *Remotes) Cluster(isNotification bool, serverCert *shared.CertInfo, publicKey *x509.Certificate) (client.Cluster, error) { - cluster := make(client.Cluster, 0, r.Count()-1) +func (r *Remotes) Cluster(isNotification bool, serverCert *shared.CertInfo, publicKey *x509.Certificate) (types.Clients, error) { + cluster := make(types.Clients, 0, r.Count()-1) for _, addr := range r.Addresses() { url := api.NewURL().Scheme("https").Host(addr.String()) c, err := internalClient.New(*url, serverCert, publicKey, isNotification) @@ -258,7 +258,7 @@ func (r *Remotes) Cluster(isNotification bool, serverCert *shared.CertInfo, publ return nil, err } - cluster = append(cluster, client.Client{Client: *c}) + cluster = append(cluster, c) } return cluster, nil From b3c5ced376e57ddd8cf8ea042a92aa7dd10eba9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:49:58 +0100 Subject: [PATCH 14/22] microcluster: Use new client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- microcluster/app.go | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/microcluster/app.go b/microcluster/app.go index d652ef51..e4eb1dfb 100644 --- a/microcluster/app.go +++ b/microcluster/app.go @@ -16,7 +16,6 @@ import ( "github.com/canonical/lxd/shared/api" "golang.org/x/sys/unix" - "github.com/canonical/microcluster/v3/client" "github.com/canonical/microcluster/v3/internal/daemon" "github.com/canonical/microcluster/v3/internal/log" "github.com/canonical/microcluster/v3/internal/recover" @@ -44,7 +43,7 @@ type Args struct { // If none is provided a default handler is used. LogHandler slog.Handler - Client *client.Client + Client types.Client Proxy func(*http.Request) (*url.URL, error) } @@ -109,7 +108,7 @@ func (m *MicroCluster) Status(ctx context.Context) (*types.Server, error) { } server := types.Server{} - err = c.QueryStruct(ctx, "GET", types.PublicEndpoint, nil, nil, &server) + err = c.Query(ctx, "GET", types.PublicEndpoint, nil, nil, &server) if err != nil { return nil, fmt.Errorf("Failed to get cluster status: %w", err) } @@ -155,7 +154,7 @@ func (m *MicroCluster) Ready(ctx context.Context) error { logger.Debug(fmt.Sprintf("Checking if MicroCluster daemon is ready (attempt %d)", i)) } - err = c.CheckReady(ctx) + err = internalClient.CheckReady(ctx, c) if err != nil { errLast = err if doLog { @@ -192,7 +191,7 @@ func (m *MicroCluster) NewCluster(ctx context.Context, name string, address stri return fmt.Errorf("Received invalid address %q: %w", address, err) } - return c.ControlDaemon(ctx, types.Control{Bootstrap: true, Address: addr, Name: name, InitConfig: config}) + return internalClient.ControlDaemon(ctx, c, types.Control{Bootstrap: true, Address: addr, Name: name, InitConfig: config}) } // JoinCluster joins an existing cluster with a join token supplied by an existing cluster member. @@ -207,7 +206,7 @@ func (m *MicroCluster) JoinCluster(ctx context.Context, name string, address str return fmt.Errorf("Received invalid address %q: %w", address, err) } - return c.ControlDaemon(ctx, types.Control{JoinToken: token, Address: addr, Name: name, InitConfig: initConfig}) + return internalClient.ControlDaemon(ctx, c, types.Control{JoinToken: token, Address: addr, Name: name, InitConfig: initConfig}) } // GetDqliteClusterMembers retrieves the current local cluster configuration @@ -265,7 +264,7 @@ func (m *MicroCluster) NewJoinToken(ctx context.Context, name string, expireAfte return "", err } - secret, err := c.RequestToken(ctx, name, expireAfter) + secret, err := internalClient.RequestToken(ctx, c, name, expireAfter) if err != nil { return "", err } @@ -280,7 +279,7 @@ func (m *MicroCluster) ListJoinTokens(ctx context.Context) ([]types.TokenRecord, return nil, err } - records, err := c.GetTokenRecords(ctx) + records, err := internalClient.GetTokenRecords(ctx, c) if err != nil { return nil, err } @@ -295,7 +294,7 @@ func (m *MicroCluster) RevokeJoinToken(ctx context.Context, name string) error { return err } - err = c.DeleteTokenRecord(ctx, name) + err = internalClient.DeleteTokenRecord(ctx, c, name) if err != nil { return err } @@ -304,7 +303,7 @@ func (m *MicroCluster) RevokeJoinToken(ctx context.Context, name string) error { } // LocalClient returns a client connected to the local control socket. -func (m *MicroCluster) LocalClient() (*client.Client, error) { +func (m *MicroCluster) LocalClient() (types.Client, error) { c := m.args.Client if c == nil { url := api.NewURL() @@ -315,17 +314,17 @@ func (m *MicroCluster) LocalClient() (*client.Client, error) { return nil, err } - c = &client.Client{Client: *internalClient} + c = internalClient } if m.args.Proxy != nil { - tx, ok := c.Transport.(*http.Transport) + tx, ok := c.HTTP().Transport.(*http.Transport) if !ok { - return nil, fmt.Errorf("Invalid underlying client transport, expected %T, got %T", &http.Transport{}, c.Transport) + return nil, fmt.Errorf("Invalid underlying client transport, expected %T, got %T", &http.Transport{}, c.HTTP().Transport) } tx.Proxy = m.args.Proxy - c.Transport = tx + c.HTTP().Transport = tx } return c, nil @@ -333,7 +332,7 @@ func (m *MicroCluster) LocalClient() (*client.Client, error) { // RemoteClient gets a client for the specified cluster member URL. // The filesystem will be parsed for the cluster and server certificates. -func (m *MicroCluster) RemoteClient(address string) (*client.Client, error) { +func (m *MicroCluster) RemoteClient(address string) (types.Client, error) { var publicKey *x509.Certificate clusterCert, err := m.FileSystem.ClusterCert() if err == nil { @@ -348,7 +347,7 @@ func (m *MicroCluster) RemoteClient(address string) (*client.Client, error) { // RemoteClientWithCert gets a client for the specified cluster member URL using the remote server cert. // The filesystem will be parsed for the server client certificate. -func (m *MicroCluster) RemoteClientWithCert(address string, cert *x509.Certificate) (*client.Client, error) { +func (m *MicroCluster) RemoteClientWithCert(address string, cert *x509.Certificate) (types.Client, error) { c := m.args.Client if c == nil { serverCert, err := m.FileSystem.ServerCert() @@ -362,17 +361,17 @@ func (m *MicroCluster) RemoteClientWithCert(address string, cert *x509.Certifica return nil, err } - c = &client.Client{Client: *internalClient} + c = internalClient } if m.args.Proxy != nil { - tx, ok := c.Transport.(*http.Transport) + tx, ok := c.HTTP().Transport.(*http.Transport) if !ok { - return nil, fmt.Errorf("Invalid underlying client transport, expected %T, got %T", &http.Transport{}, c.Transport) + return nil, fmt.Errorf("Invalid underlying client transport, expected %T, got %T", &http.Transport{}, c.HTTP().Transport) } tx.Proxy = m.args.Proxy - c.Transport = tx + c.HTTP().Transport = tx } return c, nil @@ -396,7 +395,7 @@ func (m *MicroCluster) SQL(ctx context.Context, query string) (string, *types.SQ } if query == ".dump" || query == ".schema" { - dump, err := internalClient.GetSQL(ctx, &c.Client, query == ".schema") + dump, err := internalClient.GetSQL(ctx, c, query == ".schema") if err != nil { return "", nil, fmt.Errorf("failed to parse dump response: %w", err) } @@ -408,7 +407,7 @@ func (m *MicroCluster) SQL(ctx context.Context, query string) (string, *types.SQ Query: query, } - batch, err := internalClient.PostSQL(ctx, &c.Client, data) + batch, err := internalClient.PostSQL(ctx, c, data) return "", batch, err } From 6f0711bdf7f8b0f89e787b67645b02864f0d1c70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:51:48 +0100 Subject: [PATCH 15/22] microcluster: Add new app level funcs which were previously only available through client funcs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We should take the same approach for all operations and not require the user to switch between app level funcs and client funcs inside another package. Signed-off-by: Julian Pelizäus --- microcluster/app.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/microcluster/app.go b/microcluster/app.go index e4eb1dfb..934c8f00 100644 --- a/microcluster/app.go +++ b/microcluster/app.go @@ -100,6 +100,16 @@ func (m *MicroCluster) Start(ctx context.Context, daemonArgs DaemonArgs) error { return nil } +// Shutdown stops the local Microcluster daemon. +func (m *MicroCluster) Shutdown(ctx context.Context) error { + c, err := m.LocalClient() + if err != nil { + return err + } + + return internalClient.ShutdownDaemon(ctx, c) +} + // Status returns basic status information about the cluster. func (m *MicroCluster) Status(ctx context.Context) (*types.Server, error) { c, err := m.LocalClient() @@ -209,6 +219,16 @@ func (m *MicroCluster) JoinCluster(ctx context.Context, name string, address str return internalClient.ControlDaemon(ctx, c, types.Control{JoinToken: token, Address: addr, Name: name, InitConfig: initConfig}) } +// GetClusterMembers returns a list of cluster members. +func (m *MicroCluster) GetClusterMembers(ctx context.Context) ([]types.ClusterMember, error) { + c, err := m.LocalClient() + if err != nil { + return nil, err + } + + return internalClient.GetClusterMembers(ctx, c) +} + // GetDqliteClusterMembers retrieves the current local cluster configuration // (derived from the trust store & dqlite metadata); it does not query the // database. @@ -302,6 +322,16 @@ func (m *MicroCluster) RevokeJoinToken(ctx context.Context, name string) error { return nil } +// RemoveClusterMember removes a member from the cluster. +func (m *MicroCluster) RemoveClusterMember(ctx context.Context, name string, force bool) error { + c, err := m.LocalClient() + if err != nil { + return err + } + + return internalClient.DeleteClusterMember(ctx, c, name, force) +} + // LocalClient returns a client connected to the local control socket. func (m *MicroCluster) LocalClient() (types.Client, error) { c := m.args.Client From 583d5129a45d4379832fae4a558405693b021ff5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:52:11 +0100 Subject: [PATCH 16/22] example/api: Use new client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- example/api/extended.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/example/api/extended.go b/example/api/extended.go index 38760747..a62017e3 100644 --- a/example/api/extended.go +++ b/example/api/extended.go @@ -10,7 +10,6 @@ import ( "github.com/gorilla/websocket" - "github.com/canonical/microcluster/v3/client" extendedTypes "github.com/canonical/microcluster/v3/example/api/types" extendedClient "github.com/canonical/microcluster/v3/example/client" "github.com/canonical/microcluster/v3/microcluster/rest" @@ -43,15 +42,15 @@ var extendedWebsocketCmd = rest.Endpoint{ // This example shows how to forward a request to other cluster members. func cmdSimple(state state.State, r *http.Request) response.Response { // Check the user agent header to check if we are the notifying cluster member. - if !client.IsNotification(r) { + if !types.IsNotification(r) { // Get a collection of clients every other cluster member, with the notification user-agent set. - cluster, err := state.Cluster(true) + clients, err := state.Connect().Cluster(true) if err != nil { return response.SmartError(fmt.Errorf("Failed to get a client for every cluster member: %w", err)) } - messages := make([]string, 0, len(cluster)) - err = cluster.Query(r.Context(), true, func(ctx context.Context, c *client.Client) error { + messages := make([]string, 0, len(clients)) + err = clients.Query(r.Context(), true, func(ctx context.Context, c types.Client) error { addrPort, err := types.ParseAddrPort(state.Address().URL.Host) if err != nil { return fmt.Errorf("Failed to parse addr:port of listen address %q: %w", state.Address().URL.Host, err) From a860c9cd135f38fe09060042c00bf4de532467fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:52:33 +0100 Subject: [PATCH 17/22] example/client: Use new client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- example/client/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/example/client/client.go b/example/client/client.go index 460cde74..25e3f087 100644 --- a/example/client/client.go +++ b/example/client/client.go @@ -9,14 +9,14 @@ import ( "github.com/gorilla/websocket" - "github.com/canonical/microcluster/v3/client" "github.com/canonical/microcluster/v3/example/api/types" + microTypes "github.com/canonical/microcluster/v3/microcluster/types" ) // ExtendedSimpleCmd is a client function that sets a context timeout and sends a POST to /1.0/extended/simple using the given // client. This function is expected to be called from an api endpoint handler, which gives us access to the // daemon state, from which we can create a client. -func ExtendedSimpleCmd(ctx context.Context, c *client.Client, data *types.ExtendedType) (string, error) { +func ExtendedSimpleCmd(ctx context.Context, c microTypes.Client, data *types.ExtendedType) (string, error) { queryCtx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() @@ -37,7 +37,7 @@ func ExtendedSimpleCmd(ctx context.Context, c *client.Client, data *types.Extend // ExtendedWebsocketCmd is a client function that sets a context timeout and sends a GET to /1.0/extended/websocket using the given // client. This function is expected to be called from an api endpoint handler, which gives us access to the // daemon state, from which we can create a client. -func ExtendedWebsocketCmd(ctx context.Context, c *client.Client) error { +func ExtendedWebsocketCmd(ctx context.Context, c microTypes.Client) error { queryCtx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() From 5acfcf3fc2468b02fc5097ca019d60db90e7bb71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 14:53:18 +0100 Subject: [PATCH 18/22] example/cmd/microctl: Use new client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- example/cmd/microctl/cluster_members.go | 38 +++---------------------- example/cmd/microctl/shutdown.go | 7 +---- 2 files changed, 5 insertions(+), 40 deletions(-) diff --git a/example/cmd/microctl/cluster_members.go b/example/cmd/microctl/cluster_members.go index cea9f481..7c1fdb65 100644 --- a/example/cmd/microctl/cluster_members.go +++ b/example/cmd/microctl/cluster_members.go @@ -17,7 +17,6 @@ import ( "golang.org/x/sys/unix" "gopkg.in/yaml.v3" - "github.com/canonical/microcluster/v3/client" "github.com/canonical/microcluster/v3/microcluster" "github.com/canonical/microcluster/v3/microcluster/types" ) @@ -78,8 +77,8 @@ type cmdClusterMembersList struct { func (c *cmdClusterMembersList) command() *cobra.Command { cmd := &cobra.Command{ - Use: "list
", - Short: "List cluster members locally, or remotely if an address is specified.", + Use: "list", + Short: "List cluster members locally.", RunE: c.run, } @@ -104,26 +103,7 @@ func (c *cmdClusterMembersList) run(cmd *cobra.Command, args []string) error { return c.listLocalClusterMembers(m) } - var client *client.Client - - // Get a local client connected to the unix socket if no address is specified. - if len(args) == 1 { - client, err = m.RemoteClient(args[0]) - if err != nil { - return err - } - } else { - client, err = m.LocalClient() - if err != nil { - return err - } - } - - return c.listClusterMembers(cmd.Context(), client) -} - -func (c *cmdClusterMembersList) listClusterMembers(ctx context.Context, client *client.Client) error { - clusterMembers, err := client.GetClusterMembers(ctx) + clusterMembers, err := m.GetClusterMembers(context.TODO()) if err != nil { return err } @@ -184,17 +164,7 @@ func (c *cmdClusterMemberRemove) run(cmd *cobra.Command, args []string) error { return err } - client, err := m.LocalClient() - if err != nil { - return err - } - - err = client.DeleteClusterMember(cmd.Context(), args[0], c.flagForce) - if err != nil { - return err - } - - return nil + return m.RemoveClusterMember(cmd.Context(), args[0], c.flagForce) } type cmdClusterEdit struct { diff --git a/example/cmd/microctl/shutdown.go b/example/cmd/microctl/shutdown.go index e4b30b1a..ed8c0a19 100644 --- a/example/cmd/microctl/shutdown.go +++ b/example/cmd/microctl/shutdown.go @@ -33,16 +33,11 @@ func (c *cmdShutdown) run(cmd *cobra.Command, args []string) error { return err } - client, err := m.LocalClient() - if err != nil { - return err - } - chResult := make(chan error, 1) go func() { defer close(chResult) - err := client.ShutdownDaemon(cmd.Context()) + err := m.Shutdown(cmd.Context()) if err != nil { chResult <- err return From fd2f3ce973211b9a1e725b4fed6462ac98a2c801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 15:18:51 +0100 Subject: [PATCH 19/22] microcluster/types/client: Move SelectRandom to RandomMember func on Connector interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- internal/state/state.go | 23 +++++++++++++++++++++++ microcluster/types/client.go | 18 ++---------------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/internal/state/state.go b/internal/state/state.go index 0dea7142..76860d47 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -4,6 +4,7 @@ import ( "context" "crypto/x509" "fmt" + "math/rand" "net/url" "time" @@ -251,6 +252,28 @@ func (s *InternalState) Member(url *url.URL, isNotification bool, cert *x509.Cer return c, nil } +// RandomMember returns a client for a random cluster member. +func (s *InternalState) RandomMember(isNotification bool) (types.Client, error) { + clusterClients, err := s.Cluster(isNotification) + if err != nil { + return nil, err + } + + clusterClientNum := len(clusterClients) + + switch clusterClientNum { + case 0: + // Returns an error if the cluster is uninitialized (not bootstrapped, not joined). + return nil, fmt.Errorf("Cluster is uninitialized or has no members") + case 1: + // Returns the only available client if cluster size is 1. + return clusterClients[0], nil + default: + // Returns a randomly selected client for clusters with multiple members. + return clusterClients[rand.Intn(clusterClientNum)], nil + } +} + // ToInternal returns the underlying InternalState from the exposed State interface. func ToInternal(s State) (*InternalState, error) { internal, ok := s.(*InternalState) diff --git a/microcluster/types/client.go b/microcluster/types/client.go index 3f6ca57b..8e73e34e 100644 --- a/microcluster/types/client.go +++ b/microcluster/types/client.go @@ -3,8 +3,6 @@ package types import ( "context" "crypto/x509" - "fmt" - "math/rand" "net/http" "net/url" @@ -40,21 +38,9 @@ type Connector interface { // Member returns a client to the specified member. Member(url *url.URL, isNotification bool, cert *x509.Certificate) (Client, error) -} -// SelectRandom returns a randomly selected client. -func (c Clients) SelectRandom() (*Client, error) { - switch len(c) { - case 0: - // Returns an error if the cluster is uninitialized (not bootstrapped, not joined). - return nil, fmt.Errorf("Cluster is uninitialized or has no members") - case 1: - // Returns the only available client if cluster size is 1. - return &c[0], nil - default: - // Returns a randomly selected client for clusters with multiple members. - return &c[rand.Intn(len(c))], nil - } + // RandomMember returns a client to a random member. + RandomMember(isNotification bool) (Client, error) } // UserAgentNotifier is the user agent used for cluster wide notifications. From 6afc4706cc817e702f3170c515fc66e15edac643 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Mon, 12 Jan 2026 16:14:42 +0100 Subject: [PATCH 20/22] internal/rest/client: Fix typo in docstring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- internal/rest/client/tokens.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/rest/client/tokens.go b/internal/rest/client/tokens.go index 536f1c91..e8bb2558 100644 --- a/internal/rest/client/tokens.go +++ b/internal/rest/client/tokens.go @@ -21,7 +21,7 @@ func RequestToken(ctx context.Context, c types.Client, name string, expireAfter return token, err } -// DeleteTokenRecord deletes the toekn record. +// DeleteTokenRecord deletes the token record. func DeleteTokenRecord(ctx context.Context, c types.Client, name string) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() From a08c9db8f91b8cd86cca6af9d1da9fbefa9907bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Thu, 15 Jan 2026 14:21:06 +0100 Subject: [PATCH 21/22] internal/rest/client: Detach UpdateServers func from the client as the others MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julian Pelizäus --- internal/rest/client/daemon.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/rest/client/daemon.go b/internal/rest/client/daemon.go index 0806ffa3..3f57e58b 100644 --- a/internal/rest/client/daemon.go +++ b/internal/rest/client/daemon.go @@ -10,7 +10,7 @@ import ( ) // UpdateServers updates the additional servers config. -func (c *Client) UpdateServers(ctx context.Context, config map[string]types.ServerConfig) error { +func UpdateServers(ctx context.Context, c types.Client, config map[string]types.ServerConfig) error { queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() From 48681f73db10435096faeae9b3fe402e700c85f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Peliz=C3=A4us?= Date: Thu, 15 Jan 2026 14:28:04 +0100 Subject: [PATCH 22/22] microcluster: Add missing app level update funcs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As we have detached the client funcs from the client type, there should be an app level func for both server and certificate update too. Signed-off-by: Julian Pelizäus --- microcluster/app.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/microcluster/app.go b/microcluster/app.go index 934c8f00..540d4ed8 100644 --- a/microcluster/app.go +++ b/microcluster/app.go @@ -457,3 +457,23 @@ func (m *MicroCluster) LoggerFromContext(ctx context.Context) *slog.Logger { return logger } + +// UpdateServers updates the extension servers defined when starting the daemon. +func (m *MicroCluster) UpdateServers(ctx context.Context, config map[string]types.ServerConfig) error { + c, err := m.LocalClient() + if err != nil { + return err + } + + return internalClient.UpdateServers(ctx, c, config) +} + +// UpdateCertificates updates the named certificate of either the core or extension server. +func (m *MicroCluster) UpdateCertificates(ctx context.Context, name types.CertificateName, args types.KeyPair) error { + c, err := m.LocalClient() + if err != nil { + return err + } + + return internalClient.UpdateCertificate(ctx, c, name, args) +}