@@ -928,4 +928,178 @@ mod tests {
928928 let rendered = metrics. render ( ) . unwrap ( ) ;
929929 assert ! ( rendered. contains( "rpc_errors_total{method=\" getEvents\" } 1" ) ) ;
930930 }
931+
932+ // -------------------------------------------------------------------------
933+ // sync cursor progression under empty event streams
934+ //
935+ // fetch_events_since silently returns Ok(vec![]) on RPC failure.
936+ // These tests verify the cursor never jumps or rewinds in that scenario.
937+ // -------------------------------------------------------------------------
938+
939+ /// Build a client whose RPC endpoint is unreachable (port 0), so every RPC
940+ /// call fails immediately. Returns None when Redis is unavailable so each
941+ /// test can skip gracefully without failing CI.
942+ async fn make_dead_rpc_client ( ) -> Option < BlockchainClient > {
943+ let mut config = Config :: from_env ( ) ;
944+ config. blockchain_rpc_url = "http://127.0.0.1:0" . to_string ( ) ;
945+ config. retry_attempts = 1 ;
946+ config. retry_base_delay_ms = 1 ;
947+ // Small lag keeps confirmed_tip arithmetic predictable.
948+ config. confirmation_ledger_lag = 5 ;
949+ // No market IDs avoids extra RPC calls inside sync_once.
950+ config. sync_market_ids = vec ! [ ] ;
951+
952+ let metrics = Metrics :: new ( ) . unwrap ( ) ;
953+ let cache = match RedisCache :: new ( & config. redis_url ) . await {
954+ Ok ( c) => c,
955+ Err ( _) => return None ,
956+ } ;
957+ Some ( BlockchainClient :: new ( & config, cache, metrics) . unwrap ( ) )
958+ }
959+
960+ /// When latest_ledger RPC fails, sync_once falls back to cursor_ledger as
961+ /// the latest value. confirmed_tip = cursor - lag ≤ cursor, so the
962+ /// early-return guard fires and the cursor is returned unchanged.
963+ #[ tokio:: test]
964+ async fn test_cursor_does_not_advance_when_latest_ledger_rpc_fails ( ) {
965+ let client = match make_dead_rpc_client ( ) . await {
966+ Some ( c) => c,
967+ None => {
968+ println ! ( "Skipping test_cursor_does_not_advance_when_latest_ledger_rpc_fails: Redis unavailable" ) ;
969+ return ;
970+ }
971+ } ;
972+
973+ let initial: u32 = 500 ;
974+ let next = client. sync_once ( initial) . await . unwrap ( ) ;
975+ assert_eq ! (
976+ next, initial,
977+ "cursor must not change when latest_ledger RPC fails (got {next}, want {initial})"
978+ ) ;
979+ }
980+
981+ /// Starting from ledger 0 (fresh worker state) with a dead RPC the cursor
982+ /// must stay at 0 and must not jump to any non-zero value.
983+ #[ tokio:: test]
984+ async fn test_cursor_stays_at_zero_on_rpc_failure_from_fresh_state ( ) {
985+ let client = match make_dead_rpc_client ( ) . await {
986+ Some ( c) => c,
987+ None => {
988+ println ! ( "Skipping test_cursor_stays_at_zero_on_rpc_failure_from_fresh_state: Redis unavailable" ) ;
989+ return ;
990+ }
991+ } ;
992+
993+ let next = client. sync_once ( 0 ) . await . unwrap ( ) ;
994+ assert_eq ! (
995+ next, 0 ,
996+ "cursor must stay at 0 when RPC fails from fresh state (got {next})"
997+ ) ;
998+ }
999+
1000+ /// When confirmed_tip ≤ cursor (chain has not advanced past the lag window),
1001+ /// sync_once must return the cursor unchanged – idempotency guarantee.
1002+ #[ tokio:: test]
1003+ async fn test_cursor_is_idempotent_when_already_at_confirmed_tip ( ) {
1004+ let client = match make_dead_rpc_client ( ) . await {
1005+ Some ( c) => c,
1006+ None => {
1007+ println ! ( "Skipping test_cursor_is_idempotent_when_already_at_confirmed_tip: Redis unavailable" ) ;
1008+ return ;
1009+ }
1010+ } ;
1011+
1012+ // Dead RPC → latest falls back to cursor_ledger.
1013+ // confirmed_tip = cursor - lag ≤ cursor → early return.
1014+ let cursor: u32 = 200 ;
1015+ let next = client. sync_once ( cursor) . await . unwrap ( ) ;
1016+ assert_eq ! (
1017+ next, cursor,
1018+ "cursor must be idempotent when already at confirmed tip (got {next}, want {cursor})"
1019+ ) ;
1020+ }
1021+
1022+ /// Across multiple consecutive sync cycles with a dead RPC the cursor must
1023+ /// never rewind below its starting value. Guards against any regression
1024+ /// where a silent empty response causes the cursor to go backwards.
1025+ #[ tokio:: test]
1026+ async fn test_cursor_never_rewinds_across_multiple_empty_sync_cycles ( ) {
1027+ let client = match make_dead_rpc_client ( ) . await {
1028+ Some ( c) => c,
1029+ None => {
1030+ println ! ( "Skipping test_cursor_never_rewinds_across_multiple_empty_sync_cycles: Redis unavailable" ) ;
1031+ return ;
1032+ }
1033+ } ;
1034+
1035+ let initial: u32 = 1_000 ;
1036+ let mut cursor = initial;
1037+
1038+ for round in 0 ..5u32 {
1039+ let next = client. sync_once ( cursor) . await . unwrap ( ) ;
1040+ assert ! (
1041+ next >= initial,
1042+ "cursor rewound on round {round}: started at {initial}, became {next}"
1043+ ) ;
1044+ cursor = next;
1045+ }
1046+ }
1047+
1048+ /// fetch_events_since must return Ok(vec![]) – not an error – when the RPC
1049+ /// is unreachable, and the silent fallback must be recorded in the
1050+ /// rpc_errors_total metric so operators can detect the failure.
1051+ #[ tokio:: test]
1052+ async fn test_empty_event_stream_on_rpc_failure_is_recorded_in_metrics ( ) {
1053+ let mut config = Config :: from_env ( ) ;
1054+ config. blockchain_rpc_url = "http://127.0.0.1:0" . to_string ( ) ;
1055+ config. retry_attempts = 1 ;
1056+ config. retry_base_delay_ms = 1 ;
1057+ config. sync_market_ids = vec ! [ ] ;
1058+
1059+ let metrics = Metrics :: new ( ) . unwrap ( ) ;
1060+ let cache = match RedisCache :: new ( & config. redis_url ) . await {
1061+ Ok ( c) => c,
1062+ Err ( _) => {
1063+ println ! ( "Skipping test_empty_event_stream_on_rpc_failure_is_recorded_in_metrics: Redis unavailable" ) ;
1064+ return ;
1065+ }
1066+ } ;
1067+
1068+ let client = BlockchainClient :: new ( & config, cache, metrics. clone ( ) ) . unwrap ( ) ;
1069+
1070+ // RPC failure must be masked – the call must succeed with an empty list.
1071+ let events = client. fetch_events_since ( 100 ) . await . unwrap ( ) ;
1072+ assert ! (
1073+ events. is_empty( ) ,
1074+ "RPC failure must produce an empty event list, not propagate an error"
1075+ ) ;
1076+
1077+ // The silent fallback must be observable via metrics.
1078+ let rendered = metrics. render ( ) . unwrap ( ) ;
1079+ assert ! (
1080+ rendered. contains( "rpc_errors_total{method=\" getEvents\" } 1" ) ,
1081+ "silent empty-stream fallback must increment rpc_errors_total for getEvents"
1082+ ) ;
1083+ }
1084+
1085+ /// sync_once must return Ok (not Err) when the RPC is unreachable, so the
1086+ /// run_sync_worker loop takes the Ok branch and preserves the cursor.
1087+ #[ tokio:: test]
1088+ async fn test_sync_once_returns_ok_not_err_on_rpc_failure ( ) {
1089+ let client = match make_dead_rpc_client ( ) . await {
1090+ Some ( c) => c,
1091+ None => {
1092+ println ! (
1093+ "Skipping test_sync_once_returns_ok_not_err_on_rpc_failure: Redis unavailable"
1094+ ) ;
1095+ return ;
1096+ }
1097+ } ;
1098+
1099+ let result = client. sync_once ( 300 ) . await ;
1100+ assert ! (
1101+ result. is_ok( ) ,
1102+ "sync_once must return Ok on RPC failure so the worker loop preserves the cursor"
1103+ ) ;
1104+ }
9311105}
0 commit comments