From 024bcba85fb63b1878f8bdbb87f28037e25c97dc Mon Sep 17 00:00:00 2001 From: secwall Date: Tue, 7 Apr 2026 11:01:09 +0200 Subject: [PATCH] Reconnect to dcs if we fail to read back health node --- internal/app/app.go | 21 ++++++++++++++++++--- internal/app/candidate.go | 3 +++ internal/app/local.go | 7 +++++++ internal/app/lost.go | 3 +++ internal/app/manager.go | 3 +++ 5 files changed, 34 insertions(+), 3 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 0c9f532..ad8b9ff 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -19,23 +19,24 @@ import ( // App is main application structure type App struct { - dcsDivergeTime time.Time replFailTime time.Time lostSince time.Time + dcsDivergeTime time.Time + dcs dcs.DCS critical atomic.Value ctx context.Context - dcs dcs.DCS + cache *valkey.SentiCacheNode config *config.Config splitTime map[string]time.Time logger *slog.Logger nodeFailTime map[string]time.Time shard *valkey.Shard - cache *valkey.SentiCacheNode daemonLock *flock.Flock timings *TimingReporter mode appMode aofMode aofMode state appState + dcsReconnect atomic.Bool } func baseContext() context.Context { @@ -109,6 +110,20 @@ func (app *App) connectDCS() error { return nil } +func (app *App) handleDcsReconnect() bool { + if !app.dcsReconnect.Load() { + return false + } + app.logger.Warn("Performing a scheduled DCS reconnection") + err := app.reconnectDCS() + if err != nil { + app.logger.Error("Scheduled DCS reconnection failed", slog.Any("error", err)) + return false + } + app.dcsReconnect.Store(false) + return true +} + func (app *App) reconnectDCS() error { app.logger.Info("Attempting DCS reconnection after prolonged Lost state") oldDCS := app.dcs diff --git a/internal/app/candidate.go b/internal/app/candidate.go index 7c96cfa..4c86c00 100644 --- a/internal/app/candidate.go +++ b/internal/app/candidate.go @@ -11,6 +11,9 @@ func (app *App) stateCandidate() appState { if !app.dcs.IsConnected() { return stateLost } + if app.handleDcsReconnect() { + return stateCandidate + } err := app.shard.UpdateHostsInfo() if err != nil { app.logger.Error("Candidate: failed to update host info from DCS", slog.Any("error", err)) diff --git a/internal/app/local.go b/internal/app/local.go index 5394ca0..5d3ddc7 100644 --- a/internal/app/local.go +++ b/internal/app/local.go @@ -30,6 +30,13 @@ func (app *App) healthChecker() { err := app.dcs.SetEphemeral(path, hc) if err != nil { app.logger.Error("Failed to set healthcheck status to dcs", slog.Any("error", err)) + } else { + var readBack HostState + err = app.dcs.Get(path, &readBack) + if err != nil { + app.logger.Warn("Failed to read back health node from DCS", slog.Any("error", err)) + app.dcsReconnect.Store(true) + } } } else if !hcCheckTime.IsZero() { if time.Since(hcCheckTime) < 5*app.config.HealthCheckInterval { diff --git a/internal/app/lost.go b/internal/app/lost.go index 2b3b957..cdd485d 100644 --- a/internal/app/lost.go +++ b/internal/app/lost.go @@ -7,6 +7,9 @@ import ( ) func (app *App) stateLost() appState { + if app.handleDcsReconnect() { + return stateCandidate + } if app.dcs.IsConnected() { return stateCandidate } diff --git a/internal/app/manager.go b/internal/app/manager.go index 365eb04..a1b90b7 100644 --- a/internal/app/manager.go +++ b/internal/app/manager.go @@ -13,6 +13,9 @@ func (app *App) stateManager() appState { if !app.dcs.IsConnected() { return stateLost } + if app.handleDcsReconnect() { + return stateCandidate + } if !app.dcs.AcquireLock(pathManagerLock) { return stateCandidate }