145 changes: 144 additions & 1 deletion example/test/main.sh
@@ -68,7 +68,10 @@ shutdown_systems() {

# The cluster doesn't always shut down right away; we've given it a chance
for job_pid in $(jobs -p); do
kill -9 "${job_pid}"
# Check if process still exists before trying to kill it
if kill -0 "${job_pid}" 2>/dev/null; then
kill -9 "${job_pid}" 2>/dev/null || true
fi
done
}

@@ -403,6 +406,140 @@ test_join_token_before_cluster_formed() {
shutdown_systems
}

test_membership_consistency() {
echo "Testing membership consistency checks"

new_systems 4 --heartbeat 2s

# Bootstrap first member (daemon already running from new_systems)
microctl --state-dir "${test_dir}/c1" init "c1" 127.0.0.1:9001 --bootstrap

# Join second member (daemon already running)
token_c2=$(microctl --state-dir "${test_dir}/c1" tokens add "c2")
microctl --state-dir "${test_dir}/c2" init "c2" 127.0.0.1:9002 --token "${token_c2}"

# Start third member and join cluster
token_c3=$(microctl --state-dir "${test_dir}/c1" tokens add "c3")
microctl --state-dir "${test_dir}/c3" init "c3" 127.0.0.1:9003 --token "${token_c3}"

# Fetch join token for c4
token_c4=$(microctl --state-dir "${test_dir}/c1" tokens add "c4")

# Wait for cluster to stabilize
echo " -> Waiting for cluster members to exit PENDING state"

# Wait for all members to be promoted from PENDING
retry_count=0
max_retries=10
while [[ -n "$(microctl --state-dir "${test_dir}/c1" cluster list -f yaml | yq '.[] | select(.role == "PENDING")')" ]] && [[ ${retry_count} -lt ${max_retries} ]]; do
echo " -> Still waiting for members to exit PENDING state..."
sleep 2
retry_count=$((retry_count + 1))
done

echo " -> Cluster established successfully"

# Verify cluster is healthy
cluster_size=$(microctl --state-dir "${test_dir}/c1" cluster list -f yaml | yq '. | length')
if [ "${cluster_size}" != "3" ]; then
echo "ERROR: Expected cluster size 3, got ${cluster_size}"
exit 1
fi

# Simulate inconsistent state by directly manipulating the database
# while keeping dqlite/truststore intact
echo " -> Simulating inconsistent membership state"

# Remove c2's membership from core_cluster_members (simulating partial remove failure)
microctl --state-dir "${test_dir}/c1" sql "DELETE FROM core_cluster_members WHERE name = 'c2'"

echo " -> Created inconsistent state (c2 removed from database but still in truststore until heartbeat timeout)"

# Test member removal with inconsistent state
echo " -> Testing member removal with inconsistent state"
if microctl --state-dir "${test_dir}/c1" cluster remove c2 2>/tmp/remove_error; then
echo "ERROR: Member removal should have failed due to inconsistent state"
cat /tmp/remove_error
exit 1
else
echo " -> Member removal correctly blocked due to membership inconsistency"
cat /tmp/remove_error
fi

# Try to join a new member - this should fail due to inconsistency
echo " -> Testing join of new member c4 with inconsistent state"
if microctl --state-dir "${test_dir}/c4" init "c4" 127.0.0.1:9004 --token "${token_c4}" 2>/tmp/join_error; then
echo "ERROR: Member c4 should not have been able to join due to inconsistent state"
cat /tmp/join_error
exit 1
else
echo " -> Membership inconsistency correctly detected, c4 join blocked"
cat /tmp/join_error
fi

# Attempt to generate token should fail
echo " -> Testing token generation with inconsistent state"
if microctl --state-dir "${test_dir}/c1" tokens add c5 2>/tmp/token_error; then
echo "ERROR: Token generation should have failed due to inconsistent state"
cat /tmp/token_error
exit 1
else
echo " -> Token generation correctly blocked due to membership inconsistency"
cat /tmp/token_error
fi

echo " -> Membership consistency checks working as expected"

shutdown_systems
}

test_parallel_joins() {
echo "Testing parallel joins"

new_systems 4 --heartbeat 2s

# Bootstrap first member
microctl --state-dir "${test_dir}/c1" init "c1" 127.0.0.1:9001 --bootstrap

# Prepare tokens for remaining members
token_c2=$(microctl --state-dir "${test_dir}/c1" tokens add "c2")
token_c3=$(microctl --state-dir "${test_dir}/c1" tokens add "c3")
token_c4=$(microctl --state-dir "${test_dir}/c1" tokens add "c4")

# Kick off joins in parallel and collect PIDs
microctl --state-dir "${test_dir}/c2" init "c2" 127.0.0.1:9002 --token "${token_c2}" &
pids=($!)
microctl --state-dir "${test_dir}/c3" init "c3" 127.0.0.1:9003 --token "${token_c3}" &
pids+=($!)
microctl --state-dir "${test_dir}/c4" init "c4" 127.0.0.1:9004 --token "${token_c4}" &
pids+=($!)

for pid in "${pids[@]}"; do
if ! wait "${pid}"; then
echo "ERROR: parallel join failed (pid ${pid})"
return 1
fi
done

# Wait for cluster to stabilize
retry_count=0
max_retries=10
while [[ -n "$(microctl --state-dir "${test_dir}/c1" cluster list -f yaml | yq '.[] | select(.role == "PENDING")')" ]] && [[ ${retry_count} -lt ${max_retries} ]]; do
sleep 2
retry_count=$((retry_count + 1))
done

cluster_size=$(microctl --state-dir "${test_dir}/c1" cluster list -f yaml | yq '. | length')
if [ "${cluster_size}" != "4" ]; then
echo "ERROR: Expected cluster size 4 after parallel joins, got ${cluster_size}"
return 1
fi

echo "SUCCESS: parallel joins completed without failures"

shutdown_systems
}

test_extended_endpoints() {
new_systems 4 --heartbeat 2s

@@ -442,6 +579,8 @@ if [ "${1:-"all"}" = "all" ] || [ "${1}" = "" ]; then
test_join_token_after_cluster_formed
test_join_token_before_cluster_formed
test_extended_endpoints
test_membership_consistency
test_parallel_joins
elif [ "${1}" = "recover" ]; then
test_recover
elif [ "${1}" = "tokens" ]; then
@@ -454,6 +593,10 @@ elif [ "${1}" = "join-before" ]; then
test_join_token_before_cluster_formed
elif [ "${1}" = "extended" ]; then
test_extended_endpoints
elif [ "${1}" = "membership" ]; then
test_membership_consistency
elif [ "${1}" = "parallel-join" ]; then
test_parallel_joins
else
echo "Unknown test ${1}"
fi
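
Note on running the new tests: both are wired into the argument dispatcher at the bottom of main.sh, so they can be run on their own as well as part of the default "all" run. A minimal invocation might look like the sketch below; it assumes the example binaries used by the script (such as microctl) are already built and on PATH, and that the script is run from its own directory, none of which is shown in this diff:

# Run only the new suites
./main.sh membership
./main.sh parallel-join

# Run the full suite, which now ends with both new tests
./main.sh all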
59 changes: 39 additions & 20 deletions internal/rest/resources/cluster.go
@@ -76,8 +76,7 @@ func clusterPost(s state.State, r *http.Request) response.Response {
return response.BadRequest(err)
}

ctx, cancel := context.WithTimeout(r.Context(), time.Second*30)
defer cancel()
ctx := r.Context()

leaderClient, err := s.Database().Leader(ctx)
if err != nil {
@@ -100,26 +99,33 @@ func clusterPost(s state.State, r *http.Request) response.Response {
return response.SmartError(fmt.Errorf("Remote with address %q exists", req.Address.String()))
}

// Check cluster membership consistency before allowing joins
// This ensures core_cluster_members, truststore, and dqlite are all in sync
intState, err := internalState.ToInternal(s)
if err != nil {
return response.SmartError(err)
}

err = intState.CheckMembershipConsistency(ctx)
if err != nil {
return response.SmartError(err)
}

// Forward request to leader.
if leaderInfo.Address != s.Address().URL.Host {
client, err := s.Connect().Leader(false)
if err != nil {
return response.SmartError(err)
}

tokenResponse, err := internalClient.AddClusterMember(r.Context(), client, req)
tokenResponse, err := internalClient.AddClusterMember(ctx, client, req)
if err != nil {
return response.SmartError(err)
}

return response.SyncResponse(true, tokenResponse)
}

intState, err := internalState.ToInternal(s)
if err != nil {
return response.SmartError(err)
}

// Check if the joining node's extensions are compatible with the leader's.
err = intState.Extensions.IsSameVersion(req.Extensions)
if err != nil {
@@ -423,8 +429,21 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
return response.SmartError(fmt.Errorf("No remote exists with the given name %q", name))
}

ctx, cancel := context.WithTimeout(r.Context(), time.Second*60)
defer cancel()
ctx := r.Context()

// Check cluster membership consistency before allowing removals (unless forced)
// This ensures core_cluster_members, truststore, and dqlite are all in sync
if !force {
intState, err := internalState.ToInternal(s)
if err != nil {
return response.SmartError(err)
}

err = intState.CheckMembershipConsistency(ctx)
if err != nil {
return response.SmartError(err)
}
}

leader, err := s.Database().Leader(ctx)
if err != nil {
@@ -464,7 +483,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
return response.SmartError(err)
}

err = internalClient.DeleteClusterMember(r.Context(), client, name, force)
err = internalClient.DeleteClusterMember(ctx, client, name, force)
if err != nil {
return response.SmartError(err)
}
@@ -486,7 +505,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
})
}

info, err := leader.Cluster(r.Context())
info, err := leader.Cluster(ctx)
if err != nil {
return response.SmartError(err)
}
@@ -505,7 +524,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
}

var clusterMembers []cluster.CoreClusterMember
err = s.Database().Transaction(r.Context(), func(ctx context.Context, tx *sql.Tx) error {
err = s.Database().Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
var err error
clusterMembers, err = cluster.GetCoreClusterMembers(ctx, tx)

@@ -543,7 +562,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
}

// Refresh members information since we may have changed roles.
info, err = leader.Cluster(r.Context())
info, err = leader.Cluster(ctx)
if err != nil {
return response.SmartError(err)
}
@@ -587,7 +606,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
clusterDisableMu.Unlock()
}()

err = internalClient.DeleteClusterMember(r.Context(), client, name, force)
err = internalClient.DeleteClusterMember(ctx, client, name, force)
if err != nil {
return response.SmartError(err)
}
@@ -627,7 +646,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
}

// Remove the cluster member from the database.
err = s.Database().Transaction(r.Context(), func(ctx context.Context, tx *sql.Tx) error {
err = s.Database().Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
return cluster.DeleteCoreClusterMember(ctx, tx, remote.Address.String())
})
if err != nil && !force {
@@ -636,7 +655,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {

// Remove the node from dqlite, if it has a record there.
if index >= 0 {
err = leader.Remove(r.Context(), info[index].ID)
err = leader.Remove(ctx, info[index].ID)
if err != nil {
return response.SmartError(err)
}
@@ -662,7 +681,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
return response.SmartError(err)
}

err = internalClient.ResetClusterMember(r.Context(), client, name, force)
err = internalClient.ResetClusterMember(ctx, client, name, force)
if err != nil && !force {
return response.SmartError(err)
}
@@ -673,7 +692,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {
}

// Run the PostRemove hook locally.
hookCtx, hookCancel := context.WithCancel(r.Context())
hookCtx, hookCancel := context.WithCancel(ctx)
err = intState.Hooks.PostRemove(hookCtx, s, force)
hookCancel()
if err != nil {
Expand All @@ -687,7 +706,7 @@ func clusterMemberDelete(s state.State, r *http.Request) response.Response {

// Run the PostRemove hook on all other members.
remotes := s.Remotes()
err = clients.Query(r.Context(), true, func(ctx context.Context, c types.Client) error {
err = clients.Query(ctx, true, func(ctx context.Context, c types.Client) error {
c.SetClusterNotification()
addrPort, err := types.ParseAddrPort(c.URL().Host)
if err != nil {
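
The removal path above only runs the new CheckMembershipConsistency pre-check when the request is not forced, so a forced removal remains available as an escape hatch once membership records have already diverged. From the client side that looks roughly like the sketch below; the exact spelling of the force flag on the example microctl is an assumption, as it is not shown in this diff:

# Plain removal is rejected while core_cluster_members, the truststore and
# dqlite disagree about membership.
microctl --state-dir "${test_dir}/c1" cluster remove c2

# A forced removal (assumed flag spelling) skips the consistency pre-check.
microctl --state-dir "${test_dir}/c1" cluster remove c2 --force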
15 changes: 14 additions & 1 deletion internal/rest/resources/tokens.go
@@ -15,11 +15,12 @@ import (
"github.com/canonical/microcluster/v3/internal/cluster"
"github.com/canonical/microcluster/v3/internal/log"
"github.com/canonical/microcluster/v3/internal/rest/access"
"github.com/canonical/microcluster/v3/internal/state"
internalState "github.com/canonical/microcluster/v3/internal/state"
"github.com/canonical/microcluster/v3/internal/utils"
"github.com/canonical/microcluster/v3/microcluster/rest"
"github.com/canonical/microcluster/v3/microcluster/rest/response"
"github.com/canonical/microcluster/v3/microcluster/types"
"github.com/canonical/microcluster/v3/state"
)

var tokensCmd = rest.Endpoint{
Expand Down Expand Up @@ -49,6 +50,18 @@ func tokensPost(state state.State, r *http.Request) response.Response {
return response.SmartError(fmt.Errorf("Token name %q is not a valid FQDN: %w", req.Name, err))
}

// Check cluster membership consistency before allowing token creation
// This ensures core_cluster_members, truststore, and dqlite are all in sync
intState, err := internalState.ToInternal(state)
if err != nil {
return response.SmartError(err)
}

err = intState.CheckMembershipConsistency(r.Context())
if err != nil {
return response.SmartError(err)
}

// Generate join token for new member. This will be stored alongside the join
// address and cluster certificate to simplify setup.
tokenKey, err := shared.RandomCryptoString()
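
The same pre-check now also guards token creation, which is the last path test_membership_consistency exercises. When it trips, one way to see what diverged is to compare the member list reported over the API with the rows in core_cluster_members, reusing the commands the test itself relies on; a rough sketch follows (the exact output formats, and the name field in the YAML listing, are assumptions):

# Members as reported over the API (truststore/dqlite view).
microctl --state-dir "${test_dir}/c1" cluster list -f yaml | yq '.[].name'

# Members as recorded in the internal database table the test tampers with.
microctl --state-dir "${test_dir}/c1" sql "SELECT name FROM core_cluster_members"

# Any name present in one listing but not the other is the kind of divergence
# that now blocks joins, removals and token generation.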