diff --git a/config.toml.example b/config.toml.example index a3b10a15..30dde492 100644 --- a/config.toml.example +++ b/config.toml.example @@ -15,6 +15,7 @@ bearer_token = "secret" [cleanup] pokemon = true # Keep pokemon table is kept nice and short incidents = true # Remove incidents after expiry +station_battles = true # Remove station battles after expiry quests = true # Remove quests after expiry tappables = true # Remove tappables after expiry stats = true # Enable/Disable stats history diff --git a/config/config.go b/config/config.go index b9af097e..c2c0f988 100644 --- a/config/config.go +++ b/config/config.go @@ -53,6 +53,7 @@ type cleanup struct { Pokemon bool `koanf:"pokemon"` Quests bool `koanf:"quests"` Incidents bool `koanf:"incidents"` + StationBattles bool `koanf:"station_battles"` Tappables bool `koanf:"tappables"` Stats bool `koanf:"stats"` StatsDays int `koanf:"stats_days"` diff --git a/config/reader.go b/config/reader.go index fae0a393..09875fa5 100644 --- a/config/reader.go +++ b/config/reader.go @@ -40,12 +40,13 @@ func ReadConfig() (configDefinition, error) { Compress: true, }, Cleanup: cleanup{ - Pokemon: true, - Quests: true, - Incidents: true, - Tappables: true, - StatsDays: 7, - DeviceHours: 24, + Pokemon: true, + Quests: true, + Incidents: true, + StationBattles: true, + Tappables: true, + StatsDays: 7, + DeviceHours: 24, }, Database: database{ MaxPool: 100, diff --git a/decoder/api_fort.go b/decoder/api_fort.go index 8a06b654..372b0215 100644 --- a/decoder/api_fort.go +++ b/decoder/api_fort.go @@ -198,14 +198,34 @@ func isFortDnfMatch(fortType FortType, fortLookup *FortLookup, filter *ApiFortDn } } else if fortLookup.FortType == STATION { if filter.BattleLevel != nil || filter.BattlePokemon != nil { - // Check if battle has expired - if fortLookup.BattleEndTimestamp <= now { - return false + if len(fortLookup.StationBattles) == 0 { + if fortLookup.BattleEndTimestamp <= now { + return false + } + if filter.BattleLevel != nil && !slices.Contains(filter.BattleLevel, fortLookup.BattleLevel) { + return false + } + if filter.BattlePokemon != nil && !matchDnfIdPair(filter.BattlePokemon, fortLookup.BattlePokemonId, fortLookup.BattlePokemonForm) { + return false + } + return true } - if filter.BattleLevel != nil && !slices.Contains(filter.BattleLevel, fortLookup.BattleLevel) { - return false + + matchedBattle := false + for _, battle := range fortLookup.StationBattles { + if battle.BattleEndTimestamp <= now { + continue + } + if filter.BattleLevel != nil && !slices.Contains(filter.BattleLevel, battle.BattleLevel) { + continue + } + if filter.BattlePokemon != nil && !matchDnfIdPair(filter.BattlePokemon, battle.BattlePokemonId, battle.BattlePokemonForm) { + continue + } + matchedBattle = true + break } - if filter.BattlePokemon != nil && !matchDnfIdPair(filter.BattlePokemon, fortLookup.BattlePokemonId, fortLookup.BattlePokemonForm) { + if !matchedBattle { return false } } diff --git a/decoder/api_station.go b/decoder/api_station.go index 88d9c1ee..cab8f128 100644 --- a/decoder/api_station.go +++ b/decoder/api_station.go @@ -1,55 +1,80 @@ package decoder -import "github.com/guregu/null/v6" +import ( + "time" + + "github.com/guregu/null/v6" +) type ApiStationResult struct { - Id string `json:"id"` - Lat float64 `json:"lat"` - Lon float64 `json:"lon"` - Name string `json:"name"` - StartTime int64 `json:"start_time"` - EndTime int64 `json:"end_time"` - IsBattleAvailable bool `json:"is_battle_available"` - Updated int64 `json:"updated"` - BattleLevel null.Int `json:"battle_level"` - BattleStart null.Int `json:"battle_start"` - BattleEnd null.Int `json:"battle_end"` - BattlePokemonId null.Int `json:"battle_pokemon_id"` - BattlePokemonForm null.Int `json:"battle_pokemon_form"` - BattlePokemonCostume null.Int `json:"battle_pokemon_costume"` - BattlePokemonGender null.Int `json:"battle_pokemon_gender"` - BattlePokemonAlignment null.Int `json:"battle_pokemon_alignment"` - BattlePokemonBreadMode null.Int `json:"battle_pokemon_bread_mode"` - BattlePokemonMove1 null.Int `json:"battle_pokemon_move_1"` - BattlePokemonMove2 null.Int `json:"battle_pokemon_move_2"` - TotalStationedPokemon null.Int `json:"total_stationed_pokemon"` - TotalStationedGmax null.Int `json:"total_stationed_gmax"` - StationedPokemon null.String `json:"stationed_pokemon"` + Id string `json:"id"` + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + Name string `json:"name"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + IsBattleAvailable bool `json:"is_battle_available"` + Updated int64 `json:"updated"` + BattleLevel null.Int `json:"battle_level"` + BattleStart null.Int `json:"battle_start"` + BattleEnd null.Int `json:"battle_end"` + BattlePokemonId null.Int `json:"battle_pokemon_id"` + BattlePokemonForm null.Int `json:"battle_pokemon_form"` + BattlePokemonCostume null.Int `json:"battle_pokemon_costume"` + BattlePokemonGender null.Int `json:"battle_pokemon_gender"` + BattlePokemonAlignment null.Int `json:"battle_pokemon_alignment"` + BattlePokemonBreadMode null.Int `json:"battle_pokemon_bread_mode"` + BattlePokemonMove1 null.Int `json:"battle_pokemon_move_1"` + BattlePokemonMove2 null.Int `json:"battle_pokemon_move_2"` + TotalStationedPokemon null.Int `json:"total_stationed_pokemon"` + TotalStationedGmax null.Int `json:"total_stationed_gmax"` + StationedPokemon null.String `json:"stationed_pokemon"` + Battles []ApiStationBattle `json:"battles,omitempty"` } func BuildStationResult(station *Station) ApiStationResult { - return ApiStationResult{ - Id: station.Id, - Lat: station.Lat, - Lon: station.Lon, - Name: station.Name, - StartTime: station.StartTime, - EndTime: station.EndTime, - IsBattleAvailable: station.IsBattleAvailable, - Updated: station.Updated, - BattleLevel: station.BattleLevel, - BattleStart: station.BattleStart, - BattleEnd: station.BattleEnd, - BattlePokemonId: station.BattlePokemonId, - BattlePokemonForm: station.BattlePokemonForm, - BattlePokemonCostume: station.BattlePokemonCostume, - BattlePokemonGender: station.BattlePokemonGender, - BattlePokemonAlignment: station.BattlePokemonAlignment, - BattlePokemonBreadMode: station.BattlePokemonBreadMode, - BattlePokemonMove1: station.BattlePokemonMove1, - BattlePokemonMove2: station.BattlePokemonMove2, - TotalStationedPokemon: station.TotalStationedPokemon, - TotalStationedGmax: station.TotalStationedGmax, - StationedPokemon: station.StationedPokemon, + now := time.Now().Unix() + snapshot := collectStationBattleSnapshot(station, now) + hasBattleState := hasHydratedStationBattles(station.Id) + + result := ApiStationResult{ + Id: station.Id, + Lat: station.Lat, + Lon: station.Lon, + Name: station.Name, + StartTime: station.StartTime, + EndTime: station.EndTime, + IsBattleAvailable: station.IsBattleAvailable, + Updated: station.Updated, + TotalStationedPokemon: station.TotalStationedPokemon, + TotalStationedGmax: station.TotalStationedGmax, + StationedPokemon: station.StationedPokemon, + Battles: buildApiStationBattlesFromSlice(snapshot.Battles), + } + if snapshot.Canonical != nil { + result.BattleLevel = null.IntFrom(int64(snapshot.Canonical.BattleLevel)) + result.BattleStart = null.IntFrom(snapshot.Canonical.BattleStart) + result.BattleEnd = null.IntFrom(snapshot.Canonical.BattleEnd) + result.BattlePokemonId = snapshot.Canonical.BattlePokemonId + result.BattlePokemonForm = snapshot.Canonical.BattlePokemonForm + result.BattlePokemonCostume = snapshot.Canonical.BattlePokemonCostume + result.BattlePokemonGender = snapshot.Canonical.BattlePokemonGender + result.BattlePokemonAlignment = snapshot.Canonical.BattlePokemonAlignment + result.BattlePokemonBreadMode = snapshot.Canonical.BattlePokemonBreadMode + result.BattlePokemonMove1 = snapshot.Canonical.BattlePokemonMove1 + result.BattlePokemonMove2 = snapshot.Canonical.BattlePokemonMove2 + } else if !hasBattleState { + result.BattleLevel = station.BattleLevel + result.BattleStart = station.BattleStart + result.BattleEnd = station.BattleEnd + result.BattlePokemonId = station.BattlePokemonId + result.BattlePokemonForm = station.BattlePokemonForm + result.BattlePokemonCostume = station.BattlePokemonCostume + result.BattlePokemonGender = station.BattlePokemonGender + result.BattlePokemonAlignment = station.BattlePokemonAlignment + result.BattlePokemonBreadMode = station.BattlePokemonBreadMode + result.BattlePokemonMove1 = station.BattlePokemonMove1 + result.BattlePokemonMove2 = station.BattlePokemonMove2 } + return result } diff --git a/decoder/fortRtree.go b/decoder/fortRtree.go index 7fb0f44b..dc6fe54d 100644 --- a/decoder/fortRtree.go +++ b/decoder/fortRtree.go @@ -3,6 +3,7 @@ package decoder import ( "encoding/json" "sync" + "time" "github.com/guregu/null/v6" "github.com/puzpuzpuz/xsync/v3" @@ -57,6 +58,7 @@ type FortLookup struct { BattleLevel int8 BattlePokemonId int16 BattlePokemonForm int16 + StationBattles []FortLookupStationBattle } var fortLookupCache *xsync.MapOf[string, FortLookup] @@ -202,14 +204,31 @@ func updateGymLookup(gym *Gym) { } func updateStationLookup(station *Station) { + updateStationLookupFromSnapshot(station, collectStationBattleSnapshot(station, time.Now().Unix())) +} + +func updateStationLookupFromSnapshot(station *Station, snapshot stationBattleSnapshot) { + battles := buildFortLookupStationBattlesFromSlice(snapshot.Battles) + canonical := snapshot.Canonical + battleEndTimestamp := int64(0) + battleLevel := int8(0) + battlePokemonId := int16(0) + battlePokemonForm := int16(0) + if canonical != nil { + battleEndTimestamp = canonical.BattleEnd + battleLevel = int8(canonical.BattleLevel) + battlePokemonId = int16(canonical.BattlePokemonId.ValueOrZero()) + battlePokemonForm = int16(canonical.BattlePokemonForm.ValueOrZero()) + } fortLookupCache.Store(station.Id, FortLookup{ FortType: STATION, Lat: station.Lat, Lon: station.Lon, - BattleEndTimestamp: station.BattleEnd.ValueOrZero(), - BattleLevel: int8(station.BattleLevel.ValueOrZero()), - BattlePokemonId: int16(station.BattlePokemonId.ValueOrZero()), - BattlePokemonForm: int16(station.BattlePokemonForm.ValueOrZero()), + BattleEndTimestamp: battleEndTimestamp, + BattleLevel: battleLevel, + BattlePokemonId: battlePokemonId, + BattlePokemonForm: battlePokemonForm, + StationBattles: battles, }) } diff --git a/decoder/gmo_decode.go b/decoder/gmo_decode.go index 97ce0dd9..3fe8f9b9 100644 --- a/decoder/gmo_decode.go +++ b/decoder/gmo_decode.go @@ -122,6 +122,7 @@ func UpdateStationBatch(ctx context.Context, db db.DbDetails, scanParameters Sca continue } station.updateFromStationProto(stationProto.Data, stationProto.Cell) + syncStationBattlesFromProto(station, stationProto.Data.BattleDetails) saveStationRecord(ctx, db, station) unlock() } diff --git a/decoder/main.go b/decoder/main.go index 9ba127d1..393e3b04 100644 --- a/decoder/main.go +++ b/decoder/main.go @@ -127,12 +127,13 @@ func initDataCache() { TTL: fortCacheTTL, KeyToShard: StringKeyToShard, }) - if config.Config.FortInMemory { - stationCache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, *Station]) { + stationCache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, *Station]) { + clearStationBattleCaches(item.Key()) + if config.Config.FortInMemory { s := item.Value() evictFortFromTree(s.Id, s.Lat, s.Lon) - }) - } + } + }) tappableCache = ttlcache.New[uint64, *Tappable]( ttlcache.WithTTL[uint64, *Tappable](60 * time.Minute), @@ -170,6 +171,7 @@ func initDataCache() { }) initPokemonRtree() initFortRtree() + initStationBattleCache() incidentCache = ttlcache.New[string, *Incident]( ttlcache.WithTTL[string, *Incident](60 * time.Minute), diff --git a/decoder/preload.go b/decoder/preload.go index b14fe7f9..82317b84 100644 --- a/decoder/preload.go +++ b/decoder/preload.go @@ -11,13 +11,13 @@ import ( "golbat/db" ) -// Preload loads forts, stations, active incidents, and recent spawnpoints from DB into cache. +// Preload loads forts, stations, active station battles, active incidents, and recent spawnpoints from DB into cache. // If populateRtree is true, also builds the rtree index for forts. func Preload(dbDetails db.DbDetails, populateRtree bool) { startTime := time.Now() var wg sync.WaitGroup - var pokestopCount, gymCount, stationCount, incidentCount, spawnpointCount int32 + var pokestopCount, gymCount, stationCount, stationBattleCount, incidentCount, spawnpointCount int32 // Phase 1: Load forts and spawnpoints in parallel. // Forts must be loaded before incidents so that the fort lookup cache @@ -41,11 +41,20 @@ func Preload(dbDetails db.DbDetails, populateRtree bool) { }() wg.Wait() - // Phase 2: Load incidents (needs pokestop lookup entries to exist) - incidentCount = preloadIncidents(dbDetails, populateRtree) + // Phase 2: Load child state that depends on forts/stations already being present. + wg.Add(2) + go func() { + defer wg.Done() + stationBattleCount = preloadStationBattles(dbDetails, populateRtree) + }() + go func() { + defer wg.Done() + incidentCount = preloadIncidents(dbDetails, populateRtree) + }() + wg.Wait() - log.Infof("Preload: loaded %d pokestops, %d gyms, %d stations, %d incidents, %d spawnpoints in %v (rtree=%v)", - pokestopCount, gymCount, stationCount, incidentCount, spawnpointCount, time.Since(startTime), populateRtree) + log.Infof("Preload: loaded %d pokestops, %d gyms, %d stations, %d station battles, %d incidents, %d spawnpoints in %v (rtree=%v)", + pokestopCount, gymCount, stationCount, stationBattleCount, incidentCount, spawnpointCount, time.Since(startTime), populateRtree) } // PreloadForts loads all forts from DB into cache. diff --git a/decoder/station.go b/decoder/station.go index 02fb9ed3..00119c6f 100644 --- a/decoder/station.go +++ b/decoder/station.go @@ -2,6 +2,7 @@ package decoder import ( "fmt" + "time" "github.com/guregu/null/v6" ) @@ -57,12 +58,17 @@ type Station struct { // StationOldValues holds old field values for webhook comparison type StationOldValues struct { EndTime int64 + IsBattleAvailable bool + HasCanonicalBattle bool + CanonicalBattleSeed int64 + BattleProjection *StationBattleData BattleEnd null.Int BattlePokemonId null.Int BattlePokemonForm null.Int BattlePokemonCostume null.Int BattlePokemonGender null.Int BattlePokemonBreadMode null.Int + BattleListSignature string } // IsDirty returns true if any field has been modified @@ -93,7 +99,13 @@ func (station *Station) Unlock() { // snapshotOldValues saves current values for webhook comparison // Call this after loading from cache/DB but before modifications func (station *Station) snapshotOldValues() { + now := time.Now().Unix() + snapshot := collectStationBattleSnapshot(station, now) station.oldValues = StationOldValues{ + IsBattleAvailable: station.IsBattleAvailable, + HasCanonicalBattle: snapshot.Canonical != nil, + CanonicalBattleSeed: canonicalBattleSeed(snapshot.Canonical), + BattleProjection: stationBattleFromStationProjection(station), EndTime: station.EndTime, BattleEnd: station.BattleEnd, BattlePokemonId: station.BattlePokemonId, @@ -101,6 +113,7 @@ func (station *Station) snapshotOldValues() { BattlePokemonCostume: station.BattlePokemonCostume, BattlePokemonGender: station.BattlePokemonGender, BattlePokemonBreadMode: station.BattlePokemonBreadMode, + BattleListSignature: snapshot.Signature, } } diff --git a/decoder/station_battle.go b/decoder/station_battle.go new file mode 100644 index 00000000..16a67a67 --- /dev/null +++ b/decoder/station_battle.go @@ -0,0 +1,744 @@ +package decoder + +import ( + "context" + "slices" + "strconv" + "strings" + "time" + + "github.com/guregu/null/v6" + "github.com/jellydator/ttlcache/v3" + "github.com/puzpuzpuz/xsync/v3" + log "github.com/sirupsen/logrus" + + "golbat/db" + "golbat/pogo" +) + +type StationBattleData struct { + BreadBattleSeed int64 `db:"bread_battle_seed"` + StationId string `db:"station_id"` + BattleLevel int16 `db:"battle_level"` + BattleStart int64 `db:"battle_start"` + BattleEnd int64 `db:"battle_end"` + BattlePokemonId null.Int `db:"battle_pokemon_id"` + BattlePokemonForm null.Int `db:"battle_pokemon_form"` + BattlePokemonCostume null.Int `db:"battle_pokemon_costume"` + BattlePokemonGender null.Int `db:"battle_pokemon_gender"` + BattlePokemonAlignment null.Int `db:"battle_pokemon_alignment"` + BattlePokemonBreadMode null.Int `db:"battle_pokemon_bread_mode"` + BattlePokemonMove1 null.Int `db:"battle_pokemon_move_1"` + BattlePokemonMove2 null.Int `db:"battle_pokemon_move_2"` + BattlePokemonStamina null.Int `db:"battle_pokemon_stamina"` + BattlePokemonCpMultiplier null.Float `db:"battle_pokemon_cp_multiplier"` + Updated int64 `db:"updated"` +} + +type ApiStationBattle struct { + BreadBattleSeed int64 `json:"bread_battle_seed,omitempty"` + BattleLevel int16 `json:"battle_level"` + BattleStart int64 `json:"battle_start"` + BattleEnd int64 `json:"battle_end"` + BattlePokemonId null.Int `json:"battle_pokemon_id"` + BattlePokemonForm null.Int `json:"battle_pokemon_form"` + BattlePokemonCostume null.Int `json:"battle_pokemon_costume"` + BattlePokemonGender null.Int `json:"battle_pokemon_gender"` + BattlePokemonAlignment null.Int `json:"battle_pokemon_alignment"` + BattlePokemonBreadMode null.Int `json:"battle_pokemon_bread_mode"` + BattlePokemonMove1 null.Int `json:"battle_pokemon_move_1"` + BattlePokemonMove2 null.Int `json:"battle_pokemon_move_2"` + BattlePokemonStamina null.Int `json:"battle_pokemon_stamina"` + BattlePokemonCpMultiplier null.Float `json:"battle_pokemon_cp_multiplier"` +} + +type StationBattleWebhook struct { + BreadBattleSeed int64 `json:"bread_battle_seed,omitempty"` + BattleLevel int16 `json:"battle_level"` + BattleStart int64 `json:"battle_start"` + BattleEnd int64 `json:"battle_end"` + BattlePokemonId null.Int `json:"battle_pokemon_id"` + BattlePokemonForm null.Int `json:"battle_pokemon_form"` + BattlePokemonCostume null.Int `json:"battle_pokemon_costume"` + BattlePokemonGender null.Int `json:"battle_pokemon_gender"` + BattlePokemonAlignment null.Int `json:"battle_pokemon_alignment"` + BattlePokemonBreadMode null.Int `json:"battle_pokemon_bread_mode"` + BattlePokemonMove1 null.Int `json:"battle_pokemon_move_1"` + BattlePokemonMove2 null.Int `json:"battle_pokemon_move_2"` + BattlePokemonStamina null.Int `json:"battle_pokemon_stamina"` + BattlePokemonCpMultiplier null.Float `json:"battle_pokemon_cp_multiplier"` +} + +type FortLookupStationBattle struct { + BattleEndTimestamp int64 + BattleLevel int8 + BattlePokemonId int16 + BattlePokemonForm int16 +} + +type StationBattleWrite struct { + StationId string + Battles []StationBattleData +} + +type stationBattleSnapshot struct { + Battles []StationBattleData + Canonical *StationBattleData + Signature string +} + +const stationBattleSelectColumns = `bread_battle_seed, station_id, battle_level, battle_start, battle_end, + battle_pokemon_id, battle_pokemon_form, battle_pokemon_costume, battle_pokemon_gender, + battle_pokemon_alignment, battle_pokemon_bread_mode, battle_pokemon_move_1, battle_pokemon_move_2, + battle_pokemon_stamina, battle_pokemon_cp_multiplier, updated` + +var stationBattleCache *xsync.MapOf[string, []StationBattleData] +var stationBattleHydratedCache *xsync.MapOf[string, struct{}] + +func initStationBattleCache() { + stationBattleCache = xsync.NewMapOf[string, []StationBattleData]() + stationBattleHydratedCache = xsync.NewMapOf[string, struct{}]() +} + +func markStationBattlesHydrated(stationId string) { + if stationId == "" { + return + } + stationBattleHydratedCache.Store(stationId, struct{}{}) +} + +func clearStationBattleCaches(stationId string) { + if stationId == "" { + return + } + stationBattleCache.Delete(stationId) + stationBattleHydratedCache.Delete(stationId) +} + +func hasHydratedStationBattles(stationId string) bool { + if stationId == "" { + return false + } + _, ok := stationBattleHydratedCache.Load(stationId) + return ok +} + +func syncStationBattlesFromProto(station *Station, battleDetail *pogo.BreadBattleDetailProto) { + now := time.Now().Unix() + if battle := stationBattleFromProto(station.Id, battleDetail, now); battle != nil { + upsertCachedStationBattle(*battle, now) + } + markStationBattlesHydrated(station.Id) + + snapshot := collectStationBattleSnapshot(station, now) + applyStationBattleProjection(station, snapshot.Canonical) +} + +func stationBattleFromProto(stationId string, battleDetail *pogo.BreadBattleDetailProto, updated int64) *StationBattleData { + if stationId == "" || battleDetail == nil { + return nil + } + seed := battleDetail.GetBreadBattleSeed() + battle := &StationBattleData{ + BreadBattleSeed: seed, + StationId: stationId, + BattleLevel: int16(battleDetail.GetBattleLevel()), + BattleStart: int64(battleDetail.GetBattleWindowStartMs() / 1000), + BattleEnd: int64(battleDetail.GetBattleWindowEndMs() / 1000), + Updated: updated, + } + if pokemon := battleDetail.GetBattlePokemon(); pokemon != nil { + battle.BattlePokemonId = null.IntFrom(int64(pokemon.GetPokemonId())) + battle.BattlePokemonMove1 = null.IntFrom(int64(pokemon.GetMove1())) + battle.BattlePokemonMove2 = null.IntFrom(int64(pokemon.GetMove2())) + battle.BattlePokemonForm = null.IntFrom(int64(pokemon.GetPokemonDisplay().GetForm())) + battle.BattlePokemonCostume = null.IntFrom(int64(pokemon.GetPokemonDisplay().GetCostume())) + battle.BattlePokemonGender = null.IntFrom(int64(pokemon.GetPokemonDisplay().GetGender())) + battle.BattlePokemonAlignment = null.IntFrom(int64(pokemon.GetPokemonDisplay().GetAlignment())) + battle.BattlePokemonBreadMode = null.IntFrom(int64(pokemon.GetPokemonDisplay().GetBreadModeEnum())) + battle.BattlePokemonStamina = null.IntFrom(int64(pokemon.GetStamina())) + battle.BattlePokemonCpMultiplier = null.FloatFrom(float64(pokemon.GetCpMultiplier())) + } + return battle +} + +func stationBattleFromStationProjection(station *Station) *StationBattleData { + if station == nil || !station.BattleEnd.Valid { + return nil + } + return &StationBattleData{ + StationId: station.Id, + BattleLevel: int16(station.BattleLevel.ValueOrZero()), + BattleStart: station.BattleStart.ValueOrZero(), + BattleEnd: station.BattleEnd.ValueOrZero(), + BattlePokemonId: station.BattlePokemonId, + BattlePokemonForm: station.BattlePokemonForm, + BattlePokemonCostume: station.BattlePokemonCostume, + BattlePokemonGender: station.BattlePokemonGender, + BattlePokemonAlignment: station.BattlePokemonAlignment, + BattlePokemonBreadMode: station.BattlePokemonBreadMode, + BattlePokemonMove1: station.BattlePokemonMove1, + BattlePokemonMove2: station.BattlePokemonMove2, + BattlePokemonStamina: station.BattlePokemonStamina, + BattlePokemonCpMultiplier: station.BattlePokemonCpMultiplier, + Updated: station.Updated, + } +} + +func cloneStationBattles(battles []StationBattleData) []StationBattleData { + if len(battles) == 0 { + return nil + } + return append([]StationBattleData(nil), battles...) +} + +func sortStationBattlesByEnd(battles []StationBattleData) { + slices.SortFunc(battles, func(a, b StationBattleData) int { + if a.BattleEnd != b.BattleEnd { + if a.BattleEnd > b.BattleEnd { + return -1 + } + return 1 + } + if a.BattleStart != b.BattleStart { + if a.BattleStart > b.BattleStart { + return -1 + } + return 1 + } + switch { + case a.BreadBattleSeed > b.BreadBattleSeed: + return -1 + case a.BreadBattleSeed < b.BreadBattleSeed: + return 1 + default: + return 0 + } + }) +} + +func stationBattleIsActive(battle StationBattleData, now int64) bool { + if battle.BattleEnd <= now { + return false + } + if battle.BattleStart == 0 { + return true + } + return battle.BattleStart <= now +} + +func activeStationBattlesFromSlice(battles []StationBattleData, now int64) []StationBattleData { + if len(battles) == 0 { + return nil + } + active := make([]StationBattleData, 0, len(battles)) + for _, battle := range battles { + if stationBattleIsActive(battle, now) { + active = append(active, battle) + } + } + return active +} + +func nonExpiredStationBattlesFromSlice(battles []StationBattleData, now int64) []StationBattleData { + if len(battles) == 0 { + return nil + } + current := make([]StationBattleData, 0, len(battles)) + for _, battle := range battles { + if battle.BattleEnd > now { + current = append(current, battle) + } + } + return current +} + +func stationBattlesEqual(a []StationBattleData, b []StationBattleData) bool { + if len(a) != len(b) { + return false + } + for i := range a { + left := a[i] + right := b[i] + left.Updated = 0 + right.Updated = 0 + if left != right { + return false + } + } + return true +} + +func upsertCachedStationBattle(battle StationBattleData, now int64) bool { + existing, _ := stationBattleCache.Load(battle.StationId) + next := pruneObsoleteStationBattles(existing, battle, now) + sortStationBattlesByEnd(next) + if stationBattlesEqual(existing, next) { + return false + } + if len(next) == 0 { + stationBattleCache.Delete(battle.StationId) + } else { + stationBattleCache.Store(battle.StationId, next) + } + return true +} + +func pruneObsoleteStationBattles(existing []StationBattleData, battle StationBattleData, now int64) []StationBattleData { + next := make([]StationBattleData, 0, len(existing)+1) + if battle.BattleEnd > now { + next = append(next, battle) + } + for _, cached := range existing { + if cached.BreadBattleSeed == battle.BreadBattleSeed || cached.BattleEnd <= now || cached.BattleEnd <= battle.BattleEnd { + continue + } + next = append(next, cached) + } + return next +} + +func getKnownStationBattles(stationId string, station *Station, now int64) []StationBattleData { + if stationId != "" { + if cached, ok := stationBattleCache.Load(stationId); ok { + current := nonExpiredStationBattlesFromSlice(cached, now) + if len(current) > 0 { + return current + } + stationBattleCache.Delete(stationId) + if hasHydratedStationBattles(stationId) { + return nil + } + } + if hasHydratedStationBattles(stationId) { + return nil + } + } + if fallback := stationBattleFromStationProjection(station); fallback != nil && fallback.BattleEnd > now { + return []StationBattleData{*fallback} + } + return nil +} + +func collectStationBattleSnapshot(station *Station, now int64) stationBattleSnapshot { + battles := getKnownStationBattles(station.Id, station, now) + return stationBattleSnapshot{ + Battles: battles, + Canonical: canonicalStationBattleFromSlice(station, battles, now), + Signature: stationBattleSignatureFromSlice(battles), + } +} + +func getActiveStationBattles(stationId string, station *Station, now int64) []StationBattleData { + return activeStationBattlesFromSlice(getKnownStationBattles(stationId, station, now), now) +} + +func stationBattleMatchesProjection(battle StationBattleData, projection *StationBattleData) bool { + if projection == nil { + return false + } + return battle.BattleLevel == projection.BattleLevel && + battle.BattleStart == projection.BattleStart && + battle.BattleEnd == projection.BattleEnd && + battle.BattlePokemonId == projection.BattlePokemonId && + battle.BattlePokemonForm == projection.BattlePokemonForm +} + +func canonicalStationBattleFromSlice(station *Station, battles []StationBattleData, now int64) *StationBattleData { + if len(battles) == 0 { + return nil + } + projection := stationBattleFromStationProjection(station) + if projection != nil && stationBattleIsActive(*projection, now) { + for _, battle := range battles { + if stationBattleIsActive(battle, now) && stationBattleMatchesProjection(battle, projection) { + current := battle + return ¤t + } + } + } + for _, battle := range battles { + if stationBattleIsActive(battle, now) { + current := battle + return ¤t + } + } + if projection != nil { + for _, battle := range battles { + if stationBattleMatchesProjection(battle, projection) { + current := battle + return ¤t + } + } + } + battle := battles[0] + return &battle +} + +func canonicalBattleSeed(battle *StationBattleData) int64 { + if battle == nil { + return 0 + } + return battle.BreadBattleSeed +} + +func clearStationBattleProjection(station *Station) { + station.SetBattleLevel(null.Int{}) + station.SetBattleStart(null.Int{}) + station.SetBattleEnd(null.Int{}) + station.SetBattlePokemonId(null.Int{}) + station.SetBattlePokemonForm(null.Int{}) + station.SetBattlePokemonCostume(null.Int{}) + station.SetBattlePokemonGender(null.Int{}) + station.SetBattlePokemonAlignment(null.Int{}) + station.SetBattlePokemonBreadMode(null.Int{}) + station.SetBattlePokemonMove1(null.Int{}) + station.SetBattlePokemonMove2(null.Int{}) + station.SetBattlePokemonStamina(null.Int{}) + station.SetBattlePokemonCpMultiplier(null.Float{}) +} + +func restoreStationBattleProjectionFromOldValues(station *Station) { + station.SetIsBattleAvailable(station.oldValues.IsBattleAvailable) + if station.oldValues.BattleProjection == nil { + clearStationBattleProjection(station) + return + } + battle := *station.oldValues.BattleProjection + applyStationBattleProjection(station, &battle) +} + +func applyStationBattleProjection(station *Station, battle *StationBattleData) { + if battle == nil { + clearStationBattleProjection(station) + return + } + station.SetBattleLevel(null.IntFrom(int64(battle.BattleLevel))) + station.SetBattleStart(null.IntFrom(battle.BattleStart)) + station.SetBattleEnd(null.IntFrom(battle.BattleEnd)) + station.SetBattlePokemonId(battle.BattlePokemonId) + station.SetBattlePokemonForm(battle.BattlePokemonForm) + station.SetBattlePokemonCostume(battle.BattlePokemonCostume) + station.SetBattlePokemonGender(battle.BattlePokemonGender) + station.SetBattlePokemonAlignment(battle.BattlePokemonAlignment) + station.SetBattlePokemonBreadMode(battle.BattlePokemonBreadMode) + station.SetBattlePokemonMove1(battle.BattlePokemonMove1) + station.SetBattlePokemonMove2(battle.BattlePokemonMove2) + station.SetBattlePokemonStamina(battle.BattlePokemonStamina) + station.SetBattlePokemonCpMultiplier(battle.BattlePokemonCpMultiplier) +} + +func stationBattleSignature(station *Station, now int64) string { + return collectStationBattleSnapshot(station, now).Signature +} + +func stationBattleSignatureFromSlice(battles []StationBattleData) string { + if len(battles) == 0 { + return "" + } + var builder strings.Builder + for _, battle := range battles { + builder.WriteString(strconv.FormatInt(battle.BreadBattleSeed, 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(int64(battle.BattleLevel), 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(battle.BattleStart, 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(battle.BattleEnd, 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(battle.BattlePokemonId.ValueOrZero(), 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(battle.BattlePokemonForm.ValueOrZero(), 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(battle.BattlePokemonCostume.ValueOrZero(), 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(battle.BattlePokemonGender.ValueOrZero(), 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(battle.BattlePokemonAlignment.ValueOrZero(), 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(battle.BattlePokemonBreadMode.ValueOrZero(), 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(battle.BattlePokemonMove1.ValueOrZero(), 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatBool(battle.BattlePokemonMove2.Valid)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatBool(battle.BattlePokemonCpMultiplier.Valid)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatInt(battle.BattlePokemonMove2.ValueOrZero(), 10)) + builder.WriteByte(':') + builder.WriteString(strconv.FormatFloat(battle.BattlePokemonCpMultiplier.ValueOrZero(), 'g', -1, 64)) + builder.WriteByte(';') + } + return builder.String() +} + +func buildApiStationBattlesFromSlice(battles []StationBattleData) []ApiStationBattle { + if len(battles) == 0 { + return nil + } + result := make([]ApiStationBattle, 0, len(battles)) + for _, battle := range battles { + result = append(result, ApiStationBattle{ + BreadBattleSeed: battle.BreadBattleSeed, + BattleLevel: battle.BattleLevel, + BattleStart: battle.BattleStart, + BattleEnd: battle.BattleEnd, + BattlePokemonId: battle.BattlePokemonId, + BattlePokemonForm: battle.BattlePokemonForm, + BattlePokemonCostume: battle.BattlePokemonCostume, + BattlePokemonGender: battle.BattlePokemonGender, + BattlePokemonAlignment: battle.BattlePokemonAlignment, + BattlePokemonBreadMode: battle.BattlePokemonBreadMode, + BattlePokemonMove1: battle.BattlePokemonMove1, + BattlePokemonMove2: battle.BattlePokemonMove2, + BattlePokemonStamina: battle.BattlePokemonStamina, + BattlePokemonCpMultiplier: battle.BattlePokemonCpMultiplier, + }) + } + return result +} + +func buildApiStationBattles(station *Station, now int64) []ApiStationBattle { + return buildApiStationBattlesFromSlice(getKnownStationBattles(station.Id, station, now)) +} + +func buildStationWebhookBattlesFromSlice(battles []StationBattleData) []StationBattleWebhook { + if len(battles) == 0 { + return nil + } + result := make([]StationBattleWebhook, 0, len(battles)) + for _, battle := range battles { + result = append(result, StationBattleWebhook{ + BreadBattleSeed: battle.BreadBattleSeed, + BattleLevel: battle.BattleLevel, + BattleStart: battle.BattleStart, + BattleEnd: battle.BattleEnd, + BattlePokemonId: battle.BattlePokemonId, + BattlePokemonForm: battle.BattlePokemonForm, + BattlePokemonCostume: battle.BattlePokemonCostume, + BattlePokemonGender: battle.BattlePokemonGender, + BattlePokemonAlignment: battle.BattlePokemonAlignment, + BattlePokemonBreadMode: battle.BattlePokemonBreadMode, + BattlePokemonMove1: battle.BattlePokemonMove1, + BattlePokemonMove2: battle.BattlePokemonMove2, + BattlePokemonStamina: battle.BattlePokemonStamina, + BattlePokemonCpMultiplier: battle.BattlePokemonCpMultiplier, + }) + } + return result +} + +func buildStationWebhookBattles(station *Station, now int64) []StationBattleWebhook { + return buildStationWebhookBattlesFromSlice(getKnownStationBattles(station.Id, station, now)) +} + +func buildFortLookupStationBattlesFromSlice(battles []StationBattleData) []FortLookupStationBattle { + if len(battles) == 0 { + return nil + } + result := make([]FortLookupStationBattle, 0, len(battles)) + for _, battle := range battles { + result = append(result, FortLookupStationBattle{ + BattleEndTimestamp: battle.BattleEnd, + BattleLevel: int8(battle.BattleLevel), + BattlePokemonId: int16(battle.BattlePokemonId.ValueOrZero()), + BattlePokemonForm: int16(battle.BattlePokemonForm.ValueOrZero()), + }) + } + return result +} + +func buildFortLookupStationBattles(station *Station, now int64) []FortLookupStationBattle { + return buildFortLookupStationBattlesFromSlice(getKnownStationBattles(station.Id, station, now)) +} + +func stationBattleWriteFromSlice(stationId string, battles []StationBattleData) StationBattleWrite { + return StationBattleWrite{ + StationId: stationId, + Battles: cloneStationBattles(battles), + } +} + +func storeStationBattleSnapshot(ctx context.Context, dbDetails db.DbDetails, snapshot StationBattleWrite) error { + tx, err := dbDetails.GeneralDb.BeginTxx(ctx, nil) + statsCollector.IncDbQuery("begin station_battle", err) + if err != nil { + return err + } + + if len(snapshot.Battles) > 0 { + if _, err = tx.NamedExecContext(ctx, ` + INSERT INTO station_battle ( + bread_battle_seed, station_id, battle_level, battle_start, battle_end, + battle_pokemon_id, battle_pokemon_form, battle_pokemon_costume, battle_pokemon_gender, + battle_pokemon_alignment, battle_pokemon_bread_mode, battle_pokemon_move_1, battle_pokemon_move_2, + battle_pokemon_stamina, battle_pokemon_cp_multiplier, updated + ) VALUES ( + :bread_battle_seed, :station_id, :battle_level, :battle_start, :battle_end, + :battle_pokemon_id, :battle_pokemon_form, :battle_pokemon_costume, :battle_pokemon_gender, + :battle_pokemon_alignment, :battle_pokemon_bread_mode, :battle_pokemon_move_1, :battle_pokemon_move_2, + :battle_pokemon_stamina, :battle_pokemon_cp_multiplier, :updated + ) + ON DUPLICATE KEY UPDATE + station_id = VALUES(station_id), + battle_level = VALUES(battle_level), + battle_start = VALUES(battle_start), + battle_end = VALUES(battle_end), + battle_pokemon_id = VALUES(battle_pokemon_id), + battle_pokemon_form = VALUES(battle_pokemon_form), + battle_pokemon_costume = VALUES(battle_pokemon_costume), + battle_pokemon_gender = VALUES(battle_pokemon_gender), + battle_pokemon_alignment = VALUES(battle_pokemon_alignment), + battle_pokemon_bread_mode = VALUES(battle_pokemon_bread_mode), + battle_pokemon_move_1 = VALUES(battle_pokemon_move_1), + battle_pokemon_move_2 = VALUES(battle_pokemon_move_2), + battle_pokemon_stamina = VALUES(battle_pokemon_stamina), + battle_pokemon_cp_multiplier = VALUES(battle_pokemon_cp_multiplier), + updated = VALUES(updated) + `, snapshot.Battles); err != nil { + _ = tx.Rollback() + statsCollector.IncDbQuery("upsert station_battle", err) + return err + } + statsCollector.IncDbQuery("upsert station_battle", nil) + } + + deleteQuery := "DELETE FROM station_battle WHERE station_id = ?" + deleteArgs := []any{snapshot.StationId} + if len(snapshot.Battles) > 0 { + deleteQuery += " AND bread_battle_seed NOT IN (" + for i, battle := range snapshot.Battles { + if i > 0 { + deleteQuery += "," + } + deleteQuery += "?" + deleteArgs = append(deleteArgs, battle.BreadBattleSeed) + } + deleteQuery += ")" + } + if _, err = tx.ExecContext(ctx, deleteQuery, deleteArgs...); err != nil { + _ = tx.Rollback() + statsCollector.IncDbQuery("delete obsolete station_battle", err) + return err + } + statsCollector.IncDbQuery("delete obsolete station_battle", nil) + + err = tx.Commit() + statsCollector.IncDbQuery("commit station_battle", err) + return err +} + +func flushStationBattleBatch(ctx context.Context, dbDetails db.DbDetails, snapshots []StationBattleWrite) error { + var firstErr error + for _, snapshot := range snapshots { + if err := storeStationBattleSnapshot(ctx, dbDetails, snapshot); err != nil { + log.Errorf("flush station_battle %s: %v", snapshot.StationId, err) + if firstErr == nil { + firstErr = err + } + } + } + return firstErr +} + +func loadStationBattlesForStation(ctx context.Context, dbDetails db.DbDetails, stationId string, now int64) ([]StationBattleData, error) { + var battles []StationBattleData + err := dbDetails.GeneralDb.SelectContext(ctx, &battles, ` + SELECT `+stationBattleSelectColumns+` + FROM station_battle + WHERE station_id = ? AND battle_end > ? + ORDER BY battle_end DESC, battle_start DESC, bread_battle_seed DESC + `, stationId, now) + statsCollector.IncDbQuery("select station_battle station", err) + if err != nil { + return nil, err + } + return battles, nil +} + +func hydrateStationBattlesForStation(ctx context.Context, dbDetails db.DbDetails, station *Station, now int64) error { + if station == nil || station.Id == "" { + return nil + } + battles, err := loadStationBattlesForStation(ctx, dbDetails, station.Id, now) + if err != nil { + return err + } + if len(battles) == 0 { + stationBattleCache.Delete(station.Id) + markStationBattlesHydrated(station.Id) + return nil + } + stationBattleCache.Store(station.Id, battles) + markStationBattlesHydrated(station.Id) + return nil +} + +func cachePreloadedStationBattles(stationId string, battles []StationBattleData) bool { + if stationId == "" || len(battles) == 0 { + return false + } + sortStationBattlesByEnd(battles) + stationBattleCache.Store(stationId, battles) + markStationBattlesHydrated(stationId) + return true +} + +func markPreloadedStationsHydrated(populateRtree bool) { + stationCache.Range(func(item *ttlcache.Item[string, *Station]) bool { + stationId := item.Key() + markStationBattlesHydrated(stationId) + if populateRtree { + station := item.Value() + station.Lock("preloadStationBattles") + fortRtreeUpdateStationOnSave(station) + station.Unlock() + } + return true + }) +} + +func preloadStationBattles(dbDetails db.DbDetails, populateRtree bool) int32 { + now := time.Now().Unix() + query := "SELECT " + stationBattleSelectColumns + " FROM station_battle WHERE battle_end > ? " + + "ORDER BY station_id, battle_end DESC, battle_start DESC, bread_battle_seed DESC" + rows, err := dbDetails.GeneralDb.Queryx(query, now) + statsCollector.IncDbQuery("select station_battle active", err) + if err != nil { + log.Errorf("Preload: failed to query station battles - %s", err) + return 0 + } + defer rows.Close() + + count := int32(0) + affected := make([]string, 0) + currentStationId := "" + currentBattles := make([]StationBattleData, 0) + flushCurrent := func() { + if cachePreloadedStationBattles(currentStationId, currentBattles) { + affected = append(affected, currentStationId) + } + currentStationId = "" + currentBattles = nil + } + for rows.Next() { + var battle StationBattleData + if err := rows.StructScan(&battle); err != nil { + log.Errorf("Preload: station battle scan error - %s", err) + continue + } + if currentStationId != "" && battle.StationId != currentStationId { + flushCurrent() + } + if currentStationId == "" { + currentStationId = battle.StationId + } + currentBattles = append(currentBattles, battle) + count++ + } + flushCurrent() + + markPreloadedStationsHydrated(populateRtree) + + return count +} diff --git a/decoder/station_battle_test.go b/decoder/station_battle_test.go new file mode 100644 index 00000000..9efaf67a --- /dev/null +++ b/decoder/station_battle_test.go @@ -0,0 +1,963 @@ +package decoder + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/guregu/null/v6" + "github.com/jellydator/ttlcache/v3" + + "golbat/config" + "golbat/db" + "golbat/geo" + "golbat/pogo" + "golbat/stats_collector" + "golbat/webhooks" +) + +type recordingWebhooksSender struct { + messages []webhooks.WebhookType +} + +func (sender *recordingWebhooksSender) AddMessage(whType webhooks.WebhookType, _ any, _ []geo.AreaName) { + sender.messages = append(sender.messages, whType) +} + +type recordingStatsCollector struct { + stats_collector.StatsCollector + maxBattleLevels []int64 +} + +func (collector *recordingStatsCollector) UpdateMaxBattleCount(_ []geo.AreaName, level int64) { + collector.maxBattleLevels = append(collector.maxBattleLevels, level) +} + +func TestUpsertCachedStationBattleIgnoresUpdatedOnlyChange(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + battle := StationBattleData{ + BreadBattleSeed: 1, + StationId: "station-1", + BattleLevel: 1, + BattleStart: now - 60, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(527), + Updated: now, + } + + if !upsertCachedStationBattle(battle, now) { + t.Fatal("expected first insert to change cache") + } + + battle.Updated = now + 120 + if upsertCachedStationBattle(battle, now) { + t.Fatal("expected updated-only change to be ignored") + } +} + +func TestUpsertCachedStationBattleDropsEarlierEndAfterLaterObservation(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: "station-1", + BattleLevel: 1, + BattleStart: now - 60, + BattleEnd: now + 1800, + BattlePokemonId: null.IntFrom(527), + }, now) + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 2, + StationId: "station-1", + BattleLevel: 2, + BattleStart: now - 60, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(133), + }, now) + + battles := getKnownStationBattles("station-1", nil, now) + if len(battles) != 1 { + t.Fatalf("expected 1 battle after later observation, got %d", len(battles)) + } + if battles[0].BreadBattleSeed != 2 { + t.Fatalf("expected seed 2 to replace earlier battle, got %d", battles[0].BreadBattleSeed) + } +} + +func TestUpsertCachedStationBattleReplacesEqualEndBattle(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: "station-1", + BattleLevel: 1, + BattleStart: now - 120, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(527), + }, now) + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 2, + StationId: "station-1", + BattleLevel: 2, + BattleStart: now - 60, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(133), + }, now) + + battles := getKnownStationBattles("station-1", nil, now) + if len(battles) != 1 { + t.Fatalf("expected 1 battle after equal-end replacement, got %d", len(battles)) + } + if battles[0].BreadBattleSeed != 2 { + t.Fatalf("expected latest equal-end seed 2, got %d", battles[0].BreadBattleSeed) + } +} + +func TestUpsertCachedStationBattleKeepsLongerBattleWhenShorterObserved(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: "station-1", + BattleLevel: 3, + BattleStart: now - 120, + BattleEnd: now + 7200, + BattlePokemonId: null.IntFrom(374), + }, now) + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 2, + StationId: "station-1", + BattleLevel: 1, + BattleStart: now - 60, + BattleEnd: now + 1800, + BattlePokemonId: null.IntFrom(527), + }, now) + + battles := getKnownStationBattles("station-1", nil, now) + if len(battles) != 2 { + t.Fatalf("expected longer and shorter battles to coexist, got %d", len(battles)) + } + if battles[0].BreadBattleSeed != 1 || battles[1].BreadBattleSeed != 2 { + t.Fatalf("unexpected battle ordering after shorter observation: %+v", battles) + } +} + +func TestCanonicalStationBattleUsesLatestEnd(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: "station-1", + BattleLevel: 1, + BattleStart: now - 60, + BattleEnd: now + 1800, + BattlePokemonId: null.IntFrom(527), + }, now) + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 2, + StationId: "station-1", + BattleLevel: 2, + BattleStart: now - 120, + BattleEnd: now + 7200, + BattlePokemonId: null.IntFrom(133), + }, now) + + battles := getKnownStationBattles("station-1", nil, now) + if len(battles) != 1 { + t.Fatalf("expected later-ending battle to replace earlier one, got %d battles", len(battles)) + } + if battles[0].BreadBattleSeed != 2 { + t.Fatalf("expected latest-ending battle first, got seed %d", battles[0].BreadBattleSeed) + } + + canonical := canonicalStationBattleFromSlice(nil, battles, now) + if canonical == nil || canonical.BreadBattleSeed != 2 { + t.Fatalf("expected canonical seed 2, got %+v", canonical) + } +} + +func TestBuildStationResultPrefersCurrentActiveProjection(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + + station := &Station{ + StationData: StationData{ + Id: "station-1", + Name: "Station", + Lat: 1, + Lon: 2, + StartTime: now - 3600, + EndTime: now + 3600, + Updated: now, + BattleLevel: null.IntFrom(1), + BattleStart: null.IntFrom(now - 60), + BattleEnd: null.IntFrom(now + 1800), + BattlePokemonId: null.IntFrom(527), + }, + } + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 2, + StationId: station.Id, + BattleLevel: 2, + BattleStart: now - 120, + BattleEnd: now + 7200, + BattlePokemonId: null.IntFrom(133), + }, now) + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: station.Id, + BattleLevel: 1, + BattleStart: now - 60, + BattleEnd: now + 1800, + BattlePokemonId: null.IntFrom(527), + }, now) + + result := BuildStationResult(station) + if result.BattlePokemonId.ValueOrZero() != 527 { + t.Fatalf("expected current active pokemon 527, got %d", result.BattlePokemonId.ValueOrZero()) + } + if len(result.Battles) != 2 { + t.Fatalf("expected both battles to remain known, got %d", len(result.Battles)) + } +} + +func TestStationFortFilterMatchesSecondaryBattle(t *testing.T) { + now := time.Now().Unix() + filter := ApiFortDnfFilter{ + BattlePokemon: []ApiDnfId{{Pokemon: 133}}, + } + lookup := FortLookup{ + FortType: STATION, + StationBattles: []FortLookupStationBattle{ + {BattleEndTimestamp: now + 1800, BattleLevel: 1, BattlePokemonId: 527}, + {BattleEndTimestamp: now + 7200, BattleLevel: 2, BattlePokemonId: 133}, + }, + } + + if !isFortDnfMatch(STATION, &lookup, &filter, now) { + t.Fatal("expected station filter to match secondary battle") + } +} + +func TestGetActiveStationBattlesKeepsFutureBattleCached(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + future := StationBattleData{ + BreadBattleSeed: 1, + StationId: "station-1", + BattleLevel: 1, + BattleStart: now + 600, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(527), + } + + if !upsertCachedStationBattle(future, now) { + t.Fatal("expected future battle insert to change cache") + } + + if active := getActiveStationBattles("station-1", nil, now); len(active) != 0 { + t.Fatalf("expected no active battles, got %d", len(active)) + } + + cached, ok := stationBattleCache.Load("station-1") + if !ok || len(cached) != 1 { + t.Fatalf("expected future battle to remain cached, got ok=%t len=%d", ok, len(cached)) + } + if cached[0].BreadBattleSeed != future.BreadBattleSeed { + t.Fatalf("expected cached seed %d, got %d", future.BreadBattleSeed, cached[0].BreadBattleSeed) + } +} + +func TestCanonicalStationBattleKeepsLongerBattleWhenShorterFutureObserved(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: "station-1", + BattleLevel: 3, + BattleStart: now - 120, + BattleEnd: now + 7200, + BattlePokemonId: null.IntFrom(374), + }, now) + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 2, + StationId: "station-1", + BattleLevel: 2, + BattleStart: now + 600, + BattleEnd: now + 1800, + BattlePokemonId: null.IntFrom(527), + }, now) + + battles := getKnownStationBattles("station-1", nil, now) + canonical := canonicalStationBattleFromSlice(nil, battles, now) + if canonical == nil || canonical.BreadBattleSeed != 1 { + t.Fatalf("expected longer existing battle seed 1 to remain canonical, got %+v", canonical) + } +} + +func TestCanonicalStationBattlePrefersCurrentActiveProjection(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + station := &Station{ + StationData: StationData{ + Id: "station-1", + BattleLevel: null.IntFrom(1), + BattleStart: null.IntFrom(now - 60), + BattleEnd: null.IntFrom(now + 1800), + BattlePokemonId: null.IntFrom(527), + }, + } + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 2, + StationId: station.Id, + BattleLevel: 2, + BattleStart: now - 120, + BattleEnd: now + 7200, + BattlePokemonId: null.IntFrom(133), + }, now) + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: station.Id, + BattleLevel: 1, + BattleStart: now - 60, + BattleEnd: now + 1800, + BattlePokemonId: null.IntFrom(527), + }, now) + + battles := getKnownStationBattles(station.Id, station, now) + if len(battles) != 2 { + t.Fatalf("expected both active battles to remain known, got %d", len(battles)) + } + + canonical := canonicalStationBattleFromSlice(station, battles, now) + if canonical == nil || canonical.BreadBattleSeed != 1 { + t.Fatalf("expected current station projection seed 1 to be canonical, got %+v", canonical) + } +} + +func TestBuildStationResultProjectsFutureBattleFromCache(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + station := &Station{ + StationData: StationData{ + Id: "station-1", + Name: "Station", + Lat: 1, + Lon: 2, + StartTime: now - 3600, + EndTime: now + 3600, + IsBattleAvailable: true, + Updated: now, + BattleLevel: null.IntFrom(1), + BattleStart: null.IntFrom(now + 600), + BattleEnd: null.IntFrom(now + 3600), + BattlePokemonId: null.IntFrom(527), + }, + } + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: station.Id, + BattleLevel: 1, + BattleStart: now + 600, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(527), + }, now) + + result := BuildStationResult(station) + if result.BattlePokemonId.ValueOrZero() != 527 { + t.Fatalf("expected future battle in compatibility fields, got %+v", result) + } + if len(result.Battles) != 1 { + t.Fatalf("expected 1 known battle, got %d", len(result.Battles)) + } + if !result.IsBattleAvailable { + t.Fatal("expected server is_battle_available flag to be preserved") + } +} + +func TestBuildFortLookupStationBattlesIncludesFutureBattle(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + station := &Station{StationData: StationData{Id: "station-1"}} + + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: station.Id, + BattleLevel: 1, + BattleStart: now + 600, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(527), + }, now) + + battles := buildFortLookupStationBattles(station, now) + if len(battles) != 1 { + t.Fatalf("expected future battle in fort lookup, got %d", len(battles)) + } + if battles[0].BattlePokemonId != 527 { + t.Fatalf("expected battle pokemon 527, got %d", battles[0].BattlePokemonId) + } +} + +func TestCachePreloadedStationBattlesPreservesPersistedSetRegardlessOfInputOrder(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + + if !cachePreloadedStationBattles("station-1", []StationBattleData{ + { + BreadBattleSeed: 2, + StationId: "station-1", + BattleLevel: 1, + BattleStart: now + 600, + BattleEnd: now + 1800, + BattlePokemonId: null.IntFrom(527), + }, + { + BreadBattleSeed: 1, + StationId: "station-1", + BattleLevel: 3, + BattleStart: now - 120, + BattleEnd: now + 7200, + BattlePokemonId: null.IntFrom(374), + }, + }) { + t.Fatal("expected preloaded station battles to be cached") + } + + battles := getKnownStationBattles("station-1", nil, now) + if len(battles) != 2 { + t.Fatalf("expected both persisted battles after preload, got %d", len(battles)) + } + if battles[0].BreadBattleSeed != 1 || battles[1].BreadBattleSeed != 2 { + t.Fatalf("unexpected preloaded battle ordering: %+v", battles) + } +} + +func TestCreateStationWebhooksSkipsEmptyExistingStation(t *testing.T) { + initStationBattleCache() + previousSender := webhooksSender + previousStats := statsCollector + sender := &recordingWebhooksSender{} + webhooksSender = sender + statsCollector = stats_collector.NewNoopStatsCollector() + defer func() { + webhooksSender = previousSender + statsCollector = previousStats + }() + + now := time.Now().Unix() + station := &Station{ + StationData: StationData{ + Id: "station-1", + Name: "Station", + Lat: 1, + Lon: 2, + CellId: 123, + EndTime: now + 3600, + Updated: now, + }, + } + station.oldValues = StationOldValues{ + EndTime: now - 3600, + BattleListSignature: "", + } + + createStationWebhooks(station) + if len(sender.messages) != 0 { + t.Fatalf("expected no max_battle webhook, got %v", sender.messages) + } +} + +func TestCreateStationWebhooksEmitsFutureBattle(t *testing.T) { + initStationBattleCache() + previousSender := webhooksSender + previousStats := statsCollector + sender := &recordingWebhooksSender{} + webhooksSender = sender + statsCollector = &recordingStatsCollector{StatsCollector: stats_collector.NewNoopStatsCollector()} + defer func() { + webhooksSender = previousSender + statsCollector = previousStats + }() + + now := time.Now().Unix() + station := &Station{ + StationData: StationData{ + Id: "station-1", + Name: "Station", + Lat: 1, + Lon: 2, + CellId: 123, + EndTime: now + 7200, + IsBattleAvailable: false, + Updated: now, + }, + } + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: station.Id, + BattleLevel: 1, + BattleStart: now + 600, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(527), + }, now) + station.oldValues = StationOldValues{ + EndTime: station.EndTime, + BattleListSignature: "", + } + + createStationWebhooks(station) + if len(sender.messages) != 1 || sender.messages[0] != webhooks.MaxBattle { + t.Fatalf("expected one max_battle webhook, got %v", sender.messages) + } +} + +func TestCreateStationWebhooksDoesNotRecountCanonicalBattleSeed(t *testing.T) { + initStationBattleCache() + previousSender := webhooksSender + previousStats := statsCollector + sender := &recordingWebhooksSender{} + collector := &recordingStatsCollector{StatsCollector: stats_collector.NewNoopStatsCollector()} + webhooksSender = sender + statsCollector = collector + defer func() { + webhooksSender = previousSender + statsCollector = previousStats + }() + + now := time.Now().Unix() + station := &Station{ + StationData: StationData{ + Id: "station-1", + Name: "Station", + Lat: 1, + Lon: 2, + CellId: 123, + EndTime: now + 7200, + Updated: now, + }, + } + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 1, + StationId: station.Id, + BattleLevel: 3, + BattleStart: now - 600, + BattleEnd: now + 7200, + BattlePokemonId: null.IntFrom(374), + }, now) + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 2, + StationId: station.Id, + BattleLevel: 1, + BattleStart: now + 600, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(527), + }, now) + station.oldValues = StationOldValues{ + HasCanonicalBattle: true, + CanonicalBattleSeed: 1, + EndTime: station.EndTime, + BattleListSignature: "old-signature", + } + + createStationWebhooks(station) + if len(sender.messages) != 1 || sender.messages[0] != webhooks.MaxBattle { + t.Fatalf("expected one max_battle webhook, got %v", sender.messages) + } + if len(collector.maxBattleLevels) != 0 { + t.Fatalf("expected no max battle metric increment, got %v", collector.maxBattleLevels) + } +} + +func TestCreateStationWebhooksCountsZeroSeedCanonicalBattle(t *testing.T) { + initStationBattleCache() + previousSender := webhooksSender + previousStats := statsCollector + sender := &recordingWebhooksSender{} + collector := &recordingStatsCollector{StatsCollector: stats_collector.NewNoopStatsCollector()} + webhooksSender = sender + statsCollector = collector + defer func() { + webhooksSender = previousSender + statsCollector = previousStats + }() + + now := time.Now().Unix() + station := &Station{ + StationData: StationData{ + Id: "station-1", + Name: "Station", + Lat: 1, + Lon: 2, + CellId: 123, + EndTime: now + 7200, + Updated: now, + }, + } + upsertCachedStationBattle(StationBattleData{ + BreadBattleSeed: 0, + StationId: station.Id, + BattleLevel: 1, + BattleStart: now - 600, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(527), + }, now) + station.oldValues = StationOldValues{ + EndTime: station.EndTime, + BattleListSignature: "", + } + + createStationWebhooks(station) + if len(sender.messages) != 1 || sender.messages[0] != webhooks.MaxBattle { + t.Fatalf("expected one max_battle webhook, got %v", sender.messages) + } + if len(collector.maxBattleLevels) != 1 || collector.maxBattleLevels[0] != 1 { + t.Fatalf("expected one max battle metric increment, got %v", collector.maxBattleLevels) + } +} + +func TestSyncStationBattlesFromProtoAllowsZeroSeed(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + station := &Station{ + StationData: StationData{ + Id: "station-1", + BattleLevel: null.IntFrom(2), + BattleStart: null.IntFrom(now - 60), + BattleEnd: null.IntFrom(now + 3600), + BattlePokemonId: null.IntFrom(133), + }, + } + + syncStationBattlesFromProto(station, &pogo.BreadBattleDetailProto{ + BreadBattleSeed: 0, + BattleWindowStartMs: (now - 60) * 1000, + BattleWindowEndMs: (now + 3600) * 1000, + BattleLevel: pogo.BreadBattleLevel_BREAD_BATTLE_LEVEL_2, + BattlePokemon: &pogo.PokemonProto{PokemonId: 133}, + }) + + battles := getKnownStationBattles(station.Id, station, now) + if len(battles) != 1 || battles[0].BreadBattleSeed != 0 { + t.Fatalf("expected zero-seed battle to be cached, got %+v", battles) + } + if station.BattlePokemonId.ValueOrZero() != 133 { + t.Fatalf("expected zero-seed battle projection, got %d", station.BattlePokemonId.ValueOrZero()) + } +} + +func TestGetKnownStationBattlesDoesNotMutateCacheOnRead(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + expired := StationBattleData{ + BreadBattleSeed: 1, + StationId: "station-1", + BattleLevel: 1, + BattleStart: now - 7200, + BattleEnd: now - 60, + BattlePokemonId: null.IntFrom(527), + } + current := StationBattleData{ + BreadBattleSeed: 2, + StationId: "station-1", + BattleLevel: 2, + BattleStart: now - 60, + BattleEnd: now + 3600, + BattlePokemonId: null.IntFrom(133), + } + stationBattleCache.Store("station-1", []StationBattleData{current, expired}) + + battles := getKnownStationBattles("station-1", nil, now) + if len(battles) != 1 || battles[0].BreadBattleSeed != 2 { + t.Fatalf("expected only current battle from read, got %+v", battles) + } + + cached, ok := stationBattleCache.Load("station-1") + if !ok || len(cached) != 2 { + t.Fatalf("expected cached slice to remain unchanged, got %+v", cached) + } +} + +func TestBuildStationResultSuppressesStaleProjectionAfterExpiredHydratedCache(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + station := &Station{ + StationData: StationData{ + Id: "station-1", + Name: "Station", + Lat: 1, + Lon: 2, + StartTime: now - 3600, + EndTime: now + 3600, + Updated: now, + BattleLevel: null.IntFrom(1), + BattleStart: null.IntFrom(now - 600), + BattleEnd: null.IntFrom(now + 600), + BattlePokemonId: null.IntFrom(527), + }, + } + markStationBattlesHydrated(station.Id) + + stationBattleCache.Store("station-1", []StationBattleData{{ + BreadBattleSeed: 1, + StationId: station.Id, + BattleLevel: 1, + BattleStart: now - 7200, + BattleEnd: now - 60, + BattlePokemonId: null.IntFrom(527), + }}) + + result := BuildStationResult(station) + if result.BattleEnd.Valid || result.BattlePokemonId.Valid { + t.Fatalf("expected expired hydrated cache to suppress stale projection, got %+v", result) + } + if _, ok := stationBattleCache.Load(station.Id); ok { + t.Fatal("expected expired hydrated cache entry to be cleaned up") + } +} + +func TestGetStationRecordReadOnlyRetriesHydrationOnCachedStation(t *testing.T) { + initStationBattleCache() + stationId := "station-hydration-retry" + station := &Station{StationData: StationData{Id: stationId}} + stationCache.Set(stationId, station, ttlcache.DefaultTTL) + defer stationCache.Delete(stationId) + defer clearStationBattleCaches(stationId) + + attempts := 0 + previousHydrate := hydrateStationBattlesForStationFunc + hydrateStationBattlesForStationFunc = func(_ context.Context, _ db.DbDetails, station *Station, _ int64) error { + attempts++ + if attempts == 1 { + return errors.New("boom") + } + markStationBattlesHydrated(station.Id) + return nil + } + defer func() { + hydrateStationBattlesForStationFunc = previousHydrate + }() + + record, unlock, err := GetStationRecordReadOnly(context.Background(), db.DbDetails{}, stationId, "test") + if err != nil { + t.Fatalf("expected cached station to be served even when hydration fails, got %v", err) + } + if record == nil || unlock == nil { + t.Fatal("expected cached station record on hydration failure") + } + unlock() + + record, unlock, err = GetStationRecordReadOnly(context.Background(), db.DbDetails{}, stationId, "test") + if err != nil { + t.Fatalf("expected second hydration attempt to succeed, got %v", err) + } + if record == nil || unlock == nil { + t.Fatal("expected cached station record after retry") + } + unlock() + if attempts != 2 { + t.Fatalf("expected hydration retry on cached station, got %d attempts", attempts) + } +} + +func TestGetStationRecordReadOnlyKeepsSingletonAfterHydrationFailureOnCacheMiss(t *testing.T) { + initStationBattleCache() + stationId := "station-hydration-miss-retry" + defer stationCache.Delete(stationId) + defer clearStationBattleCaches(stationId) + + loadCalls := 0 + previousLoad := loadStationFromDatabaseFunc + loadStationFromDatabaseFunc = func(_ context.Context, _ db.DbDetails, id string, station *Station) error { + loadCalls++ + station.Id = id + station.Name = "Station" + return nil + } + defer func() { + loadStationFromDatabaseFunc = previousLoad + }() + + hydrateCalls := 0 + previousHydrate := hydrateStationBattlesForStationFunc + hydrateStationBattlesForStationFunc = func(_ context.Context, _ db.DbDetails, station *Station, _ int64) error { + hydrateCalls++ + if hydrateCalls == 1 { + return errors.New("boom") + } + markStationBattlesHydrated(station.Id) + return nil + } + defer func() { + hydrateStationBattlesForStationFunc = previousHydrate + }() + + record, unlock, err := GetStationRecordReadOnly(context.Background(), db.DbDetails{}, stationId, "test") + if err == nil { + if unlock != nil { + unlock() + } + t.Fatal("expected first cache-miss hydration to fail") + } + if record != nil || unlock != nil { + t.Fatal("expected no station return on failed cache-miss hydration") + } + + cachedItem := stationCache.Get(stationId) + if cachedItem == nil { + t.Fatal("expected failed hydration to keep cached station instance") + } + cachedStation := cachedItem.Value() + + record, unlock, err = GetStationRecordReadOnly(context.Background(), db.DbDetails{}, stationId, "test") + if err != nil { + t.Fatalf("expected retry on cached singleton to succeed, got %v", err) + } + if record == nil || unlock == nil { + t.Fatal("expected cached station record after retry") + } + if record != cachedStation { + unlock() + t.Fatal("expected retry to reuse cached station singleton") + } + unlock() + + if loadCalls != 1 { + t.Fatalf("expected one DB load across retry, got %d", loadCalls) + } + if hydrateCalls != 2 { + t.Fatalf("expected two hydration attempts across retry, got %d", hydrateCalls) + } +} + +func TestGetStationRecordReadOnlySkipsHydrationAfterProtoSync(t *testing.T) { + initStationBattleCache() + now := time.Now().Unix() + stationId := "station-hydration-skip" + station := &Station{StationData: StationData{Id: stationId}} + stationCache.Set(stationId, station, ttlcache.DefaultTTL) + defer stationCache.Delete(stationId) + defer clearStationBattleCaches(stationId) + + syncStationBattlesFromProto(station, &pogo.BreadBattleDetailProto{ + BreadBattleSeed: 7, + BattleWindowStartMs: (now - 60) * 1000, + BattleWindowEndMs: (now + 3600) * 1000, + BattleLevel: pogo.BreadBattleLevel_BREAD_BATTLE_LEVEL_2, + BattlePokemon: &pogo.PokemonProto{PokemonId: 133}, + }) + + attempts := 0 + previousHydrate := hydrateStationBattlesForStationFunc + hydrateStationBattlesForStationFunc = func(_ context.Context, _ db.DbDetails, _ *Station, _ int64) error { + attempts++ + return nil + } + defer func() { + hydrateStationBattlesForStationFunc = previousHydrate + }() + + record, unlock, err := GetStationRecordReadOnly(context.Background(), db.DbDetails{}, stationId, "test") + if err != nil { + t.Fatalf("expected cached station read to succeed, got %v", err) + } + if record == nil || unlock == nil { + t.Fatal("expected cached station record") + } + unlock() + if attempts != 0 { + t.Fatalf("expected no DB hydration after proto sync, got %d attempts", attempts) + } +} + +func TestMarkPreloadedStationsHydratedMarksEmptyStations(t *testing.T) { + initStationBattleCache() + stationId := "station-preload-empty" + station := &Station{StationData: StationData{Id: stationId}} + stationCache.Set(stationId, station, ttlcache.DefaultTTL) + defer stationCache.Delete(stationId) + defer clearStationBattleCaches(stationId) + + if hasHydratedStationBattles(stationId) { + t.Fatal("expected station to start unhydrated") + } + + markPreloadedStationsHydrated(false) + + if !hasHydratedStationBattles(stationId) { + t.Fatal("expected empty preloaded station to be marked hydrated") + } +} + +func TestGetStationRecordReadOnlyHydrationRefreshesFortLookup(t *testing.T) { + initStationBattleCache() + previousFortInMemory := config.Config.FortInMemory + config.Config.FortInMemory = true + defer func() { + config.Config.FortInMemory = previousFortInMemory + }() + + now := time.Now().Unix() + stationId := "station-hydration-lookup" + station := &Station{ + StationData: StationData{ + Id: stationId, + Lat: 1, + Lon: 2, + BattleLevel: null.IntFrom(1), + BattleStart: null.IntFrom(now - 600), + BattleEnd: null.IntFrom(now + 600), + BattlePokemonId: null.IntFrom(527), + }, + } + stationCache.Set(stationId, station, ttlcache.DefaultTTL) + defer stationCache.Delete(stationId) + defer clearStationBattleCaches(stationId) + fortLookupCache.Store(stationId, FortLookup{ + FortType: STATION, + Lat: station.Lat, + Lon: station.Lon, + BattleEndTimestamp: station.BattleEnd.ValueOrZero(), + BattleLevel: int8(station.BattleLevel.ValueOrZero()), + BattlePokemonId: int16(station.BattlePokemonId.ValueOrZero()), + }) + + previousHydrate := hydrateStationBattlesForStationFunc + hydrateStationBattlesForStationFunc = func(_ context.Context, _ db.DbDetails, station *Station, _ int64) error { + markStationBattlesHydrated(station.Id) + stationBattleCache.Delete(station.Id) + return nil + } + defer func() { + hydrateStationBattlesForStationFunc = previousHydrate + }() + + record, unlock, err := GetStationRecordReadOnly(context.Background(), db.DbDetails{}, stationId, "test") + if err != nil { + t.Fatalf("expected hydration to succeed, got %v", err) + } + if record == nil || unlock == nil { + t.Fatal("expected cached station") + } + unlock() + + lookup, ok := fortLookupCache.Load(stationId) + if !ok { + t.Fatal("expected fort lookup entry") + } + if lookup.BattleEndTimestamp != 0 || lookup.BattleLevel != 0 || lookup.BattlePokemonId != 0 { + t.Fatalf("expected fort lookup to be cleared after hydration, got %+v", lookup) + } +} diff --git a/decoder/station_decode.go b/decoder/station_decode.go index 6e9f54fa..f042c1e4 100644 --- a/decoder/station_decode.go +++ b/decoder/station_decode.go @@ -48,6 +48,8 @@ func (station *Station) updateFromStationProto(stationProto *pogo.StationProto, log.Infof("[DYNAMAX] Pokemon reward differs from battle: Battle %v - Reward %v", pokemon, rewardPokemon) } } + } else { + clearStationBattleProjection(station) } station.SetCellId(int64(cellId)) return station diff --git a/decoder/station_state.go b/decoder/station_state.go index 2199bee0..2c10c1a5 100644 --- a/decoder/station_state.go +++ b/decoder/station_state.go @@ -25,27 +25,28 @@ const stationSelectColumns = `id, lat, lon, name, cell_id, start_time, end_time, stationed_pokemon` type StationWebhook struct { - Id string `json:"id"` - Latitude float64 `json:"latitude"` - Longitude float64 `json:"longitude"` - Name string `json:"name"` - StartTime int64 `json:"start_time"` - EndTime int64 `json:"end_time"` - IsBattleAvailable bool `json:"is_battle_available"` - BattleLevel null.Int `json:"battle_level"` - BattleStart null.Int `json:"battle_start"` - BattleEnd null.Int `json:"battle_end"` - BattlePokemonId null.Int `json:"battle_pokemon_id"` - BattlePokemonForm null.Int `json:"battle_pokemon_form"` - BattlePokemonCostume null.Int `json:"battle_pokemon_costume"` - BattlePokemonGender null.Int `json:"battle_pokemon_gender"` - BattlePokemonAlignment null.Int `json:"battle_pokemon_alignment"` - BattlePokemonBreadMode null.Int `json:"battle_pokemon_bread_mode"` - BattlePokemonMove1 null.Int `json:"battle_pokemon_move_1"` - BattlePokemonMove2 null.Int `json:"battle_pokemon_move_2"` - TotalStationedPokemon null.Int `json:"total_stationed_pokemon"` - TotalStationedGmax null.Int `json:"total_stationed_gmax"` - Updated int64 `json:"updated"` + Id string `json:"id"` + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Name string `json:"name"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + IsBattleAvailable bool `json:"is_battle_available"` + BattleLevel null.Int `json:"battle_level"` + BattleStart null.Int `json:"battle_start"` + BattleEnd null.Int `json:"battle_end"` + BattlePokemonId null.Int `json:"battle_pokemon_id"` + BattlePokemonForm null.Int `json:"battle_pokemon_form"` + BattlePokemonCostume null.Int `json:"battle_pokemon_costume"` + BattlePokemonGender null.Int `json:"battle_pokemon_gender"` + BattlePokemonAlignment null.Int `json:"battle_pokemon_alignment"` + BattlePokemonBreadMode null.Int `json:"battle_pokemon_bread_mode"` + BattlePokemonMove1 null.Int `json:"battle_pokemon_move_1"` + BattlePokemonMove2 null.Int `json:"battle_pokemon_move_2"` + TotalStationedPokemon null.Int `json:"total_stationed_pokemon"` + TotalStationedGmax null.Int `json:"total_stationed_gmax"` + Battles []StationBattleWebhook `json:"battles,omitempty"` + Updated int64 `json:"updated"` } func loadStationFromDatabase(ctx context.Context, db db.DbDetails, stationId string, station *Station) error { @@ -55,6 +56,9 @@ func loadStationFromDatabase(ctx context.Context, db db.DbDetails, stationId str return err } +var loadStationFromDatabaseFunc = loadStationFromDatabase +var hydrateStationBattlesForStationFunc = hydrateStationBattlesForStation + // peekStationRecord - cache-only lookup, no DB fallback, returns locked. // Caller MUST call returned unlock function if non-nil. func peekStationRecord(stationId string, caller string) (*Station, func(), error) { @@ -74,11 +78,18 @@ func GetStationRecordReadOnly(ctx context.Context, db db.DbDetails, stationId st if item := stationCache.Get(stationId); item != nil { station := item.Value() station.Lock(caller) + if !hasHydratedStationBattles(stationId) { + if err := hydrateStationBattlesForStationFunc(ctx, db, station, time.Now().Unix()); err != nil { + log.Debugf("GetStationRecordReadOnly: station battle hydration failed for %s: %v", stationId, err) + } else if config.Config.FortInMemory { + fortRtreeUpdateStationOnSave(station) + } + } return station, func() { station.Unlock() }, nil } dbStation := Station{} - err := loadStationFromDatabase(ctx, db, stationId, &dbStation) + err := loadStationFromDatabaseFunc(ctx, db, stationId, &dbStation) if errors.Is(err, sql.ErrNoRows) { return nil, nil, nil } @@ -90,14 +101,23 @@ func GetStationRecordReadOnly(ctx context.Context, db db.DbDetails, stationId st // Atomically cache the loaded Station - if another goroutine raced us, // we'll get their Station and use that instead (ensuring same mutex) existingStation, _ := stationCache.GetOrSetFunc(stationId, func() *Station { - if config.Config.FortInMemory { - fortRtreeUpdateStationOnGet(&dbStation) - } return &dbStation }) station := existingStation.Value() station.Lock(caller) + loadedFromDb := station == &dbStation + hydratedBattles := false + if !hasHydratedStationBattles(stationId) { + if err := hydrateStationBattlesForStationFunc(ctx, db, station, time.Now().Unix()); err != nil { + station.Unlock() + return nil, nil, err + } + hydratedBattles = true + } + if config.Config.FortInMemory && (loadedFromDb || hydratedBattles) { + fortRtreeUpdateStationOnGet(station) + } return station, func() { station.Unlock() }, nil } @@ -125,7 +145,7 @@ func getOrCreateStationRecord(ctx context.Context, db db.DbDetails, stationId st if station.newRecord { // We should attempt to load from database - err := loadStationFromDatabase(ctx, db, stationId, station) + err := loadStationFromDatabaseFunc(ctx, db, stationId, station) if err != nil { if !errors.Is(err, sql.ErrNoRows) { station.Unlock() @@ -135,6 +155,10 @@ func getOrCreateStationRecord(ctx context.Context, db db.DbDetails, stationId st // We loaded from DB station.newRecord = false station.ClearDirty() + if err := hydrateStationBattlesForStationFunc(ctx, db, station, time.Now().Unix()); err != nil { + station.Unlock() + return nil, nil, err + } if config.Config.FortInMemory { fortRtreeUpdateStationOnGet(station) } @@ -147,9 +171,11 @@ func getOrCreateStationRecord(ctx context.Context, db db.DbDetails, stationId st func saveStationRecord(ctx context.Context, db db.DbDetails, station *Station) { now := time.Now().Unix() + snapshot := collectStationBattleSnapshot(station, now) + battleListChanged := station.oldValues.BattleListSignature != snapshot.Signature // Skip save if not dirty and was updated recently (15-min debounce) - if !station.IsDirty() && !station.IsNewRecord() { + if !station.IsDirty() && !station.IsNewRecord() && !battleListChanged { if station.Updated > now-GetUpdateThreshold(900) { return } @@ -176,18 +202,26 @@ func saveStationRecord(ctx context.Context, db db.DbDetails, station *Station) { // Fallback to direct write if queue not initialized _ = stationWriteDB(db, station, isNewRecord) } + if battleListChanged { + if stationBattleQueue != nil { + stationBattleQueue.Enqueue(stationBattleWriteFromSlice(station.Id, snapshot.Battles), false, 0) + } else { + _ = storeStationBattleSnapshot(ctx, db, stationBattleWriteFromSlice(station.Id, snapshot.Battles)) + } + } if dbDebugEnabled { station.changedFields = station.changedFields[:0] } station.ClearDirty() - createStationWebhooks(station) + createStationWebhooksWithSnapshot(station, snapshot, isNewRecord) if isNewRecord { stationCache.Set(station.Id, station, ttlcache.DefaultTTL) station.newRecord = false } if config.Config.FortInMemory { - fortRtreeUpdateStationOnSave(station) + genericUpdateFort(station.Id, station.Lat, station.Lon, false) + updateStationLookupFromSnapshot(station, snapshot) } } @@ -253,41 +287,52 @@ func stationWriteDB(db db.DbDetails, station *Station, isNewRecord bool) error { } func createStationWebhooks(station *Station) { + createStationWebhooksWithSnapshot(station, collectStationBattleSnapshot(station, time.Now().Unix()), station.IsNewRecord()) +} + +func createStationWebhooksWithSnapshot(station *Station, snapshot stationBattleSnapshot, isNew bool) { old := &station.oldValues - isNew := station.IsNewRecord() + currentSignature := snapshot.Signature - if isNew || station.BattlePokemonId.Valid && (old.EndTime != station.EndTime || - old.BattleEnd != station.BattleEnd || - old.BattlePokemonId != station.BattlePokemonId || - old.BattlePokemonForm != station.BattlePokemonForm || - old.BattlePokemonCostume != station.BattlePokemonCostume || - old.BattlePokemonGender != station.BattlePokemonGender || - old.BattlePokemonBreadMode != station.BattlePokemonBreadMode) { + if currentSignature == "" { + return + } + + if isNew || old.EndTime != station.EndTime || old.BattleListSignature != currentSignature { + canonical := snapshot.Canonical + if canonical == nil { + canonical = stationBattleFromStationProjection(station) + } stationHook := StationWebhook{ - Id: station.Id, - Latitude: station.Lat, - Longitude: station.Lon, - Name: station.Name, - StartTime: station.StartTime, - EndTime: station.EndTime, - IsBattleAvailable: station.IsBattleAvailable, - BattleLevel: station.BattleLevel, - BattleStart: station.BattleStart, - BattleEnd: station.BattleEnd, - BattlePokemonId: station.BattlePokemonId, - BattlePokemonForm: station.BattlePokemonForm, - BattlePokemonCostume: station.BattlePokemonCostume, - BattlePokemonGender: station.BattlePokemonGender, - BattlePokemonAlignment: station.BattlePokemonAlignment, - BattlePokemonBreadMode: station.BattlePokemonBreadMode, - BattlePokemonMove1: station.BattlePokemonMove1, - BattlePokemonMove2: station.BattlePokemonMove2, - TotalStationedPokemon: station.TotalStationedPokemon, - TotalStationedGmax: station.TotalStationedGmax, - Updated: station.Updated, + Id: station.Id, + Latitude: station.Lat, + Longitude: station.Lon, + Name: station.Name, + StartTime: station.StartTime, + EndTime: station.EndTime, + IsBattleAvailable: station.IsBattleAvailable, + TotalStationedPokemon: station.TotalStationedPokemon, + TotalStationedGmax: station.TotalStationedGmax, + Battles: buildStationWebhookBattlesFromSlice(snapshot.Battles), + Updated: station.Updated, + } + if canonical != nil { + stationHook.BattleLevel = null.IntFrom(int64(canonical.BattleLevel)) + stationHook.BattleStart = null.IntFrom(canonical.BattleStart) + stationHook.BattleEnd = null.IntFrom(canonical.BattleEnd) + stationHook.BattlePokemonId = canonical.BattlePokemonId + stationHook.BattlePokemonForm = canonical.BattlePokemonForm + stationHook.BattlePokemonCostume = canonical.BattlePokemonCostume + stationHook.BattlePokemonGender = canonical.BattlePokemonGender + stationHook.BattlePokemonAlignment = canonical.BattlePokemonAlignment + stationHook.BattlePokemonBreadMode = canonical.BattlePokemonBreadMode + stationHook.BattlePokemonMove1 = canonical.BattlePokemonMove1 + stationHook.BattlePokemonMove2 = canonical.BattlePokemonMove2 } areas := MatchStatsGeofenceWithCell(station.Lat, station.Lon, uint64(station.CellId)) webhooksSender.AddMessage(webhooks.MaxBattle, stationHook, areas) - statsCollector.UpdateMaxBattleCount(areas, station.BattleLevel.ValueOrZero()) + if seed := canonicalBattleSeed(canonical); canonical != nil && (isNew || !old.HasCanonicalBattle || old.CanonicalBattleSeed != seed) { + statsCollector.UpdateMaxBattleCount(areas, int64(canonical.BattleLevel)) + } } } diff --git a/decoder/writebehind_batch.go b/decoder/writebehind_batch.go index 9afac25c..f0c1bb9e 100644 --- a/decoder/writebehind_batch.go +++ b/decoder/writebehind_batch.go @@ -23,15 +23,16 @@ type S2CellData struct { // Typed queues for each entity type - using native key types for efficiency var ( - pokestopQueue *writebehind.TypedQueue[string, PokestopData] - gymQueue *writebehind.TypedQueue[string, GymData] - pokemonQueue *writebehind.TypedQueue[uint64, PokemonData] - spawnpointQueue *writebehind.TypedQueue[int64, SpawnpointData] - routeQueue *writebehind.TypedQueue[string, RouteData] - tappableQueue *writebehind.TypedQueue[uint64, TappableData] - stationQueue *writebehind.TypedQueue[string, StationData] - incidentQueue *writebehind.TypedQueue[string, IncidentData] - s2cellQueue *writebehind.TypedQueue[uint64, S2CellData] + pokestopQueue *writebehind.TypedQueue[string, PokestopData] + gymQueue *writebehind.TypedQueue[string, GymData] + pokemonQueue *writebehind.TypedQueue[uint64, PokemonData] + spawnpointQueue *writebehind.TypedQueue[int64, SpawnpointData] + routeQueue *writebehind.TypedQueue[string, RouteData] + tappableQueue *writebehind.TypedQueue[uint64, TappableData] + stationQueue *writebehind.TypedQueue[string, StationData] + stationBattleQueue *writebehind.TypedQueue[string, StationBattleWrite] + incidentQueue *writebehind.TypedQueue[string, IncidentData] + s2cellQueue *writebehind.TypedQueue[uint64, S2CellData] // QueueManager coordinates all queues queueManager *writebehind.QueueManager @@ -152,6 +153,19 @@ func InitTypedQueues(ctx context.Context, dbDetails db.DbDetails, stats stats_co }) queueManager.Register(stationQueue) + stationBattleQueue = writebehind.NewTypedQueue(writebehind.TypedQueueConfig[string, StationBattleWrite]{ + Name: "station_battle", + BatchSize: batchSize, + BatchTimeout: batchTimeout, + StartupDelaySeconds: startupDelay, + Limiter: limiter, + Db: dbDetails, + Stats: stats, + FlushFunc: flushStationBattleBatch, + KeyFunc: func(d StationBattleWrite) string { return d.StationId }, + }) + queueManager.Register(stationBattleQueue) + incidentQueue = writebehind.NewTypedQueue(writebehind.TypedQueueConfig[string, IncidentData]{ Name: "incident", BatchSize: batchSize, diff --git a/main.go b/main.go index 407da4ca..b54c696a 100644 --- a/main.go +++ b/main.go @@ -241,6 +241,10 @@ func main() { StartIncidentExpiry(db) } + if cfg.Cleanup.StationBattles == true { + StartStationBattleExpiry(db) + } + if cfg.Cleanup.Tappables == true { StartTappableExpiry(db) } diff --git a/sql/54_station_battle.up.sql b/sql/54_station_battle.up.sql new file mode 100644 index 00000000..73051d49 --- /dev/null +++ b/sql/54_station_battle.up.sql @@ -0,0 +1,23 @@ +CREATE TABLE `station_battle` ( + `bread_battle_seed` BIGINT NOT NULL, + `station_id` VARCHAR(35) NOT NULL, + `battle_level` TINYINT UNSIGNED NOT NULL, + `battle_start` INT UNSIGNED NOT NULL, + `battle_end` INT UNSIGNED NOT NULL, + `battle_pokemon_id` SMALLINT unsigned DEFAULT NULL, + `battle_pokemon_form` SMALLINT unsigned DEFAULT NULL, + `battle_pokemon_costume` SMALLINT unsigned DEFAULT NULL, + `battle_pokemon_gender` TINYINT unsigned DEFAULT NULL, + `battle_pokemon_alignment` SMALLINT unsigned DEFAULT NULL, + `battle_pokemon_bread_mode` SMALLINT unsigned DEFAULT NULL, + `battle_pokemon_move_1` SMALLINT unsigned DEFAULT NULL, + `battle_pokemon_move_2` SMALLINT unsigned DEFAULT NULL, + `battle_pokemon_stamina` INT unsigned DEFAULT NULL, + `battle_pokemon_cp_multiplier` FLOAT DEFAULT NULL, + `updated` INT UNSIGNED NOT NULL, + PRIMARY KEY(`bread_battle_seed`), + KEY `ix_station_battle_station_end` (`station_id`, `battle_end`), + KEY `ix_station_battle_end` (`battle_end`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_general_ci; diff --git a/stats.go b/stats.go index f3a2c85d..a569eac3 100644 --- a/stats.go +++ b/stats.go @@ -193,6 +193,30 @@ func StartIncidentExpiry(db *sqlx.DB) { }() } +func StartStationBattleExpiry(db *sqlx.DB) { + ticker := time.NewTicker(time.Hour + 13*time.Minute) + go func() { + for { + <-ticker.C + start := time.Now() + + var result sql.Result + var err error + + result, err = db.Exec("DELETE FROM station_battle WHERE battle_end < UNIX_TIMESTAMP();") + + elapsed := time.Since(start) + + if err != nil { + log.Errorf("DB - Cleanup of station_battle table error %s", err) + } else { + rows, _ := result.RowsAffected() + log.Infof("DB - Cleanup of station_battle table took %s (%d rows)", elapsed, rows) + } + } + }() +} + func StartTappableExpiry(db *sqlx.DB) { ticker := time.NewTicker(time.Hour + 16*time.Minute) go func() {