Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2f8eacb
microcluster/types: Add client Client and Connector interface
roosterfish Jan 12, 2026
b9f9ec4
microcluster/types: Move SelectRandom and Query to the respective pac…
roosterfish Jan 12, 2026
78904b1
microcluster/types: Move IsNotification to the client
roosterfish Jan 12, 2026
3423396
microcluster/types/addport: Drop SelectRandom as it's unused
roosterfish Jan 12, 2026
c8e28e9
internal/rest/client: Move public facing API funcs to internal and im…
roosterfish Jan 12, 2026
ba7f6d2
internal/daemon: Use new client interface
roosterfish Jan 12, 2026
209225c
internal/recover: Use new client interface
roosterfish Jan 12, 2026
00a9204
internal/rest/client: Use new client interface
roosterfish Jan 12, 2026
86b880b
internal/rest/resources: Use new client interface
roosterfish Jan 12, 2026
1ff0b38
internal/db: Use new client interface
roosterfish Jan 12, 2026
12cb0d5
internal/state: Add member connection option to state interface
roosterfish Jan 12, 2026
018aa6d
internal/trust: Drop unused SelectRandom func
roosterfish Jan 12, 2026
7139e0d
internal/trust: Use new client interface
roosterfish Jan 12, 2026
b3c5ced
microcluster: Use new client interface
roosterfish Jan 12, 2026
6f0711b
microcluster: Add new app level funcs which were previously only avai…
roosterfish Jan 12, 2026
583d512
example/api: Use new client interface
roosterfish Jan 12, 2026
a860c9c
example/client: Use new client interface
roosterfish Jan 12, 2026
5acfcf3
example/cmd/microctl: Use new client interface
roosterfish Jan 12, 2026
fd2f3ce
microcluster/types/client: Move SelectRandom to RandomMember func on …
roosterfish Jan 12, 2026
6afc470
internal/rest/client: Fix typo in docstring
roosterfish Jan 12, 2026
a08c9db
internal/rest/client: Detach UpdateServers func from the client as th…
roosterfish Jan 15, 2026
48681f7
microcluster: Add missing app level update funcs
roosterfish Jan 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 0 additions & 56 deletions client/client.go

This file was deleted.

67 changes: 0 additions & 67 deletions client/cluster.go

This file was deleted.

9 changes: 4 additions & 5 deletions example/api/extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/gorilla/websocket"

"github.com/canonical/microcluster/v3/client"
extendedTypes "github.com/canonical/microcluster/v3/example/api/types"
extendedClient "github.com/canonical/microcluster/v3/example/client"
"github.com/canonical/microcluster/v3/microcluster/rest"
Expand Down Expand Up @@ -43,15 +42,15 @@ var extendedWebsocketCmd = rest.Endpoint{
// This example shows how to forward a request to other cluster members.
func cmdSimple(state state.State, r *http.Request) response.Response {
// Check the user agent header to check if we are the notifying cluster member.
if !client.IsNotification(r) {
if !types.IsNotification(r) {
// Get a collection of clients every other cluster member, with the notification user-agent set.
cluster, err := state.Cluster(true)
clients, err := state.Connect().Cluster(true)
if err != nil {
return response.SmartError(fmt.Errorf("Failed to get a client for every cluster member: %w", err))
}

messages := make([]string, 0, len(cluster))
err = cluster.Query(r.Context(), true, func(ctx context.Context, c *client.Client) error {
messages := make([]string, 0, len(clients))
err = clients.Query(r.Context(), true, func(ctx context.Context, c types.Client) error {
addrPort, err := types.ParseAddrPort(state.Address().URL.Host)
if err != nil {
return fmt.Errorf("Failed to parse addr:port of listen address %q: %w", state.Address().URL.Host, err)
Expand Down
6 changes: 3 additions & 3 deletions example/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (

"github.com/gorilla/websocket"

"github.com/canonical/microcluster/v3/client"
"github.com/canonical/microcluster/v3/example/api/types"
microTypes "github.com/canonical/microcluster/v3/microcluster/types"
)

// ExtendedSimpleCmd is a client function that sets a context timeout and sends a POST to /1.0/extended/simple using the given
// client. This function is expected to be called from an api endpoint handler, which gives us access to the
// daemon state, from which we can create a client.
func ExtendedSimpleCmd(ctx context.Context, c *client.Client, data *types.ExtendedType) (string, error) {
func ExtendedSimpleCmd(ctx context.Context, c microTypes.Client, data *types.ExtendedType) (string, error) {
queryCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()

Expand All @@ -37,7 +37,7 @@ func ExtendedSimpleCmd(ctx context.Context, c *client.Client, data *types.Extend
// ExtendedWebsocketCmd is a client function that sets a context timeout and sends a GET to /1.0/extended/websocket using the given
// client. This function is expected to be called from an api endpoint handler, which gives us access to the
// daemon state, from which we can create a client.
func ExtendedWebsocketCmd(ctx context.Context, c *client.Client) error {
func ExtendedWebsocketCmd(ctx context.Context, c microTypes.Client) error {
queryCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()

Expand Down
38 changes: 4 additions & 34 deletions example/cmd/microctl/cluster_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"golang.org/x/sys/unix"
"gopkg.in/yaml.v3"

"github.com/canonical/microcluster/v3/client"
"github.com/canonical/microcluster/v3/microcluster"
"github.com/canonical/microcluster/v3/microcluster/types"
)
Expand Down Expand Up @@ -78,8 +77,8 @@ type cmdClusterMembersList struct {

func (c *cmdClusterMembersList) command() *cobra.Command {
cmd := &cobra.Command{
Use: "list <address>",
Short: "List cluster members locally, or remotely if an address is specified.",
Use: "list",
Short: "List cluster members locally.",
RunE: c.run,
}

Expand All @@ -104,26 +103,7 @@ func (c *cmdClusterMembersList) run(cmd *cobra.Command, args []string) error {
return c.listLocalClusterMembers(m)
}

var client *client.Client

// Get a local client connected to the unix socket if no address is specified.
if len(args) == 1 {
client, err = m.RemoteClient(args[0])
if err != nil {
return err
}
} else {
client, err = m.LocalClient()
if err != nil {
return err
}
}

return c.listClusterMembers(cmd.Context(), client)
}

func (c *cmdClusterMembersList) listClusterMembers(ctx context.Context, client *client.Client) error {
clusterMembers, err := client.GetClusterMembers(ctx)
clusterMembers, err := m.GetClusterMembers(context.TODO())
if err != nil {
return err
}
Expand Down Expand Up @@ -184,17 +164,7 @@ func (c *cmdClusterMemberRemove) run(cmd *cobra.Command, args []string) error {
return err
}

client, err := m.LocalClient()
if err != nil {
return err
}

err = client.DeleteClusterMember(cmd.Context(), args[0], c.flagForce)
if err != nil {
return err
}

return nil
return m.RemoveClusterMember(cmd.Context(), args[0], c.flagForce)
}

type cmdClusterEdit struct {
Expand Down
7 changes: 1 addition & 6 deletions example/cmd/microctl/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,11 @@ func (c *cmdShutdown) run(cmd *cobra.Command, args []string) error {
return err
}

client, err := m.LocalClient()
if err != nil {
return err
}

chResult := make(chan error, 1)
go func() {
defer close(chResult)

err := client.ShutdownDaemon(cmd.Context())
err := m.Shutdown(cmd.Context())
if err != nil {
chResult <- err
return
Expand Down
30 changes: 14 additions & 16 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/gorilla/mux"
"github.com/mattn/go-sqlite3"

"github.com/canonical/microcluster/v3/client"
"github.com/canonical/microcluster/v3/internal/cluster"
internalConfig "github.com/canonical/microcluster/v3/internal/config"
"github.com/canonical/microcluster/v3/internal/db"
Expand Down Expand Up @@ -636,7 +635,7 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st
return err
}

cluster, err := d.trustStore.Remotes().Cluster(false, d.ServerCert(), publicKey)
clients, err := d.trustStore.Remotes().Cluster(false, d.ServerCert(), publicKey)
if err != nil {
return err
}
Expand All @@ -654,14 +653,14 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st
if len(joinAddresses) > 0 {
var lastErr error
var clusterConfirmation bool
err = cluster.Query(d.shutdownCtx, true, func(ctx context.Context, c *client.Client) error {
err = clients.Query(d.shutdownCtx, true, func(ctx context.Context, c types.Client) error {
// No need to send a request to ourselves.
if d.Address().URL.Host == c.URL().URL.Host {
if d.Address().URL.Host == c.URL().Host {
return nil
}

// Propagate trust to all reachable cluster members for fault tolerance.
err := internalClient.AddTrustStoreEntry(ctx, &c.Client, localMemberInfo)
err := internalClient.AddTrustStoreEntry(ctx, c, localMemberInfo)
if err != nil {
lastErr = err
// Continue trying other nodes even if this one fails
Expand All @@ -678,7 +677,7 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st
}

if !clusterConfirmation {
return fmt.Errorf("Failed to confirm new member %q on any existing system (%d): %w", localMemberInfo.Name, len(cluster)-1, lastErr)
return fmt.Errorf("Failed to confirm new member %q on any existing system (%d): %w", localMemberInfo.Name, len(clients)-1, lastErr)
}
}

Expand All @@ -692,11 +691,11 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st
var successCount, attemptCount int32
var counterMu sync.Mutex

err = cluster.Query(d.shutdownCtx, true, func(ctx context.Context, c *client.Client) error {
err = clients.Query(d.shutdownCtx, true, func(ctx context.Context, c types.Client) error {
c.SetClusterNotification()

// No need to send a request to ourselves.
if d.Address().URL.Host == c.URL().URL.Host {
if d.Address().URL.Host == c.URL().Host {
return nil
}

Expand All @@ -712,21 +711,21 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st

// If this was a join request, instruct all peers to run their OnNewMember hook.
if len(joinAddresses) > 0 {
addrPort, err := types.ParseAddrPort(c.URL().URL.Host)
addrPort, err := types.ParseAddrPort(c.URL().Host)
if err != nil {
return err
}

remote := remotes.RemoteByAddress(addrPort)
if remote == nil {
return fmt.Errorf("No remote found at address %q run the post-remove hook", c.URL().URL.Host)
return fmt.Errorf("No remote found at address %q to run the post-remove hook", c.URL().Host)
}

// Run the OnNewMember hook, and skip errors on any nodes that are still in the process of joining.
err = internalClient.RunNewMemberHook(ctx, c.Client.UseTarget(remote.Name), types.HookNewMemberOptions{NewMember: localMemberInfo})
err = internalClient.RunNewMemberHook(ctx, c.UseTarget(remote.Name), types.HookNewMemberOptions{NewMember: localMemberInfo})
if err != nil && !api.StatusErrorCheck(err, http.StatusServiceUnavailable) {
// log error but continue with other nodes
d.log().Warn("Failed running OnNewMember hook on node", slog.String("node", c.URL().URL.Host), slog.String("error", err.Error()))
d.log().Warn("Failed running OnNewMember hook on node", slog.String("node", c.URL().Host), slog.String("error", err.Error()))
return nil
}
}
Expand Down Expand Up @@ -957,11 +956,10 @@ func (d *Daemon) addExtensionServers(preInit bool, fallbackCert *shared.CertInfo
return nil
}

func (d *Daemon) sendUpgradeNotification(ctx context.Context, c *client.Client) error {
path := c.URL()
func (d *Daemon) sendUpgradeNotification(ctx context.Context, c types.Client) error {
parts := strings.Split(string(types.InternalEndpoint), "/")
parts = append(parts, "database")
path = *path.Path(parts...)
path := c.URL().JoinPath(parts...)
upgradeRequest, err := http.NewRequest("PATCH", path.String(), nil)
if err != nil {
return err
Expand All @@ -970,7 +968,7 @@ func (d *Daemon) sendUpgradeNotification(ctx context.Context, c *client.Client)
upgradeRequest.Header.Set("X-Dqlite-Version", fmt.Sprintf("%d", 1))
upgradeRequest = upgradeRequest.WithContext(ctx)

resp, err := c.Do(upgradeRequest)
resp, err := c.HTTP().Do(upgradeRequest)
if err != nil {
d.log().Error("Failed to send database upgrade request", slog.String("error", err.Error()))
return nil
Expand Down
4 changes: 2 additions & 2 deletions internal/db/dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,13 @@ func (db *DqliteDB) GetHeartbeatInterval() time.Duration {
}

// SendHeartbeat initiates a new heartbeat sequence if this is a leader node.
func (db *DqliteDB) SendHeartbeat(ctx context.Context, c *internalClient.Client, hbInfo types.HeartbeatInfo) error {
func (db *DqliteDB) SendHeartbeat(ctx context.Context, c types.Client, hbInfo types.HeartbeatInfo) error {
// set the heartbeat timeout to twice the heartbeat interval.
heartbeatTimeout := db.heartbeatInterval * 2
queryCtx, cancel := context.WithTimeout(ctx, heartbeatTimeout)
defer cancel()

return c.QueryStruct(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("heartbeat").URL, hbInfo, nil)
return c.Query(queryCtx, "POST", types.InternalEndpoint, &api.NewURL().Path("heartbeat").URL, hbInfo, nil)
}

func (db *DqliteDB) heartbeat(leaderInfo dqliteClient.NodeInfo, servers []dqliteClient.NodeInfo) error {
Expand Down
Loading