Skip to content
Merged
8 changes: 4 additions & 4 deletions example/api/extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func cmdSimple(state state.State, r *http.Request) response.Response {

messages := make([]string, 0, len(clients))
err = clients.Query(r.Context(), true, func(ctx context.Context, c types.Client) error {
addrPort, err := types.ParseAddrPort(state.Address().URL.Host)
addrPort, err := types.ParseAddrPort(state.Address().Host)
if err != nil {
return fmt.Errorf("Failed to parse addr:port of listen address %q: %w", state.Address().URL.Host, err)
return fmt.Errorf("Failed to parse addr:port of listen address %q: %w", state.Address().Host, err)
}

// Our payload in this case is defined by us as ExtendedType.
Expand Down Expand Up @@ -94,7 +94,7 @@ func cmdSimple(state state.State, r *http.Request) response.Response {
}

// Return some identifying information.
message := fmt.Sprintf("cluster member at address %q received message %q from cluster member at address %q", state.Address().URL.Host, info.Message, info.Sender.String())
message := fmt.Sprintf("cluster member at address %q received message %q from cluster member at address %q", state.Address().Host, info.Message, info.Sender.String())

return response.SyncResponse(true, message)
}
Expand All @@ -117,7 +117,7 @@ func cmdWebsocket(state state.State, r *http.Request) response.Response {
defer conn.Close()

for i := range 3 {
text := fmt.Sprintf("Testing from %q, iteration %d ...", state.Address().URL.Host, i+1)
text := fmt.Sprintf("Testing from %q, iteration %d ...", state.Address().Host, i+1)
err := conn.WriteMessage(websocket.TextMessage, []byte(text))
if err != nil {
return err
Expand Down
38 changes: 18 additions & 20 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"slices"
Expand Down Expand Up @@ -334,7 +335,7 @@ func (d *Daemon) init(listenAddress string, socketGroup string, heartbeatInterva

if listenAddress != "" {
serverEndpoints = []rest.Resources{resources.PublicEndpoints}
err = d.addCoreServers(true, *listenAddr, d.ServerCert(), serverEndpoints)
err = d.addCoreServers(true, &listenAddr.URL, d.ServerCert(), serverEndpoints)
if err != nil {
return err
}
Expand Down Expand Up @@ -520,7 +521,7 @@ func (d *Daemon) setConfig(newConfig trust.Location) error {
// StartAPI starts up the admin and consumer APIs, and generates a cluster cert
// if we are bootstrapping the first node.
func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[string]string, joinAddresses ...string) error {
if d.Address().URL.Host == "" || d.config.GetName() == "" {
if d.Address().Host == "" || d.config.GetName() == "" {
return fmt.Errorf("Cannot start network API without valid daemon configuration")
}

Expand All @@ -529,7 +530,7 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st
return fmt.Errorf("Failed to parse server certificate when bootstrapping API: %w", err)
}

addrPort, err := types.ParseAddrPort(d.Address().URL.Host)
addrPort, err := types.ParseAddrPort(d.Address().Host)
if err != nil {
return fmt.Errorf("Failed to parse listen address when bootstrapping API: %w", err)
}
Expand All @@ -553,7 +554,7 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st

// Validate the extension servers again now that we have applied addresses.
d.extensionServersMu.RLock()
err = resources.ValidateEndpoints(d.extensionServers, d.Address().URL.Host)
err = resources.ValidateEndpoints(d.extensionServers, d.Address().Host)
if err != nil {
return err
}
Expand All @@ -568,13 +569,13 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st
}

serverEndpoints := []rest.Resources{resources.InternalEndpoints, resources.PublicEndpoints}
err = d.addCoreServers(false, *d.Address(), d.ClusterCert(), serverEndpoints)
err = d.addCoreServers(false, d.Address(), d.ClusterCert(), serverEndpoints)
if err != nil {
return err
}

// Add extension servers before post-join hook.
err = d.addExtensionServers(false, d.ClusterCert(), d.Address().URL.Host)
err = d.addExtensionServers(false, d.ClusterCert(), d.Address().Host)
if err != nil {
return err
}
Expand All @@ -591,7 +592,7 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st

clusterMember.SchemaInternal, clusterMember.SchemaExternal, _ = d.db.Schema().Version()

err = d.db.Bootstrap(d.Extensions, *d.Address(), clusterMember)
err = d.db.Bootstrap(d.Extensions, d.Address(), clusterMember)
if err != nil {
return err
}
Expand All @@ -613,12 +614,12 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st
}

if len(joinAddresses) != 0 {
err = d.db.Join(d.Extensions, *d.Address(), joinAddresses...)
err = d.db.Join(d.Extensions, d.Address(), joinAddresses...)
if err != nil {
return fmt.Errorf("Failed to join cluster: %w", err)
}
} else {
err = d.db.StartWithCluster(d.Extensions, *d.Address(), d.trustStore.Remotes().Addresses())
err = d.db.StartWithCluster(d.Extensions, d.Address(), d.trustStore.Remotes().Addresses())
if err != nil {
return fmt.Errorf("Failed to re-establish cluster connection: %w", err)
}
Expand Down Expand Up @@ -655,7 +656,7 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st
var clusterConfirmation bool
err = clients.Query(d.shutdownCtx, true, func(ctx context.Context, c types.Client) error {
// No need to send a request to ourselves.
if d.Address().URL.Host == c.URL().Host {
if d.Address().Host == c.URL().Host {
return nil
}

Expand Down Expand Up @@ -695,7 +696,7 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st
c.SetClusterNotification()

// No need to send a request to ourselves.
if d.Address().URL.Host == c.URL().Host {
if d.Address().Host == c.URL().Host {
return nil
}

Expand Down Expand Up @@ -826,7 +827,7 @@ func (d *Daemon) UpdateServers() error {

// Start any additional listener.
// This operation is idempotent.
err := d.addExtensionServers(false, d.ClusterCert(), d.Address().URL.Host)
err := d.addExtensionServers(false, d.ClusterCert(), d.Address().Host)
if err != nil {
return err
}
Expand All @@ -838,10 +839,7 @@ func (d *Daemon) UpdateServers() error {
func (d *Daemon) startUnixServer(serverEndpoints []rest.Resources, socketGroup string) error {
ctlServer := d.initServer(serverEndpoints...)

url := api.NewURL()
url.URL = *d.os.ControlSocket()

ctl := endpoints.NewSocket(d.shutdownCtx, ctlServer, *url, socketGroup, d.drainConnectionsTimeout)
ctl := endpoints.NewSocket(d.shutdownCtx, ctlServer, d.os.ControlSocket(), socketGroup, d.drainConnectionsTimeout)
d.endpoints = endpoints.NewEndpoints(d.shutdownCtx, map[string]endpoints.Endpoint{
endpoints.EndpointsUnix: ctl,
})
Expand All @@ -851,7 +849,7 @@ func (d *Daemon) startUnixServer(serverEndpoints []rest.Resources, socketGroup s

// addCoreServers initializes the default resources with the default address and certificate.
// If the default address and certificate may be applied to any extension servers, those will be started as well.
func (d *Daemon) addCoreServers(preInit bool, defaultURL api.URL, defaultCert *shared.CertInfo, defaultResources []rest.Resources) error {
func (d *Daemon) addCoreServers(preInit bool, defaultURL *url.URL, defaultCert *shared.CertInfo, defaultResources []rest.Resources) error {
serverEndpoints := []rest.Resources{}
serverEndpoints = append(serverEndpoints, defaultResources...)

Expand Down Expand Up @@ -940,7 +938,7 @@ func (d *Daemon) addExtensionServers(preInit bool, fallbackCert *shared.CertInfo
}

server := d.initServer(extensionServer.Resources...)
network := endpoints.NewNetwork(d.shutdownCtx, endpoints.EndpointNetwork, server, *url, cert, extensionServer.DrainConnectionsTimeout)
network := endpoints.NewNetwork(d.shutdownCtx, endpoints.EndpointNetwork, server, &url.URL, cert, extensionServer.DrainConnectionsTimeout)
networks[serverName] = network
}

Expand Down Expand Up @@ -1060,8 +1058,8 @@ func (d *Daemon) ServerCert() *shared.CertInfo {
}

// Address is the listen address for the daemon.
func (d *Daemon) Address() *api.URL {
return api.NewURL().Scheme("https").Host(d.config.GetAddress().String())
func (d *Daemon) Address() *url.URL {
return &api.NewURL().Scheme("https").Host(d.config.GetAddress().String()).URL
}

// Name is this daemon's cluster member name.
Expand Down
8 changes: 4 additions & 4 deletions internal/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,14 @@ func (t *daemonsSuite) Test_UpdateServers() {

// Check if servers are up.
for _, addr := range test.listeningOn {
url := api.NewURL().Scheme("https").Host(addr.String())
url := &api.NewURL().Scheme("https").Host(addr.String()).URL

// The remote server uses the cluster certificate.
remoteCert, err := daemon.ClusterCert().PublicKeyX509()
require.NoError(t.T(), err)

// We also use the cluster certificate as a client certificate for this test.
client, err := client.New(*url, daemon.ClusterCert(), remoteCert, false)
client, err := client.New(url, daemon.ClusterCert(), remoteCert, false)
require.NoError(t.T(), err)

// Use embedded Get from Go's HTTP client.
Expand All @@ -247,12 +247,12 @@ func (t *daemonsSuite) Test_UpdateServers() {

// Check if servers are down.
for _, addr := range test.notListeningOn {
url := api.NewURL().Scheme("https").Host(addr.String())
url := &api.NewURL().Scheme("https").Host(addr.String()).URL

remoteCert, err := daemon.ClusterCert().PublicKeyX509()
require.NoError(t.T(), err)

client, err := client.New(*url, daemon.ClusterCert(), remoteCert, false)
client, err := client.New(url, daemon.ClusterCert(), remoteCert, false)
require.NoError(t.T(), err)

_, err = client.Get(url.String())
Expand Down
2 changes: 1 addition & 1 deletion internal/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func NewTestDB(extensionsExternal []clusterDB.Update) (*DqliteDB, error) {
db := &DqliteDB{
ctx: ctx,
memberName: func() string { return fmt.Sprintf("cluster-member-%d", 0) },
listenAddr: *api.NewURL().Host("10.0.0.0:8443"),
listenAddr: &api.NewURL().Host("10.0.0.0:8443").URL,
upgradeCh: make(chan struct{}, 1),
os: &sys.OS{},
}
Expand Down
21 changes: 9 additions & 12 deletions internal/db/dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type DqliteDB struct {
memberName func() string // Local cluster member name
clusterCert func() *shared.CertInfo // Cluster certificate for dqlite authentication.
serverCert func() *shared.CertInfo // Server certificate for dqlite authentication.
listenAddr api.URL // Listen address for this dqlite node.
listenAddr *url.URL // Listen address for this dqlite node.

dbName string // This is db.bin.
os types.OS
Expand Down Expand Up @@ -146,11 +146,11 @@ func (db *DqliteDB) isInitialized() (bool, error) {
}

// Bootstrap dqlite.
func (db *DqliteDB) Bootstrap(extensions extensions.Extensions, addr api.URL, clusterRecord cluster.CoreClusterMember) error {
func (db *DqliteDB) Bootstrap(extensions extensions.Extensions, addr *url.URL, clusterRecord cluster.CoreClusterMember) error {
var err error
db.listenAddr = addr
db.dqlite, err = dqlite.New(db.os.DatabaseDir(),
dqlite.WithAddress(db.listenAddr.URL.Host),
dqlite.WithAddress(db.listenAddr.Host),
dqlite.WithRolesAdjustmentFrequency(db.heartbeatInterval),
dqlite.WithRolesAdjustmentHook(db.heartbeat),
dqlite.WithConcurrentLeaderConns(&db.maxConns),
Expand Down Expand Up @@ -196,14 +196,14 @@ func (db *DqliteDB) Bootstrap(extensions extensions.Extensions, addr api.URL, cl
}

// Join a dqlite cluster with the address of a member.
func (db *DqliteDB) Join(extensions extensions.Extensions, addr api.URL, joinAddresses ...string) error {
func (db *DqliteDB) Join(extensions extensions.Extensions, addr *url.URL, joinAddresses ...string) error {
var err error
db.listenAddr = addr
db.dqlite, err = dqlite.New(db.os.DatabaseDir(),
dqlite.WithCluster(joinAddresses),
dqlite.WithRolesAdjustmentFrequency(db.heartbeatInterval),
dqlite.WithRolesAdjustmentHook(db.heartbeat),
dqlite.WithAddress(db.listenAddr.URL.Host),
dqlite.WithAddress(db.listenAddr.Host),
dqlite.WithConcurrentLeaderConns(&db.maxConns),
dqlite.WithExternalConn(db.dialFunc(), db.acceptCh),
dqlite.WithUnixSocket(os.Getenv(sys.DqliteSocket)))
Expand Down Expand Up @@ -248,7 +248,7 @@ func (db *DqliteDB) Join(extensions extensions.Extensions, addr api.URL, joinAdd
}

// StartWithCluster starts up dqlite and joins the cluster.
func (db *DqliteDB) StartWithCluster(extensions extensions.Extensions, addr api.URL, clusterMembers map[string]types.AddrPort) error {
func (db *DqliteDB) StartWithCluster(extensions extensions.Extensions, addr *url.URL, clusterMembers map[string]types.AddrPort) error {
allClusterAddrs := []string{}
for _, clusterMemberAddrs := range clusterMembers {
allClusterAddrs = append(allClusterAddrs, clusterMemberAddrs.String())
Expand Down Expand Up @@ -319,7 +319,7 @@ func (db *DqliteDB) IsOpen(ctx context.Context) error {
}

for _, member := range allMembers {
if member.Address == db.listenAddr.URL.Host {
if member.Address == db.listenAddr.Host {
continue
}

Expand Down Expand Up @@ -385,15 +385,12 @@ func (db *DqliteDB) heartbeat(leaderInfo dqliteClient.NodeInfo, servers []dqlite
return nil
}

if leaderInfo.Address != db.listenAddr.URL.Host {
if leaderInfo.Address != db.listenAddr.Host {
db.log().Debug("Not performing heartbeat, this system is not the dqlite leader", slog.String("address", db.listenAddr.String()))
return nil
}

url := api.NewURL()
url.URL = *db.os.ControlSocket()

client, err := internalClient.New(*url, nil, nil, false)
client, err := internalClient.New(db.os.ControlSocket(), nil, nil, false)
if err != nil {
db.log().Error("Failed to get local client", slog.String("address", db.listenAddr.String()), slog.String("error", err.Error()))
return nil
Expand Down
8 changes: 4 additions & 4 deletions internal/endpoints/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ import (
"log/slog"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/canonical/lxd/shared"
"github.com/canonical/lxd/shared/api"

"github.com/canonical/microcluster/v3/internal/log"
)

// Network represents an HTTPS listener and its server.
type Network struct {
address api.URL
address *url.URL
certMu sync.RWMutex
cert *shared.CertInfo
networkType EndpointType
Expand All @@ -34,7 +34,7 @@ type Network struct {
}

// NewNetwork assigns an address, certificate, and server to the Network.
func NewNetwork(ctx context.Context, endpointType EndpointType, server *http.Server, address api.URL, cert *shared.CertInfo, drainConnTimeout time.Duration) *Network {
func NewNetwork(ctx context.Context, endpointType EndpointType, server *http.Server, address *url.URL, cert *shared.CertInfo, drainConnTimeout time.Duration) *Network {
ctx, cancel := context.WithCancel(ctx)

return &Network{
Expand Down Expand Up @@ -63,7 +63,7 @@ func (n *Network) Type() EndpointType {

// Listen on the given address.
func (n *Network) Listen() error {
listenAddress := canonicalNetworkAddress(n.address.URL.Host, shared.HTTPSDefaultPort)
listenAddress := canonicalNetworkAddress(n.address.Host, shared.HTTPSDefaultPort)
protocol := "tcp"

if strings.HasPrefix(listenAddress, "0.0.0.0") {
Expand Down
4 changes: 2 additions & 2 deletions internal/endpoints/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"log/slog"
"net"
"net/http"
"net/url"
"os"
"os/user"
"strconv"
"time"

"github.com/canonical/lxd/shared"
"github.com/canonical/lxd/shared/api"

"github.com/canonical/microcluster/v3/internal/log"
)
Expand All @@ -33,7 +33,7 @@ type Socket struct {
}

// NewSocket returns a Socket struct with no listener attached yet.
func NewSocket(ctx context.Context, server *http.Server, path api.URL, group string, drainConnTimeout time.Duration) *Socket {
func NewSocket(ctx context.Context, server *http.Server, path *url.URL, group string, drainConnTimeout time.Duration) *Socket {
ctx, cancel := context.WithCancel(ctx)
return &Socket{
Path: path.Hostname(),
Expand Down
Loading
Loading