Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions internal/cluster/cluster_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/canonical/microcluster/v3/internal/db/update"
"github.com/canonical/microcluster/v3/internal/extensions"
"github.com/canonical/microcluster/v3/microcluster/types"
)

Expand All @@ -25,7 +24,7 @@ type CoreClusterMember struct {
Certificate string
SchemaInternal uint64
SchemaExternal uint64
APIExtensions extensions.Extensions
APIExtensions types.Extensions
Heartbeat time.Time
Role Role
}
Expand Down Expand Up @@ -67,7 +66,7 @@ func (c CoreClusterMember) ToAPI() (*types.ClusterMember, error) {
// GetUpgradingClusterMembers returns the list of all cluster members during an upgrade, as well as a map of members who we consider to be in a waiting state.
// This function can be used immediately after dqlite is ready, before we have loaded any prepared statements.
// A cluster member will be in a waiting state if a different cluster member still exists with a smaller API extension count or schema version.
func GetUpgradingClusterMembers(ctx context.Context, tx *sql.Tx, schemaInternal uint64, schemaExternal uint64, apiExtensions extensions.Extensions) (allMembers []CoreClusterMember, awaitingMembers map[string]bool, err error) {
func GetUpgradingClusterMembers(ctx context.Context, tx *sql.Tx, schemaInternal uint64, schemaExternal uint64, apiExtensions types.Extensions) (allMembers []CoreClusterMember, awaitingMembers map[string]bool, err error) {
tableName, err := update.PrepareUpdateV1(ctx, tx)
if err != nil {
return nil, nil, err
Expand Down
5 changes: 2 additions & 3 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
internalConfig "github.com/canonical/microcluster/v3/internal/config"
"github.com/canonical/microcluster/v3/internal/db"
"github.com/canonical/microcluster/v3/internal/endpoints"
"github.com/canonical/microcluster/v3/internal/extensions"
internalLog "github.com/canonical/microcluster/v3/internal/log"
"github.com/canonical/microcluster/v3/internal/recover"
internalREST "github.com/canonical/microcluster/v3/internal/rest"
Expand Down Expand Up @@ -105,7 +104,7 @@ type Daemon struct {
shutdownDoneCh chan error // Receives the result of state.Stop() when exit() is called and tells the daemon to end.
shutdownCancel context.CancelFunc // Cancels the shutdownCtx to indicate shutdown starting.

Extensions extensions.Extensions // Extensions supported at runtime by the daemon.
Extensions types.Extensions // Extensions supported at runtime by the daemon.

// stop is a sync.Once which wraps the daemon's stop sequence. Each call will block until the first one completes.
stop func() error
Expand Down Expand Up @@ -272,7 +271,7 @@ func (d *Daemon) init(listenAddress string, socketGroup string, heartbeatInterva
d.config.SetName(name)

// Initialize the extensions registry with the internal extensions.
d.Extensions, err = extensions.NewExtensionRegistry(true)
d.Extensions, err = types.NewExtensionRegistry(true)
if err != nil {
return err
}
Expand Down
7 changes: 3 additions & 4 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ import (

"github.com/canonical/microcluster/v3/internal/db/query"
"github.com/canonical/microcluster/v3/internal/db/update"
"github.com/canonical/microcluster/v3/internal/extensions"
"github.com/canonical/microcluster/v3/internal/sys"
clusterDB "github.com/canonical/microcluster/v3/microcluster/db"
"github.com/canonical/microcluster/v3/microcluster/types"
)

// Open opens the dqlite database and loads the schema.
// Returns true if we need to wait for other nodes to catch up to our version.
func (db *DqliteDB) Open(ext extensions.Extensions, bootstrap bool) error {
func (db *DqliteDB) Open(ext types.Extensions, bootstrap bool) error {
// Allow dqlite up to 2 minutes to become ready when starting up.
// This is to allow for unready/dead nodes to time out.
ctx, cancel := context.WithTimeout(db.ctx, 120*time.Second)
Expand Down Expand Up @@ -88,7 +87,7 @@ func (db *DqliteDB) Open(ext extensions.Extensions, bootstrap bool) error {
// waitUpgrade compares the version information of all cluster members in the database to the local version.
// If this node's version is ahead of others, then it will block on the `db.upgradeCh` or up to a minute.
// If this node's version is behind others, then it returns an error.
func (db *DqliteDB) waitUpgrade(bootstrap bool, ext extensions.Extensions) error {
func (db *DqliteDB) waitUpgrade(bootstrap bool, ext types.Extensions) error {
checkSchemaVersion := func(schemaVersion uint64, clusterMemberVersions []uint64) (otherNodesBehind bool, err error) {
nodeIsBehind := false
for _, version := range clusterMemberVersions {
Expand Down Expand Up @@ -116,7 +115,7 @@ func (db *DqliteDB) waitUpgrade(bootstrap bool, ext extensions.Extensions) error
return nodeIsBehind, nil
}

checkAPIExtensions := func(currentAPIExtensions extensions.Extensions, clusterMemberAPIExtensions []extensions.Extensions) (otherNodesBehind bool, err error) {
checkAPIExtensions := func(currentAPIExtensions types.Extensions, clusterMemberAPIExtensions []types.Extensions) (otherNodesBehind bool, err error) {
db.log().Debug(fmt.Sprintf("Local API extensions: %v, cluster members API extensions: %v", currentAPIExtensions, clusterMemberAPIExtensions))

nodeIsBehind := false
Expand Down
64 changes: 32 additions & 32 deletions internal/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (

"github.com/canonical/microcluster/v3/internal/cluster"
"github.com/canonical/microcluster/v3/internal/db/update"
"github.com/canonical/microcluster/v3/internal/extensions"
"github.com/canonical/microcluster/v3/internal/log"
"github.com/canonical/microcluster/v3/internal/sys"
clusterDB "github.com/canonical/microcluster/v3/microcluster/db"
"github.com/canonical/microcluster/v3/microcluster/types"
)

type dbSuite struct {
Expand Down Expand Up @@ -165,7 +165,7 @@ func (s *dbSuite) Test_waitUpgradeSchema() {
tx, err := db.db.BeginTx(db.ctx, nil)
s.NoError(err)

apiExtensions, err := extensions.NewExtensionRegistry(true)
apiExtensions, err := types.NewExtensionRegistry(true)
s.NoError(err)

// Generate a cluster member for the local node.
Expand Down Expand Up @@ -289,47 +289,47 @@ func (s *dbSuite) Test_waitUpgradeSchema() {
func (s *dbSuite) Test_waitUpgradeAPI() {
tests := []struct {
name string
upgradedLocalAPIExtensions extensions.Extensions
clusterMembersExtensions []extensions.Extensions
upgradedLocalAPIExtensions types.Extensions
clusterMembersExtensions []types.Extensions
expectErr error
expectWait bool
}{
{
name: "No upgrade, no other nodes",
upgradedLocalAPIExtensions: extensions.Extensions{},
upgradedLocalAPIExtensions: types.Extensions{},
},
{
name: "API upgrade, no other nodes",
upgradedLocalAPIExtensions: extensions.Extensions{"internal:a", "ext"},
upgradedLocalAPIExtensions: types.Extensions{"internal:a", "ext"},
},
{
name: "All other nodes ahead",
upgradedLocalAPIExtensions: extensions.Extensions{"internal:a", "ext"},
clusterMembersExtensions: []extensions.Extensions{{"internal:a", "ext", "ext2"}, {"internal:a", "ext", "ext2"}},
upgradedLocalAPIExtensions: types.Extensions{"internal:a", "ext"},
clusterMembersExtensions: []types.Extensions{{"internal:a", "ext", "ext2"}, {"internal:a", "ext", "ext2"}},
expectErr: fmt.Errorf("This node's API extensions are behind, please upgrade"),
},
{
name: "Some other nodes ahead",
upgradedLocalAPIExtensions: extensions.Extensions{"internal:a", "ext"},
clusterMembersExtensions: []extensions.Extensions{{"internal:a", "ext"}, {"internal:a", "ext", "ext2"}},
upgradedLocalAPIExtensions: types.Extensions{"internal:a", "ext"},
clusterMembersExtensions: []types.Extensions{{"internal:a", "ext"}, {"internal:a", "ext", "ext2"}},
expectErr: fmt.Errorf("This node's API extensions are behind, please upgrade"),
},
{
name: "All other nodes behind",
upgradedLocalAPIExtensions: extensions.Extensions{"internal:a", "ext", "ext2"},
clusterMembersExtensions: []extensions.Extensions{{"internal:a", "ext"}, {"internal:a", "ext"}},
upgradedLocalAPIExtensions: types.Extensions{"internal:a", "ext", "ext2"},
clusterMembersExtensions: []types.Extensions{{"internal:a", "ext"}, {"internal:a", "ext"}},
expectWait: true,
},
{
name: "Some other nodes behind",
upgradedLocalAPIExtensions: extensions.Extensions{"internal:a", "ext", "ext2"},
clusterMembersExtensions: []extensions.Extensions{{"internal:a", "ext", "ext2"}, {"internal:a", "ext"}},
upgradedLocalAPIExtensions: types.Extensions{"internal:a", "ext", "ext2"},
clusterMembersExtensions: []types.Extensions{{"internal:a", "ext", "ext2"}, {"internal:a", "ext"}},
expectWait: true,
},
{
name: "Some nodes behind, others ahead",
upgradedLocalAPIExtensions: extensions.Extensions{"internal:a", "ext", "ext2"},
clusterMembersExtensions: []extensions.Extensions{{"internal:a", "ext"}, {"internal:a", "ext", "ext2", "ext3"}},
upgradedLocalAPIExtensions: types.Extensions{"internal:a", "ext", "ext2"},
clusterMembersExtensions: []types.Extensions{{"internal:a", "ext"}, {"internal:a", "ext", "ext2", "ext3"}},
expectErr: fmt.Errorf("This node's API extensions are behind, please upgrade"),
},
}
Expand Down Expand Up @@ -417,9 +417,9 @@ func (s *dbSuite) Test_waitUpgradeAPI() {

res, err := clusterDB.SelectStrings(ctx, tx, "SELECT api_extensions FROM core_cluster_members ORDER BY id")
s.NoError(err)
allExtensions := make([]extensions.Extensions, 0)
allExtensions := make([]types.Extensions, 0)
for _, r := range res {
e := extensions.Extensions{}
e := types.Extensions{}
err = json.Unmarshal([]byte(r), &e)
s.NoError(err)
allExtensions = append(allExtensions, e)
Expand All @@ -442,7 +442,7 @@ func (s *dbSuite) Test_waitUpgradeSchemaAndAPI() {
type versionsWithExtensions struct {
schemaInt uint64
schemaExt uint64
ext extensions.Extensions
ext types.Extensions
}

tests := []struct {
Expand All @@ -457,19 +457,19 @@ func (s *dbSuite) Test_waitUpgradeSchemaAndAPI() {
upgradedLocalInfo: versionsWithExtensions{
schemaInt: 0,
schemaExt: 0,
ext: extensions.Extensions{"internal:a"},
ext: types.Extensions{"internal:a"},
},
},
{
name: "Local node behind in schema and API",
upgradedLocalInfo: versionsWithExtensions{
schemaInt: 0,
schemaExt: 0,
ext: extensions.Extensions{"internal:a"},
ext: types.Extensions{"internal:a"},
},
clusterMembers: []versionsWithExtensions{
{schemaInt: 1, schemaExt: 1, ext: extensions.Extensions{"internal:a"}},
{schemaInt: 1, schemaExt: 1, ext: extensions.Extensions{"internal:a"}},
{schemaInt: 1, schemaExt: 1, ext: types.Extensions{"internal:a"}},
{schemaInt: 1, schemaExt: 1, ext: types.Extensions{"internal:a"}},
},
expectErr: fmt.Errorf("This node's version is behind, please upgrade"),
},
Expand All @@ -478,11 +478,11 @@ func (s *dbSuite) Test_waitUpgradeSchemaAndAPI() {
upgradedLocalInfo: versionsWithExtensions{
schemaInt: 2,
schemaExt: 2,
ext: extensions.Extensions{"internal:a", "b", "c"},
ext: types.Extensions{"internal:a", "b", "c"},
},
clusterMembers: []versionsWithExtensions{
{schemaInt: 1, schemaExt: 1, ext: extensions.Extensions{"internal:a", "b", "c"}},
{schemaInt: 1, schemaExt: 1, ext: extensions.Extensions{"internal:a", "b", "c"}},
{schemaInt: 1, schemaExt: 1, ext: types.Extensions{"internal:a", "b", "c"}},
{schemaInt: 1, schemaExt: 1, ext: types.Extensions{"internal:a", "b", "c"}},
},
expectWait: true,
},
Expand All @@ -491,11 +491,11 @@ func (s *dbSuite) Test_waitUpgradeSchemaAndAPI() {
upgradedLocalInfo: versionsWithExtensions{
schemaInt: 1,
schemaExt: 1,
ext: extensions.Extensions{"internal:a", "b"},
ext: types.Extensions{"internal:a", "b"},
},
clusterMembers: []versionsWithExtensions{
{schemaInt: 2, schemaExt: 2, ext: extensions.Extensions{"internal:a", "b"}},
{schemaInt: 0, schemaExt: 0, ext: extensions.Extensions{"internal:a", "b"}},
{schemaInt: 2, schemaExt: 2, ext: types.Extensions{"internal:a", "b"}},
{schemaInt: 0, schemaExt: 0, ext: types.Extensions{"internal:a", "b"}},
},
expectErr: fmt.Errorf("This node's version is behind, please upgrade"),
},
Expand All @@ -518,7 +518,7 @@ func (s *dbSuite) Test_waitUpgradeSchemaAndAPI() {
Certificate: fmt.Sprintf("test-cert-%d", 0),
SchemaInternal: 0,
SchemaExternal: 0,
APIExtensions: extensions.Extensions{},
APIExtensions: types.Extensions{},
Heartbeat: time.Time{},
Role: "voter",
})
Expand Down Expand Up @@ -599,9 +599,9 @@ func (s *dbSuite) Test_waitUpgradeSchemaAndAPI() {

res, err := clusterDB.SelectStrings(ctx, tx, "SELECT api_extensions FROM core_cluster_members ORDER BY id")
s.NoError(err)
allExtensions := make([]extensions.Extensions, 0)
allExtensions := make([]types.Extensions, 0)
for _, r := range res {
e := extensions.Extensions{}
e := types.Extensions{}
err = json.Unmarshal([]byte(r), &e)
s.NoError(err)
allExtensions = append(allExtensions, e)
Expand Down
11 changes: 5 additions & 6 deletions internal/db/dqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/canonical/microcluster/v3/internal/cluster"
"github.com/canonical/microcluster/v3/internal/db/update"
"github.com/canonical/microcluster/v3/internal/extensions"
"github.com/canonical/microcluster/v3/internal/log"
internalClient "github.com/canonical/microcluster/v3/internal/rest/client"
"github.com/canonical/microcluster/v3/internal/sys"
Expand Down Expand Up @@ -114,7 +113,7 @@ func (db *DqliteDB) log() *slog.Logger {
}

// SetSchema sets schema and API extensions on the DB.
func (db *DqliteDB) SetSchema(schemaExtensions []clusterDB.Update, apiExtensions extensions.Extensions) {
func (db *DqliteDB) SetSchema(schemaExtensions []clusterDB.Update, apiExtensions types.Extensions) {
s := update.NewSchema()
s.AppendSchema(schemaExtensions, apiExtensions)
db.schema = s.Schema()
Expand All @@ -126,7 +125,7 @@ func (db *DqliteDB) Schema() *update.SchemaUpdate {
}

// SchemaVersion returns the current internal and external schema version, as well as all API extensions in memory.
func (db *DqliteDB) SchemaVersion() (versionInternal uint64, versionExternal uint64, apiExtensions extensions.Extensions) {
func (db *DqliteDB) SchemaVersion() (versionInternal uint64, versionExternal uint64, apiExtensions types.Extensions) {
return db.schema.Version()
}

Expand All @@ -146,7 +145,7 @@ func (db *DqliteDB) isInitialized() (bool, error) {
}

// Bootstrap dqlite.
func (db *DqliteDB) Bootstrap(extensions extensions.Extensions, addr *url.URL, clusterRecord cluster.CoreClusterMember) error {
func (db *DqliteDB) Bootstrap(extensions types.Extensions, addr *url.URL, clusterRecord cluster.CoreClusterMember) error {
var err error
db.listenAddr = addr
db.dqlite, err = dqlite.New(db.os.DatabaseDir(),
Expand Down Expand Up @@ -196,7 +195,7 @@ func (db *DqliteDB) Bootstrap(extensions extensions.Extensions, addr *url.URL, c
}

// Join a dqlite cluster with the address of a member.
func (db *DqliteDB) Join(extensions extensions.Extensions, addr *url.URL, joinAddresses ...string) error {
func (db *DqliteDB) Join(extensions types.Extensions, addr *url.URL, joinAddresses ...string) error {
var err error
db.listenAddr = addr
db.dqlite, err = dqlite.New(db.os.DatabaseDir(),
Expand Down Expand Up @@ -248,7 +247,7 @@ func (db *DqliteDB) Join(extensions extensions.Extensions, addr *url.URL, joinAd
}

// StartWithCluster starts up dqlite and joins the cluster.
func (db *DqliteDB) StartWithCluster(extensions extensions.Extensions, addr *url.URL, clusterMembers map[string]types.AddrPort) error {
func (db *DqliteDB) StartWithCluster(extensions types.Extensions, addr *url.URL, clusterMembers map[string]types.AddrPort) error {
allClusterAddrs := []string{}
for _, clusterMemberAddrs := range clusterMembers {
allClusterAddrs = append(allClusterAddrs, clusterMemberAddrs.String())
Expand Down
3 changes: 1 addition & 2 deletions internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

dqliteClient "github.com/canonical/go-dqlite/v3/client"

"github.com/canonical/microcluster/v3/internal/extensions"
"github.com/canonical/microcluster/v3/microcluster/types"
)

Expand All @@ -30,5 +29,5 @@ type DB interface {
IsOpen(ctx context.Context) error

// SchemaVersion returns the current internal and external schema version, as well as all API extensions in memory.
SchemaVersion() (versionInternal uint64, versionExternal uint64, apiExtensions extensions.Extensions)
SchemaVersion() (versionInternal uint64, versionExternal uint64, apiExtensions types.Extensions)
}
10 changes: 5 additions & 5 deletions internal/db/update/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"fmt"
"log/slog"

"github.com/canonical/microcluster/v3/internal/extensions"
"github.com/canonical/microcluster/v3/internal/log"
clusterDB "github.com/canonical/microcluster/v3/microcluster/db"
"github.com/canonical/microcluster/v3/microcluster/types"
)

// PrepareUpdateV1 creates the temporary table `internal_cluster_members_new` if we have not yet run `updateFromV1`.
Expand Down Expand Up @@ -144,7 +144,7 @@ func GetClusterMemberSchemaVersions(ctx context.Context, tx *sql.Tx) (internalSc

// UpdateClusterMemberAPIExtensions sets the API extensions for the cluster member with the given address.
// This helper is non-generated to work before generated statements are loaded, as we update the API extensions.
func UpdateClusterMemberAPIExtensions(ctx context.Context, tx *sql.Tx, apiExtensions extensions.Extensions, memberName string) error {
func UpdateClusterMemberAPIExtensions(ctx context.Context, tx *sql.Tx, apiExtensions types.Extensions, memberName string) error {
table, err := getClusterTableName(ctx, tx)
if err != nil {
return err
Expand Down Expand Up @@ -193,7 +193,7 @@ WHERE name IN ('api_extensions');

// GetClusterMemberAPIExtensions returns the API extensions from all cluster members that are not pending.
// This helper is non-generated to work before generated statements are loaded, as we update the API extensions.
func GetClusterMemberAPIExtensions(ctx context.Context, tx *sql.Tx) ([]extensions.Extensions, error) {
func GetClusterMemberAPIExtensions(ctx context.Context, tx *sql.Tx) ([]types.Extensions, error) {
table, err := getClusterTableName(ctx, tx)
if err != nil {
return nil, err
Expand All @@ -216,9 +216,9 @@ func GetClusterMemberAPIExtensions(ctx context.Context, tx *sql.Tx) ([]extension
}
}()

var results []extensions.Extensions
var results []types.Extensions
for rows.Next() {
var ext extensions.Extensions
var ext types.Extensions
err := rows.Scan(&ext)
if err != nil {
return nil, err
Expand Down
Loading