Skip to content
Merged
30 changes: 17 additions & 13 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Daemon struct {

config *internalConfig.DaemonConfig // Local daemon's configuration from daemon.yaml file.

os *sys.OS
os types.OS
serverCert *shared.CertInfo

clusterMu sync.RWMutex
Expand Down Expand Up @@ -186,7 +186,7 @@ func (d *Daemon) Run(ctx context.Context, stateDir string, args Args) error {
d.drainConnectionsTimeout = args.DrainConnectionsTimeout

// Setup the deamon's internal config.
d.config = internalConfig.NewDaemonConfig(filepath.Join(d.os.StateDir, "daemon.yaml"))
d.config = internalConfig.NewDaemonConfig(filepath.Join(d.os.StateDir(), "daemon.yaml"))

// Clean up the daemon state on an error during init.
reverter := revert.New()
Expand Down Expand Up @@ -446,12 +446,12 @@ func (d *Daemon) reload() error {

func (d *Daemon) initStore() error {
var err error
d.fsWatcher, err = sys.NewWatcher(d.shutdownCtx, d.os.StateDir)
d.fsWatcher, err = sys.NewWatcher(d.shutdownCtx, d.os.StateDir())
if err != nil {
return err
}

d.trustStore, err = trust.Init(d.fsWatcher, nil, d.os.TrustDir)
d.trustStore, err = trust.Init(d.fsWatcher, nil, d.os.TrustDir())
if err != nil {
return err
}
Expand Down Expand Up @@ -541,7 +541,7 @@ func (d *Daemon) StartAPI(ctx context.Context, bootstrap bool, initConfig map[st
}

if bootstrap {
err = d.trustStore.Remotes().Add(d.os.TrustDir, localNode)
err = d.trustStore.Remotes().Add(d.os.TrustDir(), localNode)
if err != nil {
return fmt.Errorf("Failed to initialize local remote entry: %w", err)
}
Expand Down Expand Up @@ -838,7 +838,11 @@ func (d *Daemon) UpdateServers() error {
// startUnixServer starts up the core unix listener with the given resources.
func (d *Daemon) startUnixServer(serverEndpoints []rest.Resources, socketGroup string) error {
ctlServer := d.initServer(serverEndpoints...)
ctl := endpoints.NewSocket(d.shutdownCtx, ctlServer, d.os.ControlSocket(), socketGroup, d.drainConnectionsTimeout)

url := api.NewURL()
url.URL = *d.os.ControlSocket()

ctl := endpoints.NewSocket(d.shutdownCtx, ctlServer, *url, socketGroup, d.drainConnectionsTimeout)
d.endpoints = endpoints.NewEndpoints(d.shutdownCtx, map[string]endpoints.Endpoint{
endpoints.EndpointsUnix: ctl,
})
Expand Down Expand Up @@ -920,7 +924,7 @@ func (d *Daemon) addExtensionServers(preInit bool, fallbackCert *shared.CertInfo
continue
}

customCertExists := shared.PathExists(filepath.Join(d.os.CertificatesDir, fmt.Sprintf("%s.crt", serverName)))
customCertExists := shared.PathExists(filepath.Join(d.os.CertificatesDir(), fmt.Sprintf("%s.crt", serverName)))

var err error
var cert *shared.CertInfo
Expand All @@ -930,7 +934,7 @@ func (d *Daemon) addExtensionServers(preInit bool, fallbackCert *shared.CertInfo
} else {
// Generate a dedicated certificate or load the custom one if it exists.
// When updating the additional listeners the dedicated certificate from before will be reused.
cert, err = shared.KeyPairAndCA(d.os.CertificatesDir, serverName, shared.CertServer, shared.CertOptions{AddHosts: true, CommonName: serverName})
cert, err = shared.KeyPairAndCA(d.os.CertificatesDir(), serverName, shared.CertServer, shared.CertOptions{AddHosts: true, CommonName: serverName})
if err != nil {
return fmt.Errorf("Failed to setup dedicated certificate for additional server %q: %w", serverName, err)
}
Expand Down Expand Up @@ -1000,9 +1004,9 @@ func (d *Daemon) ReloadCert(name types.CertificateName) error {

var dir string
if name == types.ClusterCertificateName || name == types.ServerCertificateName {
dir = d.os.StateDir
dir = d.os.StateDir()
} else {
dir = d.os.CertificatesDir
dir = d.os.CertificatesDir()
}

cert, err := shared.KeyPairAndCA(dir, string(name), shared.CertServer, shared.CertOptions{AddHosts: true, CommonName: d.Name()})
Expand Down Expand Up @@ -1035,7 +1039,7 @@ func (d *Daemon) ReloadCert(name types.CertificateName) error {
// - and cannot load a custom certificate which shares their name
d.extensionServersMu.RLock()
for name, server := range d.extensionServers {
certExists := shared.PathExists(filepath.Join(d.os.CertificatesDir, fmt.Sprintf("%s.crt", name)))
certExists := shared.PathExists(filepath.Join(d.os.CertificatesDir(), fmt.Sprintf("%s.crt", name)))
if !server.CoreAPI && !server.DedicatedCertificate && !certExists {
d.endpoints.UpdateTLSByName(name, cert)
}
Expand Down Expand Up @@ -1099,8 +1103,8 @@ func (d *Daemon) ExtensionServers() []string {
}

// FileSystem returns the filesystem structure for the daemon.
func (d *Daemon) FileSystem() *sys.OS {
copyOS := *d.os
func (d *Daemon) FileSystem() types.OS {
copyOS := *(d.os.(*sys.OS))
return &copyOS
}

Expand Down
2 changes: 1 addition & 1 deletion internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (db *DqliteDB) waitUpgrade(bootstrap bool, ext extensions.Extensions) error

otherNodesBehind := false
newSchema := db.Schema()
newSchema.File(path.Join(db.os.StateDir, "patch.global.sql"))
newSchema.File(path.Join(db.os.StateDir(), "patch.global.sql"))

if !bootstrap {
checkVersions := func(ctx context.Context, current int, tx *sql.Tx) error {
Expand Down
15 changes: 9 additions & 6 deletions internal/db/dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type DqliteDB struct {
listenAddr api.URL // Listen address for this dqlite node.

dbName string // This is db.bin.
os *sys.OS
os types.OS

db *sql.DB
dqlite *dqlite.App
Expand Down Expand Up @@ -73,7 +73,7 @@ func (db *DqliteDB) Accept(conn net.Conn) {
}

// NewDB creates an empty db struct with no dqlite connection.
func NewDB(ctx context.Context, serverCert func() *shared.CertInfo, clusterCert func() *shared.CertInfo, memberName func() string, os *sys.OS, heartbeatInterval time.Duration) (*DqliteDB, error) {
func NewDB(ctx context.Context, serverCert func() *shared.CertInfo, clusterCert func() *shared.CertInfo, memberName func() string, os types.OS, heartbeatInterval time.Duration) (*DqliteDB, error) {
shutdownCtx, shutdownCancel := context.WithCancel(ctx)

if heartbeatInterval == 0 {
Expand Down Expand Up @@ -133,7 +133,7 @@ func (db *DqliteDB) SchemaVersion() (versionInternal uint64, versionExternal uin
// isInitialized determines whether the database has been bootstrapped or joined to a cluster.
// This is an internal helper function; external callers should use Status() instead.
func (db *DqliteDB) isInitialized() (bool, error) {
_, err := os.Stat(filepath.Join(db.os.DatabaseDir, "info.yaml"))
_, err := os.Stat(filepath.Join(db.os.DatabaseDir(), "info.yaml"))
if err != nil {
if os.IsNotExist(err) {
return false, nil
Expand All @@ -149,7 +149,7 @@ func (db *DqliteDB) isInitialized() (bool, error) {
func (db *DqliteDB) Bootstrap(extensions extensions.Extensions, addr api.URL, clusterRecord cluster.CoreClusterMember) error {
var err error
db.listenAddr = addr
db.dqlite, err = dqlite.New(db.os.DatabaseDir,
db.dqlite, err = dqlite.New(db.os.DatabaseDir(),
dqlite.WithAddress(db.listenAddr.URL.Host),
dqlite.WithRolesAdjustmentFrequency(db.heartbeatInterval),
dqlite.WithRolesAdjustmentHook(db.heartbeat),
Expand Down Expand Up @@ -183,7 +183,7 @@ func (db *DqliteDB) Bootstrap(extensions extensions.Extensions, addr api.URL, cl
func (db *DqliteDB) Join(extensions extensions.Extensions, addr api.URL, joinAddresses ...string) error {
var err error
db.listenAddr = addr
db.dqlite, err = dqlite.New(db.os.DatabaseDir,
db.dqlite, err = dqlite.New(db.os.DatabaseDir(),
dqlite.WithCluster(joinAddresses),
dqlite.WithRolesAdjustmentFrequency(db.heartbeatInterval),
dqlite.WithRolesAdjustmentHook(db.heartbeat),
Expand Down Expand Up @@ -357,7 +357,10 @@ func (db *DqliteDB) heartbeat(leaderInfo dqliteClient.NodeInfo, servers []dqlite
return nil
}

client, err := internalClient.New(db.os.ControlSocket(), nil, nil, false)
url := api.NewURL()
url.URL = *db.os.ControlSocket()

client, err := internalClient.New(*url, nil, nil, false)
if err != nil {
db.log().Error("Failed to get local client", slog.String("address", db.listenAddr.String()), slog.String("error", err.Error()))
return nil
Expand Down
61 changes: 30 additions & 31 deletions internal/recover/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,22 @@ import (
"github.com/canonical/microcluster/v3/client"
"github.com/canonical/microcluster/v3/internal/config"
"github.com/canonical/microcluster/v3/internal/log"
"github.com/canonical/microcluster/v3/internal/sys"
"github.com/canonical/microcluster/v3/internal/trust"
"github.com/canonical/microcluster/v3/microcluster/types"
)

// GetDqliteClusterMembers parses the trust store and
// path.Join(filesystem.DatabaseDir, "cluster.yaml").
func GetDqliteClusterMembers(filesystem *sys.OS) ([]types.DqliteMember, error) {
storePath := path.Join(filesystem.DatabaseDir, "cluster.yaml")
func GetDqliteClusterMembers(filesystem types.OS) ([]types.DqliteMember, error) {
storePath := path.Join(filesystem.DatabaseDir(), "cluster.yaml")

var nodeInfo []dqlite.NodeInfo
err := readYaml(storePath, &nodeInfo)
if err != nil {
return nil, err
}

remotes, err := readTrustStore(filesystem.TrustDir)
remotes, err := readTrustStore(filesystem.TrustDir())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -68,7 +67,7 @@ func GetDqliteClusterMembers(filesystem *sys.OS) ([]types.DqliteMember, error) {
// files, modifies the daemon and trust store, and writes a recovery tarball.
// It does not check members to ensure that the new configuration is valid; use
// ValidateMemberChanges to ensure that the inputs to this function are correct.
func RecoverFromQuorumLoss(ctx context.Context, filesystem *sys.OS, members []types.DqliteMember) (string, error) {
func RecoverFromQuorumLoss(ctx context.Context, filesystem types.OS, members []types.DqliteMember) (string, error) {
// Set up our new cluster configuration
nodeInfo := make([]dqlite.NodeInfo, 0, len(members))
for _, member := range members {
Expand All @@ -93,7 +92,7 @@ func RecoverFromQuorumLoss(ctx context.Context, filesystem *sys.OS, members []ty
// Check each cluster member's /1.0 to ensure that they are unreachable.
// This is a sanity check to ensure that we're not reconfiguring a cluster
// that's still partially up.
remotes, err := readTrustStore(filesystem.TrustDir)
remotes, err := readTrustStore(filesystem.TrustDir())
if err != nil {
return "", err
}
Expand Down Expand Up @@ -138,13 +137,13 @@ func RecoverFromQuorumLoss(ctx context.Context, filesystem *sys.OS, members []ty
return "", err
}

err = dqlite.ReconfigureMembershipExt(filesystem.DatabaseDir, nodeInfo)
err = dqlite.ReconfigureMembershipExt(filesystem.DatabaseDir(), nodeInfo)
if err != nil {
return "", fmt.Errorf("Dqlite recovery: %w", err)
}

// Update local info.yaml with our new address
localInfoYamlPath := path.Join(filesystem.DatabaseDir, "info.yaml")
localInfoYamlPath := path.Join(filesystem.DatabaseDir(), "info.yaml")

var localInfo dqlite.NodeInfo
err = readYaml(localInfoYamlPath, &localInfo)
Expand All @@ -164,7 +163,7 @@ func RecoverFromQuorumLoss(ctx context.Context, filesystem *sys.OS, members []ty
return "", err
}

err = writeDqliteClusterYaml(path.Join(filesystem.DatabaseDir, "cluster.yaml"), members)
err = writeDqliteClusterYaml(path.Join(filesystem.DatabaseDir(), "cluster.yaml"), members)
if err != nil {
return "", err
}
Expand All @@ -180,7 +179,7 @@ func RecoverFromQuorumLoss(ctx context.Context, filesystem *sys.OS, members []ty
return recoveryTarballPath, err
}

err = updateTrustStore(filesystem.TrustDir, members)
err = updateTrustStore(filesystem.TrustDir(), members)
if err != nil {
return recoveryTarballPath, fmt.Errorf("Failed to update trust store: %w", err)
}
Expand Down Expand Up @@ -235,13 +234,13 @@ func writeDqliteClusterYaml(path string, members []types.DqliteMember) error {
return writeYaml(path, &nodeInfo)
}

func updateDaemonAddress(filesystem *sys.OS, address string) error {
func updateDaemonAddress(filesystem types.OS, address string) error {
newAddress, err := types.ParseAddrPort(address)
if err != nil {
return fmt.Errorf("Failed to update daemon.yaml: %w", err)
}

daemonConfig := config.NewDaemonConfig(path.Join(filesystem.StateDir, "daemon.yaml"))
daemonConfig := config.NewDaemonConfig(path.Join(filesystem.StateDir(), "daemon.yaml"))
err = daemonConfig.Load()
if err != nil {
return fmt.Errorf("Failed to load daemon.yaml: %w", err)
Expand Down Expand Up @@ -350,14 +349,14 @@ func ValidateMemberChanges(oldMembers []types.DqliteMember, newMembers []types.D
return nil
}

func writeGlobalMembersPatch(filesystem *sys.OS, members []types.DqliteMember) error {
func writeGlobalMembersPatch(filesystem types.OS, members []types.DqliteMember) error {
sql := ""
for _, member := range members {
sql += fmt.Sprintf("UPDATE core_cluster_members SET address = %q WHERE name = %q;\n", member.Address, member.Name)
}

if len(sql) > 0 {
patchPath := path.Join(filesystem.StateDir, "patch.global.sql")
patchPath := path.Join(filesystem.StateDir(), "patch.global.sql")
patchFile, err := os.OpenFile(patchPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
Expand All @@ -384,9 +383,9 @@ func writeGlobalMembersPatch(filesystem *sys.OS, members []types.DqliteMember) e
// go-dqlite's info.yaml is excluded from the tarball.
// The new cluster configuration is included as `recovery.yaml`.
// This function returns the path to the tarball.
func createRecoveryTarball(ctx context.Context, filesystem *sys.OS, members []types.DqliteMember) (string, error) {
tarballPath := path.Join(filesystem.StateDir, "recovery_db.tar.gz")
recoveryYamlPath := path.Join(filesystem.DatabaseDir, "recovery.yaml")
func createRecoveryTarball(ctx context.Context, filesystem types.OS, members []types.DqliteMember) (string, error) {
tarballPath := path.Join(filesystem.StateDir(), "recovery_db.tar.gz")
recoveryYamlPath := path.Join(filesystem.DatabaseDir(), "recovery.yaml")

err := writeYaml(recoveryYamlPath, members)
if err != nil {
Expand All @@ -396,7 +395,7 @@ func createRecoveryTarball(ctx context.Context, filesystem *sys.OS, members []ty
// info.yaml is used by go-dqlite to keep track of the current cluster member's
// ID and address. We shouldn't replicate the recovery member's info.yaml
// to all other members, so exclude it from the tarball:
err = createTarball(ctx, tarballPath, filesystem.DatabaseDir, ".", []string{"info.yaml"})
err = createTarball(ctx, tarballPath, filesystem.DatabaseDir(), ".", []string{"info.yaml"})

return tarballPath, err
}
Expand All @@ -405,9 +404,9 @@ func createRecoveryTarball(ctx context.Context, filesystem *sys.OS, members []ty
// fiesystem.StateDir. If it exists, unpack it into a temporary directory,
// ensure that it is a valid microcluster recovery tarball, and replace the
// existing filesystem.DatabaseDir.
func MaybeUnpackRecoveryTarball(ctx context.Context, filesystem *sys.OS) error {
tarballPath := path.Join(filesystem.StateDir, "recovery_db.tar.gz")
unpackDir := path.Join(filesystem.StateDir, "recovery_db")
func MaybeUnpackRecoveryTarball(ctx context.Context, filesystem types.OS) error {
tarballPath := path.Join(filesystem.StateDir(), "recovery_db.tar.gz")
unpackDir := path.Join(filesystem.StateDir(), "recovery_db")
recoveryYamlPath := path.Join(unpackDir, "recovery.yaml")

// Determine if the recovery tarball exists
Expand All @@ -430,7 +429,7 @@ func MaybeUnpackRecoveryTarball(ctx context.Context, filesystem *sys.OS) error {

// We need to set the local info.yaml address with the (possibly changed)
// incoming address for this member.
localInfoYamlPath := path.Join(filesystem.DatabaseDir, "info.yaml")
localInfoYamlPath := path.Join(filesystem.DatabaseDir(), "info.yaml")
recoveryInfoYamlPath := path.Join(unpackDir, "info.yaml")

var localInfo dqlite.NodeInfo
Expand Down Expand Up @@ -465,7 +464,7 @@ func MaybeUnpackRecoveryTarball(ctx context.Context, filesystem *sys.OS) error {
}

// Update the local trust store with the incoming cluster configuration
err = updateTrustStore(filesystem.TrustDir, incomingMembers)
err = updateTrustStore(filesystem.TrustDir(), incomingMembers)
if err != nil {
return err
}
Expand All @@ -477,7 +476,7 @@ func MaybeUnpackRecoveryTarball(ctx context.Context, filesystem *sys.OS) error {

// Now that we're as sure as we can be that the recovery DB is valid, we can
// replace the existing DB
err = os.RemoveAll(filesystem.DatabaseDir)
err = os.RemoveAll(filesystem.DatabaseDir())
if err != nil {
return err
}
Expand All @@ -487,7 +486,7 @@ func MaybeUnpackRecoveryTarball(ctx context.Context, filesystem *sys.OS) error {
return err
}

err = os.Rename(unpackDir, filesystem.DatabaseDir)
err = os.Rename(unpackDir, filesystem.DatabaseDir())
if err != nil {
return err
}
Expand All @@ -510,13 +509,13 @@ func MaybeUnpackRecoveryTarball(ctx context.Context, filesystem *sys.OS) error {
// CreateDatabaseBackup writes a tarball of filesystem.DatabaseDir to
// filesystem.StateDir as db_backup.TIMESTAMP.tar.gz. It does not check to
// to ensure that the database is stopped.
func CreateDatabaseBackup(ctx context.Context, filesystem *sys.OS) error {
func CreateDatabaseBackup(ctx context.Context, filesystem types.OS) error {
// tar interprets `:` as a remote drive; ISO8601 allows a 'basic format'
// with the colons omitted (as opposed to time.RFC3339)
// https://en.wikipedia.org/wiki/ISO_8601
backupFileName := fmt.Sprintf("db_backup.%s.tar.gz", time.Now().Format("2006-01-02T150405Z0700"))

backupFilePath := path.Join(filesystem.StateDir, backupFileName)
backupFilePath := path.Join(filesystem.StateDir(), backupFileName)

logger, err := log.LoggerFromContext(ctx)
if err != nil {
Expand All @@ -527,13 +526,13 @@ func CreateDatabaseBackup(ctx context.Context, filesystem *sys.OS) error {

// For DB backups the tarball should contain the subdirs (usually `database/`)
// so that the user can easily untar the backup from the state dir.
rootDir := filesystem.StateDir
walkDir, err := filepath.Rel(filesystem.StateDir, filesystem.DatabaseDir)
rootDir := filesystem.StateDir()
walkDir, err := filepath.Rel(filesystem.StateDir(), filesystem.DatabaseDir())

// Don't bother if DatabaseDir is not inside StateDir
if err != nil {
logger.Warn("DB backup: DatabaseDir (%q) not in StateDir (%q)", slog.String("databaseDir", filesystem.DatabaseDir), slog.String("stateDir", filesystem.StateDir))
rootDir = filesystem.DatabaseDir
logger.Warn("DB backup: DatabaseDir (%q) not in StateDir (%q)", slog.String("databaseDir", filesystem.DatabaseDir()), slog.String("stateDir", filesystem.StateDir()))
rootDir = filesystem.DatabaseDir()
walkDir = "."
}

Expand Down
Loading