From f0d2cc5c6773230d3c3c57a77fd96e9e44c5084e Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 30 Sep 2025 16:20:27 -0400 Subject: [PATCH] ra_log: Allow resetting last-written index to snapshot A leader might've taken a snapshot at the last index before a partition takes place. It can then write commands after the partition is in effect and the commands will not be committed. Then another leader can be elected and ask the deposed leader to reset its log down to the snapshot index. Previously this crashed since `ra_log:set_last_index/2` did not consider the snapshot state. --- src/ra_log.erl | 41 ++++++++++++++++++----------- test/ra_log_memory.erl | 23 +++++++++++----- test/ra_server_SUITE.erl | 57 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 22 deletions(-) diff --git a/src/ra_log.erl b/src/ra_log.erl index c527fe97..6891bcb8 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -592,26 +592,35 @@ last_written(#?MODULE{last_written_index_term = LWTI}) -> %% forces the last index and last written index back to a prior index -spec set_last_index(ra_index(), state()) -> {ok, state()} | {not_found, state()}. 
-set_last_index(Idx, #?MODULE{cfg = Cfg,
-                             last_written_index_term = {LWIdx0, _}} = State0) ->
+set_last_index(Idx, State0) ->
     case fetch_term(Idx, State0) of
         {undefined, State} ->
-            {not_found, State};
-        {Term, State1} ->
-            LWIdx = min(Idx, LWIdx0),
-            {LWTerm, State2} = fetch_term(LWIdx, State1),
-            %% this should always be found but still assert just in case
-            %% _if_ this ends up as a genuine reversal next time we try
-            %% to write to the mem table it will detect this and open
-            %% a new one
-            true = LWTerm =/= undefined,
-            put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
-            put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx),
-            {ok, State2#?MODULE{last_index = Idx,
-                                last_term = Term,
-                                last_written_index_term = {LWIdx, LWTerm}}}
+            %% Idx has no term in the log. It is still a valid reset point
+            %% when it is exactly the index reported by
+            %% snapshot_index_term/1, in which case the snapshot's term is
+            %% used as the last term.
+            case snapshot_index_term(State) of
+                {Idx, SnapTerm} ->
+                    set_last_index0(Idx, SnapTerm, State);
+                _ ->
+                    {not_found, State}
+            end;
+        {Term, State} ->
+            set_last_index0(Idx, Term, State)
     end.

+%% Sets last_index/last_term to Idx/Term and clamps the last written index
+%% so that it never exceeds Idx. Also updates the last index and last
+%% written index counters. Always returns {ok, State}.
+set_last_index0(Idx, Term,
+                #?MODULE{cfg = Cfg,
+                         last_written_index_term = {LWIdx0, _}} = State0) ->
+    LWIdx = min(Idx, LWIdx0),
+    {LWTerm, State1} =
+        case LWIdx of
+            Idx ->
+                %% last written is clamped to Idx, whose term the caller
+                %% has already resolved. It must not be fetched again:
+                %% when Idx is the snapshot index fetch_term/2 has just
+                %% returned undefined for it (that is how the snapshot
+                %% branch of set_last_index/2 is reached) and the
+                %% assertion below would fail with a badmatch.
+                {Term, State0};
+            _ ->
+                %% last written stays below Idx: look up its term
+                fetch_term(LWIdx, State0)
+        end,
+    %% this should always be found but still assert just in case
+    %% _if_ this ends up as a genuine reversal next time we try
+    %% to write to the mem table it will detect this and open
+    %% a new one
+    true = LWTerm =/= undefined,
+    put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
+    put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx),
+    {ok, State1#?MODULE{last_index = Idx,
+                        last_term = Term,
+                        last_written_index_term = {LWIdx, LWTerm}}}.
+
 -spec handle_event(event_body(), state()) -> {state(), [effect()]}.
 handle_event({written, _Term, {FromIdx, _ToIdx}}, diff --git a/test/ra_log_memory.erl b/test/ra_log_memory.erl index b5bd6c6b..b09bb0a1 100644 --- a/test/ra_log_memory.erl +++ b/test/ra_log_memory.erl @@ -148,17 +148,28 @@ last_index_term(#state{last_index = LastIdx, -spec set_last_index(ra_index(), ra_log_memory_state()) -> {ok, ra_log_memory_state()} | {not_found, ra_log_memory_state()}. -set_last_index(Idx, #state{last_written = {LWIdx, _}} = State0) -> +set_last_index(Idx, State0) -> case fetch_term(Idx, State0) of {undefined, State} -> - {not_found, State}; - {Term, State1} when Idx < LWIdx -> + case snapshot_index_term(State) of %% no term for Idx: accept it only if it is the snapshot index + {Idx, SnapTerm} -> + set_last_index0(Idx, SnapTerm, State); + _ -> + {not_found, State} + end; + {Term, State} -> + set_last_index0(Idx, Term, State) + end. + +set_last_index0(Idx, Term, #state{last_written = {LWIdx, _}} = State0) -> + case Idx < LWIdx of + true -> %% need to revert last_written too - State = State1#state{last_index = Idx, + State = State0#state{last_index = Idx, last_written = {Idx, Term}}, {ok, State}; - {_, State1} -> - State = State1#state{last_index = Idx}, + false -> %% Idx >= LWIdx: last_written is left untouched + State = State0#state{last_index = Idx}, {ok, State} end. diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 23a4e402..63d97b79 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -18,6 +18,7 @@ all() -> election_timeout, follower_aer_diverged, follower_aer_term_mismatch, + follower_aer_term_mismatch_at_snapshot, follower_aer_term_mismatch_snapshot, follower_handles_append_entries_rpc, candidate_handles_append_entries_rpc, @@ -710,6 +711,62 @@ follower_aer_term_mismatch(_Config) -> last_term = 3}, Reply), ok. 
+follower_aer_term_mismatch_at_snapshot(_Config) -> + %% case where the last correct entry in the log is the snapshot + State0 = (base_state(3, ?FUNCTION_NAME))#{last_applied => 3, + commit_index => 3 + }, + Log0 = maps:get(log, State0), + Meta = #{index => 3, + term => 5, + cluster => #{}, + machine_version => 1}, + Data = <<"hi3">>, + {Log,_} = ra_log_memory:install_snapshot({3, 5}, {Meta, Data}, Log0), + State = maps:put(log, Log, State0), + + %% append entries from the current leader in the current term + AER = #append_entries_rpc{term = 5, + leader_id = ?N1, + prev_log_index = 3, + prev_log_term = 5, % same log term + leader_commit = 3, + entries = [ + {4, 5, usr(<<"hi4">>)}, + {5, 5, usr(<<"hi4">>)}, + {6, 5, usr(<<"hi4">>)} + ]}, + {follower, State1, _} = ra_server:handle_follower(AER, State), + ?assertMatch(#{last_applied := 3, + commit_index := 3}, State1), + {follower, State2, + [{cast, _, {_, #append_entries_reply{term = 5, + success = true, + next_index = 7}}}]} + = ra_server:handle_follower(written_evt(5, {4, 6}), State1), + + %% a new leader deposes the old one and the uncommitted entries must be + %% truncated down to the snapshot index + AER1 = #append_entries_rpc{term = 6, % higher term + leader_id = ?N2, + prev_log_index = 3, + prev_log_term = 5, % same log term + leader_commit = 3, + entries = []}, + + % prev_log_index/term (3, 5) match the installed snapshot so this is not a + % term mismatch: the reply is successful and last_index is back at index 3 + {follower, State3, + [{cast, _, {_, #append_entries_reply{term = 6, + success = true, + next_index = 4, + last_index = 3, + last_term = 5}}} | _]} + = ra_server:handle_follower(AER1, State2), + ?assertMatch(#{last_applied := 3, + commit_index := 3}, State3), + ok. + follower_aer_term_mismatch_snapshot(_Config) -> %% case when we have to revert all the way back to a snapshot State0 = (base_state(3, ?FUNCTION_NAME))#{last_applied => 3,