diff --git a/example/test/main.sh b/example/test/main.sh
index 1ffbf043..7ac7b59e 100755
--- a/example/test/main.sh
+++ b/example/test/main.sh
@@ -68,7 +68,10 @@ shutdown_systems() {
 
   # The cluster doesn't always shut down right away; we've given it a chance
   for job_pid in $(jobs -p); do
-    kill -9 "${job_pid}"
+    # Check if process still exists before trying to kill it
+    if kill -0 "${job_pid}" 2>/dev/null; then
+      kill -9 "${job_pid}" 2>/dev/null || true
+    fi
   done
 }
 
@@ -403,6 +406,140 @@ test_join_token_before_cluster_formed() {
   shutdown_systems
 }
 
+test_membership_consistency() {
+  echo "Testing membership consistency checks"
+
+  new_systems 4 --heartbeat 2s
+
+  # Bootstrap first member (daemon already running from new_systems)
+  microctl --state-dir "${test_dir}/c1" init "c1" 127.0.0.1:9001 --bootstrap
+
+  # Join second member (daemon already running)
+  token_c2=$(microctl --state-dir "${test_dir}/c1" tokens add "c2")
+  microctl --state-dir "${test_dir}/c2" init "c2" 127.0.0.1:9002 --token "${token_c2}"
+
+  # Start third member and join cluster
+  token_c3=$(microctl --state-dir "${test_dir}/c1" tokens add "c3")
+  microctl --state-dir "${test_dir}/c3" init "c3" 127.0.0.1:9003 --token "${token_c3}"
+
+  # Fetch join token for c4
+  token_c4=$(microctl --state-dir "${test_dir}/c1" tokens add "c4")
+
+  # Wait for cluster to stabilize
+  echo " -> Waiting for cluster members to exit PENDING state"
+
+  # Wait for all members to be promoted from PENDING
+  retry_count=0
+  max_retries=10
+  while [[ -n "$(microctl --state-dir "${test_dir}/c1" cluster list -f yaml | yq '.[] | select(.role == "PENDING")')" ]] && [[ ${retry_count} -lt ${max_retries} ]]; do
+    echo " -> Still waiting for members to exit PENDING state..."
+    sleep 2
+    retry_count=$((retry_count + 1))
+  done
+
+  echo " -> Cluster established successfully"
+
+  # Verify cluster is healthy
+  cluster_size=$(microctl --state-dir "${test_dir}/c1" cluster list -f yaml | yq '. | length')
+  if [ "${cluster_size}" != "3" ]; then
+    echo "ERROR: Expected cluster size 3, got ${cluster_size}"
+    exit 1
+  fi
+
+  # Simulate inconsistent state by directly manipulating the database
+  # while keeping dqlite/truststore intact
+  echo " -> Simulating inconsistent membership state"
+
+  # Remove c2's membership from core_cluster_members (simulating partial remove failure)
+  microctl --state-dir "${test_dir}/c1" sql "DELETE FROM core_cluster_members WHERE name = 'c2'"
+
+  echo " -> Created inconsistent state (c2 removed from database but still in truststore until heartbeat timeout)"
+
+  # Test member removal with inconsistent state
+  echo " -> Testing member removal with inconsistent state"
+  if microctl --state-dir "${test_dir}/c1" cluster remove c2 2>/tmp/remove_error; then
+    echo "ERROR: Member removal should have failed due to inconsistent state"
+    cat /tmp/remove_error
+    exit 1
+  else
+    echo " -> Member removal correctly blocked due to membership inconsistency"
+    cat /tmp/remove_error
+  fi
+
+  # Try to join a new member - this should fail due to inconsistency
+  echo " -> Testing join of new member c4 with inconsistent state"
+  if microctl --state-dir "${test_dir}/c4" init "c4" 127.0.0.1:9004 --token "${token_c4}" 2>/tmp/join_error; then
+    echo "ERROR: Member c4 should not have been able to join due to inconsistent state"
+    cat /tmp/join_error
+    exit 1
+  else
+    echo " -> Membership inconsistency correctly detected, c4 join blocked"
+    cat /tmp/join_error
+  fi
+
+  # Attempt to generate token should fail
+  echo " -> Testing token generation with inconsistent state"
+  if microctl --state-dir "${test_dir}/c1" tokens add c5 2>/tmp/token_error; then
+    echo "ERROR: Token generation should have failed due to inconsistent state"
+    cat /tmp/token_error
+    exit 1
+  else
+    echo " -> Token generation correctly blocked due to membership inconsistency"
+    cat /tmp/token_error
+  fi
+
+  echo " -> Membership consistency checks working as expected"
+
+  shutdown_systems
+}
+
+test_parallel_joins() {
+  echo "Testing parallel joins"
+
+  new_systems 4 --heartbeat 2s
+
+  # Bootstrap first member
+  microctl --state-dir "${test_dir}/c1" init "c1" 127.0.0.1:9001 --bootstrap
+
+  # Prepare tokens for remaining members
+  token_c2=$(microctl --state-dir "${test_dir}/c1" tokens add "c2")
+  token_c3=$(microctl --state-dir "${test_dir}/c1" tokens add "c3")
+  token_c4=$(microctl --state-dir "${test_dir}/c1" tokens add "c4")
+
+  # Kick off joins in parallel and collect PIDs
+  microctl --state-dir "${test_dir}/c2" init "c2" 127.0.0.1:9002 --token "${token_c2}" &
+  pids=($!)
+  microctl --state-dir "${test_dir}/c3" init "c3" 127.0.0.1:9003 --token "${token_c3}" &
+  pids+=($!)
+  microctl --state-dir "${test_dir}/c4" init "c4" 127.0.0.1:9004 --token "${token_c4}" &
+  pids+=($!)
+
+  for pid in "${pids[@]}"; do
+    if ! wait "${pid}"; then
+      echo "ERROR: parallel join failed (pid ${pid})"
+      return 1
+    fi
+  done
+
+  # Wait for cluster to stabilize
+  retry_count=0
+  max_retries=10
+  while [[ -n "$(microctl --state-dir "${test_dir}/c1" cluster list -f yaml | yq '.[] | select(.role == "PENDING")')" ]] && [[ ${retry_count} -lt ${max_retries} ]]; do
+    sleep 2
+    retry_count=$((retry_count + 1))
+  done
+
+  cluster_size=$(microctl --state-dir "${test_dir}/c1" cluster list -f yaml | yq '. | length')
+  if [ "${cluster_size}" != "4" ]; then
+    echo "ERROR: Expected cluster size 4 after parallel joins, got ${cluster_size}"
+    return 1
+  fi
+
+  echo "SUCCESS: parallel joins completed without failures"
+
+  shutdown_systems
+}
+
 test_extended_endpoints() {
   new_systems 4 --heartbeat 2s
 
@@ -442,6 +579,8 @@ if [ "${1:-"all"}" = "all" ] || [ "${1}" = "" ]; then
   test_join_token_after_cluster_formed
   test_join_token_before_cluster_formed
   test_extended_endpoints
+  test_membership_consistency
+  test_parallel_joins
 elif [ "${1}" = "recover" ]; then
   test_recover
 elif [ "${1}" = "tokens" ]; then
@@ -454,6 +593,10 @@ elif [ "${1}" = "join-before" ]; then
   test_join_token_before_cluster_formed
 elif [ "${1}" = "extended" ]; then
   test_extended_endpoints
+elif [ "${1}" = "membership" ]; then
+  test_membership_consistency
+elif [ "${1}" = "parallel-join" ]; then
+  test_parallel_joins
 else
   echo "Unknown test ${1}"
 fi
diff --git a/internal/rest/resources/cluster.go b/internal/rest/resources/cluster.go
index 423ad9c6..d6cfa3e5 100644
--- a/internal/rest/resources/cluster.go
+++ b/internal/rest/resources/cluster.go
@@ -76,8 +76,7 @@ func clusterPost(s state.State, r *http.Request) response.Response {
 		return response.BadRequest(err)
 	}
 
-	ctx, cancel := context.WithTimeout(r.Context(), time.Second*30)
-	defer cancel()
+	ctx := r.Context()
 
 	leaderClient, err := s.Database().Leader(ctx)
 	if err != nil {
@@ -100,6 +99,18 @@ func clusterPost(s state.State, r *http.Request) response.Response {
 		return response.SmartError(fmt.Errorf("Remote with address %q exists", req.Address.String()))
 	}
 
+	// Check cluster membership consistency before allowing joins
+	// This ensures core_cluster_members, truststore, and dqlite are all in sync
+	intState, err := internalState.ToInternal(s)
+	if err != nil {
+		return response.SmartError(err)
+	}
+
+	err = intState.CheckMembershipConsistency(ctx)
+	if err != nil {
+		return response.SmartError(err)
+	}
+
 	// Forward request to leader.
 	if leaderInfo.Address != s.Address().URL.Host {
 		client, err := s.Connect().Leader(false)
@@ -107,7 +118,7 @@ func clusterPost(s state.State, r *http.Request) response.Response {
 			return response.SmartError(err)
 		}
 
-		tokenResponse, err := internalClient.AddClusterMember(r.Context(), client, req)
+		tokenResponse, err := internalClient.AddClusterMember(ctx, client, req)
 		if err != nil {
 			return response.SmartError(err)
 		}
@@ -115,11 +126,6 @@ func clusterPost(s state.State, r *http.Request) response.Response {
 		return response.SyncResponse(true, tokenResponse)
 	}
 
-	intState, err := internalState.ToInternal(s)
-	if err != nil {
-		return response.SmartError(err)
-	}
-
 	// Check if the joining node's extensions are compatible with the leader's.
 	err = intState.Extensions.IsSameVersion(req.Extensions)
 	if err != nil {
@@ -423,8 +429,21 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 		return response.SmartError(fmt.Errorf("No remote exists with the given name %q", name))
 	}
 
-	ctx, cancel := context.WithTimeout(r.Context(), time.Second*60)
-	defer cancel()
+	ctx := r.Context()
+
+	// Check cluster membership consistency before allowing removals (unless forced)
+	// This ensures core_cluster_members, truststore, and dqlite are all in sync
+	if !force {
+		intState, err := internalState.ToInternal(s)
+		if err != nil {
+			return response.SmartError(err)
+		}
+
+		err = intState.CheckMembershipConsistency(ctx)
+		if err != nil {
+			return response.SmartError(err)
+		}
+	}
 
 	leader, err := s.Database().Leader(ctx)
 	if err != nil {
@@ -464,7 +483,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 			return response.SmartError(err)
 		}
 
-		err = internalClient.DeleteClusterMember(r.Context(), client, name, force)
+		err = internalClient.DeleteClusterMember(ctx, client, name, force)
 		if err != nil {
 			return response.SmartError(err)
 		}
@@ -486,7 +505,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 		})
 	}
 
-	info, err := leader.Cluster(r.Context())
+	info, err := leader.Cluster(ctx)
 	if err != nil {
 		return response.SmartError(err)
 	}
@@ -505,7 +524,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 	}
 
 	var clusterMembers []cluster.CoreClusterMember
-	err = s.Database().Transaction(r.Context(), func(ctx context.Context, tx *sql.Tx) error {
+	err = s.Database().Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
 		var err error
 		clusterMembers, err = cluster.GetCoreClusterMembers(ctx, tx)
 
@@ -543,7 +562,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 	}
 
 	// Refresh members information since we may have changed roles.
-	info, err = leader.Cluster(r.Context())
+	info, err = leader.Cluster(ctx)
 	if err != nil {
 		return response.SmartError(err)
 	}
@@ -587,7 +606,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 		clusterDisableMu.Unlock()
 	}()
 
-	err = internalClient.DeleteClusterMember(r.Context(), client, name, force)
+	err = internalClient.DeleteClusterMember(ctx, client, name, force)
 	if err != nil {
 		return response.SmartError(err)
 	}
@@ -627,7 +646,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 	}
 
 	// Remove the cluster member from the database.
-	err = s.Database().Transaction(r.Context(), func(ctx context.Context, tx *sql.Tx) error {
+	err = s.Database().Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
 		return cluster.DeleteCoreClusterMember(ctx, tx, remote.Address.String())
 	})
 	if err != nil && !force {
@@ -636,7 +655,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 	}
 	// Remove the node from dqlite, if it has a record there.
 	if index >= 0 {
-		err = leader.Remove(r.Context(), info[index].ID)
+		err = leader.Remove(ctx, info[index].ID)
 		if err != nil {
 			return response.SmartError(err)
 		}
@@ -662,7 +681,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 		return response.SmartError(err)
 	}
 
-	err = internalClient.ResetClusterMember(r.Context(), client, name, force)
+	err = internalClient.ResetClusterMember(ctx, client, name, force)
 	if err != nil && !force {
 		return response.SmartError(err)
 	}
@@ -673,7 +692,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 	}
 
 	// Run the PostRemove hook locally.
-	hookCtx, hookCancel := context.WithCancel(r.Context())
+	hookCtx, hookCancel := context.WithCancel(ctx)
 	err = intState.Hooks.PostRemove(hookCtx, s, force)
 	hookCancel()
 	if err != nil {
@@ -687,7 +706,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
 
 	// Run the PostRemove hook on all other members.
 	remotes := s.Remotes()
-	err = clients.Query(r.Context(), true, func(ctx context.Context, c types.Client) error {
+	err = clients.Query(ctx, true, func(ctx context.Context, c types.Client) error {
 		c.SetClusterNotification()
 		addrPort, err := types.ParseAddrPort(c.URL().Host)
 		if err != nil {
diff --git a/internal/rest/resources/tokens.go b/internal/rest/resources/tokens.go
index 749bfae2..85d6c51f 100644
--- a/internal/rest/resources/tokens.go
+++ b/internal/rest/resources/tokens.go
@@ -15,11 +15,12 @@ import (
 	"github.com/canonical/microcluster/v3/internal/cluster"
 	"github.com/canonical/microcluster/v3/internal/log"
 	"github.com/canonical/microcluster/v3/internal/rest/access"
-	"github.com/canonical/microcluster/v3/internal/state"
+	internalState "github.com/canonical/microcluster/v3/internal/state"
 	"github.com/canonical/microcluster/v3/internal/utils"
 	"github.com/canonical/microcluster/v3/microcluster/rest"
 	"github.com/canonical/microcluster/v3/microcluster/rest/response"
 	"github.com/canonical/microcluster/v3/microcluster/types"
+	"github.com/canonical/microcluster/v3/state"
 )
 
 var tokensCmd = rest.Endpoint{
@@ -49,6 +50,18 @@ func tokensPost(state state.State, r *http.Request) response.Response {
 		return response.SmartError(fmt.Errorf("Token name %q is not a valid FQDN: %w", req.Name, err))
 	}
 
+	// Check cluster membership consistency before allowing token creation
+	// This ensures core_cluster_members, truststore, and dqlite are all in sync
+	intState, err := internalState.ToInternal(state)
+	if err != nil {
+		return response.SmartError(err)
+	}
+
+	err = intState.CheckMembershipConsistency(r.Context())
+	if err != nil {
+		return response.SmartError(err)
+	}
+
 	// Generate join token for new member. This will be stored alongside the join
 	// address and cluster certificate to simplify setup.
 	tokenKey, err := shared.RandomCryptoString()
diff --git a/internal/state/state.go b/internal/state/state.go
index 76860d47..20ed93e4 100644
--- a/internal/state/state.go
+++ b/internal/state/state.go
@@ -3,14 +3,19 @@ package state
 import (
 	"context"
 	"crypto/x509"
+	"database/sql"
 	"fmt"
 	"math/rand"
 	"net/url"
+	"slices"
+	"sort"
 	"time"
 
+	dqliteClient "github.com/canonical/go-dqlite/v3/client"
 	"github.com/canonical/lxd/shared"
 	"github.com/canonical/lxd/shared/api"
 
+	"github.com/canonical/microcluster/v3/internal/cluster"
 	internalConfig "github.com/canonical/microcluster/v3/internal/config"
 	"github.com/canonical/microcluster/v3/internal/db"
 	"github.com/canonical/microcluster/v3/internal/endpoints"
@@ -283,3 +288,98 @@ func ToInternal(s State) (*InternalState, error) {
 
 	return nil, fmt.Errorf("Underlying State is not an InternalState")
 }
+
+// CheckMembershipConsistency verifies that core_cluster_members, truststore, and dqlite
+// all have consistent membership information. This should be called before any member
+// add/remove operations or join token generation to ensure the cluster is in a healthy state.
+func (s *InternalState) CheckMembershipConsistency(ctx context.Context) error {
+	// Assign a context timeout if we don't already have one.
+	_, ok := ctx.Deadline()
+	if !ok {
+		timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+		ctx = timeoutCtx
+		defer cancel()
+	}
+
+	for {
+		coreClusterMembers, truststoreRemotes, dqliteNodes, err := s.getMembershipData(ctx)
+		if err != nil {
+			return fmt.Errorf("Failed to gather membership data for consistency check: %w", err)
+		}
+
+		err = s.checkMembershipConsistency(coreClusterMembers, truststoreRemotes, dqliteNodes)
+		if err != nil {
+			select {
+			case <-ctx.Done():
+				return fmt.Errorf("Membership consistency check failed after timeout: %w", err)
+			case <-time.After(200 * time.Millisecond):
+				continue
+			}
+		}
+
+		return nil
+	}
+}
+
+// getMembershipData retrieves membership information from all sources.
+func (s *InternalState) getMembershipData(ctx context.Context) ([]cluster.CoreClusterMember, map[string]trust.Remote, []dqliteClient.NodeInfo, error) {
+	// Get database core cluster members
+	var coreClusterMembers []cluster.CoreClusterMember
+	err := s.Database().Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
+		var err error
+		coreClusterMembers, err = cluster.GetCoreClusterMembers(ctx, tx)
+		return err
+	})
+	if err != nil {
+		return nil, nil, nil, fmt.Errorf("Failed to get core cluster members from database: %w", err)
+	}
+
+	// Get truststore remotes
+	truststoreRemotes := s.Remotes().RemotesByName()
+
+	// Get dqlite cluster info
+	leaderClient, err := s.Database().Leader(ctx)
+	if err != nil {
+		return nil, nil, nil, fmt.Errorf("Failed to get dqlite leader: %w", err)
+	}
+
+	// Get dqlite cluster members
+	dqliteNodes, err := s.Database().Cluster(ctx, leaderClient)
+	if err != nil {
+		return nil, nil, nil, fmt.Errorf("Failed to get dqlite cluster info: %w", err)
+	}
+
+	return coreClusterMembers, truststoreRemotes, dqliteNodes, nil
+}
+
+// checkMembershipConsistency checks consistency across all three membership sources using addresses.
+func (s *InternalState) checkMembershipConsistency(coreClusterMembers []cluster.CoreClusterMember, truststoreRemotes map[string]trust.Remote, dqliteNodes []dqliteClient.NodeInfo) error {
+	// Collect addresses from each source into sorted slices
+	var coreClusterAddresses []string
+	for _, member := range coreClusterMembers {
+		coreClusterAddresses = append(coreClusterAddresses, member.Address)
+	}
+
+	sort.Strings(coreClusterAddresses)
+
+	var trustAddresses []string
+	for _, remote := range truststoreRemotes {
+		trustAddresses = append(trustAddresses, remote.Address.String())
+	}
+
+	sort.Strings(trustAddresses)
+
+	var dqliteAddresses []string
+	for _, node := range dqliteNodes {
+		dqliteAddresses = append(dqliteAddresses, node.Address)
+	}
+
+	sort.Strings(dqliteAddresses)
+
+	// Check if all three slices are equal
+	if !slices.Equal(coreClusterAddresses, trustAddresses) || !slices.Equal(coreClusterAddresses, dqliteAddresses) {
+		return fmt.Errorf("Microcluster node membership is inconsistent across core_cluster_members (%v), truststore (%v) and dqlite (%v)", coreClusterAddresses, trustAddresses, dqliteAddresses)
+	}
+
+	return nil
+}