diff --git a/internal/app/app.go b/internal/app/app.go index 0c9f532..ad8b9ff 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -19,23 +19,24 @@ import ( // App is main application structure type App struct { - dcsDivergeTime time.Time replFailTime time.Time lostSince time.Time + dcsDivergeTime time.Time + dcs dcs.DCS critical atomic.Value ctx context.Context - dcs dcs.DCS + cache *valkey.SentiCacheNode config *config.Config splitTime map[string]time.Time logger *slog.Logger nodeFailTime map[string]time.Time shard *valkey.Shard - cache *valkey.SentiCacheNode daemonLock *flock.Flock timings *TimingReporter mode appMode aofMode aofMode state appState + dcsReconnect atomic.Bool } func baseContext() context.Context { @@ -109,6 +110,20 @@ func (app *App) connectDCS() error { return nil } +func (app *App) handleDcsReconnect() bool { + if !app.dcsReconnect.Load() { + return false + } + app.logger.Warn("Performing a scheduled DCS reconnection") + err := app.reconnectDCS() + if err != nil { + app.logger.Error("Scheduled DCS reconnection failed", slog.Any("error", err)) + return false + } + app.dcsReconnect.Store(false) + return true +} + func (app *App) reconnectDCS() error { app.logger.Info("Attempting DCS reconnection after prolonged Lost state") oldDCS := app.dcs diff --git a/internal/app/candidate.go b/internal/app/candidate.go index 7c96cfa..4c86c00 100644 --- a/internal/app/candidate.go +++ b/internal/app/candidate.go @@ -11,6 +11,9 @@ func (app *App) stateCandidate() appState { if !app.dcs.IsConnected() { return stateLost } + if app.handleDcsReconnect() { + return stateCandidate + } err := app.shard.UpdateHostsInfo() if err != nil { app.logger.Error("Candidate: failed to update host info from DCS", slog.Any("error", err)) diff --git a/internal/app/local.go b/internal/app/local.go index 5394ca0..5d3ddc7 100644 --- a/internal/app/local.go +++ b/internal/app/local.go @@ -30,6 +30,13 @@ func (app *App) healthChecker() { err := app.dcs.SetEphemeral(path, hc) if err != nil { app.logger.Error("Failed to set healthcheck status to dcs", slog.Any("error", err)) + } else { + var readBack HostState + err = app.dcs.Get(path, &readBack) + if err != nil { + app.logger.Warn("Failed to read back health node from DCS", slog.Any("error", err)) + app.dcsReconnect.Store(true) + } } } else if !hcCheckTime.IsZero() { if time.Since(hcCheckTime) < 5*app.config.HealthCheckInterval { diff --git a/internal/app/lost.go b/internal/app/lost.go index 2b3b957..cdd485d 100644 --- a/internal/app/lost.go +++ b/internal/app/lost.go @@ -7,6 +7,9 @@ import ( ) func (app *App) stateLost() appState { + if app.handleDcsReconnect() { + return stateCandidate + } if app.dcs.IsConnected() { return stateCandidate } diff --git a/internal/app/manager.go b/internal/app/manager.go index 365eb04..a1b90b7 100644 --- a/internal/app/manager.go +++ b/internal/app/manager.go @@ -13,6 +13,9 @@ func (app *App) stateManager() appState { if !app.dcs.IsConnected() { return stateLost } + if app.handleDcsReconnect() { + return stateCandidate + } if !app.dcs.AcquireLock(pathManagerLock) { return stateCandidate }