Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@ import (

// App is main application structure
type App struct {
dcsDivergeTime time.Time
replFailTime time.Time
lostSince time.Time
dcsDivergeTime time.Time
dcs dcs.DCS
critical atomic.Value
ctx context.Context
dcs dcs.DCS
cache *valkey.SentiCacheNode
config *config.Config
splitTime map[string]time.Time
logger *slog.Logger
nodeFailTime map[string]time.Time
shard *valkey.Shard
cache *valkey.SentiCacheNode
daemonLock *flock.Flock
timings *TimingReporter
mode appMode
aofMode aofMode
state appState
dcsReconnect atomic.Bool
}

func baseContext() context.Context {
Expand Down Expand Up @@ -109,6 +110,20 @@ func (app *App) connectDCS() error {
return nil
}

func (app *App) handleDcsReconnect() bool {
if !app.dcsReconnect.Load() {
return false
}
app.logger.Warn("Performing a scheduled DCS reconnection")
err := app.reconnectDCS()
if err != nil {
app.logger.Error("Scheduled DCS reconnection failed", slog.Any("error", err))
return false
}
app.dcsReconnect.Store(false)
return true
}

func (app *App) reconnectDCS() error {
app.logger.Info("Attempting DCS reconnection after prolonged Lost state")
oldDCS := app.dcs
Expand Down
3 changes: 3 additions & 0 deletions internal/app/candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ func (app *App) stateCandidate() appState {
if !app.dcs.IsConnected() {
return stateLost
}
if app.handleDcsReconnect() {
return stateCandidate
}
err := app.shard.UpdateHostsInfo()
if err != nil {
app.logger.Error("Candidate: failed to update host info from DCS", slog.Any("error", err))
Expand Down
7 changes: 7 additions & 0 deletions internal/app/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ func (app *App) healthChecker() {
err := app.dcs.SetEphemeral(path, hc)
if err != nil {
app.logger.Error("Failed to set healthcheck status to dcs", slog.Any("error", err))
} else {
var readBack HostState
err = app.dcs.Get(path, &readBack)
if err != nil {
app.logger.Warn("Failed to read back health node from DCS", slog.Any("error", err))
app.dcsReconnect.Store(true)
}
}
} else if !hcCheckTime.IsZero() {
if time.Since(hcCheckTime) < 5*app.config.HealthCheckInterval {
Expand Down
3 changes: 3 additions & 0 deletions internal/app/lost.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
)

func (app *App) stateLost() appState {
if app.handleDcsReconnect() {
return stateCandidate
}
if app.dcs.IsConnected() {
return stateCandidate
}
Expand Down
3 changes: 3 additions & 0 deletions internal/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ func (app *App) stateManager() appState {
if !app.dcs.IsConnected() {
return stateLost
}
if app.handleDcsReconnect() {
return stateCandidate
}
if !app.dcs.AcquireLock(pathManagerLock) {
return stateCandidate
}
Expand Down
Loading