Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions decoder/api_fort.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func GymScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDetails) *Ap
start := time.Now()

for _, key := range returnKeys {
gym, unlock, err := GetGymRecordReadOnly(context.Background(), dbDetails, key)
gym, unlock, err := GetGymRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanGym")
if err == nil && gym != nil {
gymCopy := buildGymResult(gym)
results = append(results, &gymCopy)
Expand All @@ -306,7 +306,7 @@ func PokestopScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDetails
start := time.Now()

for _, key := range returnKeys {
pokestop, unlock, err := getPokestopRecordReadOnly(context.Background(), dbDetails, key)
pokestop, unlock, err := getPokestopRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanpokemon")
if err == nil && pokestop != nil {
pokestopCopy := buildPokestopResult(pokestop)
results = append(results, &pokestopCopy)
Expand All @@ -331,7 +331,7 @@ func StationScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDetails)
start := time.Now()

for _, key := range returnKeys {
station, unlock, err := getStationRecordReadOnly(context.Background(), dbDetails, key)
station, unlock, err := getStationRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanStation")
if err == nil && station != nil {
stationCopy := buildStationResult(station)
results = append(results, &stationCopy)
Expand All @@ -356,7 +356,7 @@ func FortCombinedScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDet

gyms := make([]*ApiGymResult, 0, len(gymKeys))
for _, key := range gymKeys {
gym, unlock, err := GetGymRecordReadOnly(context.Background(), dbDetails, key)
gym, unlock, err := GetGymRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanGymPokemon")
if err == nil && gym != nil {
gymCopy := buildGymResult(gym)
gyms = append(gyms, &gymCopy)
Expand All @@ -368,7 +368,7 @@ func FortCombinedScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDet

pokestops := make([]*ApiPokestopResult, 0, len(pokestopKeys))
for _, key := range pokestopKeys {
pokestop, unlock, err := getPokestopRecordReadOnly(context.Background(), dbDetails, key)
pokestop, unlock, err := getPokestopRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanpokemonPokemon")
if err == nil && pokestop != nil {
pokestopCopy := buildPokestopResult(pokestop)
pokestops = append(pokestops, &pokestopCopy)
Expand All @@ -380,7 +380,7 @@ func FortCombinedScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDet

stations := make([]*ApiStationResult, 0, len(stationKeys))
for _, key := range stationKeys {
station, unlock, err := getStationRecordReadOnly(context.Background(), dbDetails, key)
station, unlock, err := getStationRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanStationPokemon")
if err == nil && station != nil {
stationCopy := buildStationResult(station)
stations = append(stations, &stationCopy)
Expand Down
4 changes: 2 additions & 2 deletions decoder/api_pokemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func SearchPokemon(request ApiPokemonSearch) ([]*ApiPokemonResult, error) {
apiResults := make([]*ApiPokemonResult, 0, len(results))

for _, encounterId := range results {
pokemon, unlock, _ := peekPokemonRecordReadOnly(encounterId)
pokemon, unlock, _ := peekPokemonRecordReadOnly(encounterId, "API.Pokemon")
if pokemon != nil {
apiPokemon := buildApiPokemonResult(pokemon)
apiResults = append(apiResults, &apiPokemon)
Expand All @@ -146,7 +146,7 @@ func SearchPokemon(request ApiPokemonSearch) ([]*ApiPokemonResult, error) {
// Get one result

func GetOnePokemon(pokemonId uint64) *ApiPokemonResult {
item, unlock, _ := peekPokemonRecordReadOnly(pokemonId)
item, unlock, _ := peekPokemonRecordReadOnly(pokemonId, "API.PokemonById")
if item != nil {
apiPokemon := buildApiPokemonResult(item)
defer unlock()
Expand Down
2 changes: 1 addition & 1 deletion decoder/api_pokemon_scan_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func GetPokemonInArea(retrieveParameters ApiPokemonScan) []*ApiPokemonResult {
results := make([]*ApiPokemonResult, 0, len(returnKeys))

for _, key := range returnKeys {
pokemon, unlock, _ := peekPokemonRecordReadOnly(key)
pokemon, unlock, _ := peekPokemonRecordReadOnly(key, "API.ScanPokemon.v1")
if pokemon != nil {
apiPokemon := buildApiPokemonResult(pokemon)
unlock()
Expand Down
4 changes: 2 additions & 2 deletions decoder/api_pokemon_scan_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func GetPokemonInArea2(retrieveParameters ApiPokemonScan2) []*ApiPokemonResult {
startUnix := start.Unix()

for _, key := range returnKeys {
pokemon, unlock, _ := peekPokemonRecordReadOnly(key)
pokemon, unlock, _ := peekPokemonRecordReadOnly(key, "API.ScanPokemon.v2")
if pokemon != nil {
if pokemon.ExpireTimestamp.ValueOrZero() > startUnix {
apiPokemon := buildApiPokemonResult(pokemon)
Expand Down Expand Up @@ -183,7 +183,7 @@ func GrpcGetPokemonInArea2(retrieveParameters *pb.PokemonScanRequest) []*pb.Poke
startUnix := start.Unix()

for _, key := range returnKeys {
pokemon, unlock, _ := peekPokemonRecordReadOnly(key)
pokemon, unlock, _ := peekPokemonRecordReadOnly(key, "API.ScanPokemon.v2.pokemon")
if pokemon != nil {
if pokemon.ExpireTimestamp.ValueOrZero() > startUnix {
apiPokemon := pb.PokemonDetails{
Expand Down
4 changes: 2 additions & 2 deletions decoder/api_pokemon_scan_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func GetPokemonInArea3(retrieveParameters ApiPokemonScan3) *PokemonScan3Result {
startUnix := start.Unix()

for _, key := range returnKeys {
pokemon, unlock, _ := peekPokemonRecordReadOnly(key)
pokemon, unlock, _ := peekPokemonRecordReadOnly(key, "API.ScanPokemon.v3")
if pokemon != nil {
if pokemon.ExpireTimestamp.ValueOrZero() > startUnix {
apiPokemon := buildApiPokemonResult(pokemon)
Expand Down Expand Up @@ -200,7 +200,7 @@ func GrpcGetPokemonInArea3(retrieveParameters *pb.PokemonScanRequestV3) ([]*pb.P
startUnix := start.Unix()

for _, key := range returnKeys {
pokemon, unlock, _ := peekPokemonRecordReadOnly(key)
pokemon, unlock, _ := peekPokemonRecordReadOnly(key, "API.ScanPokemon.v3.pokemon")
if pokemon != nil {
if pokemon.ExpireTimestamp.ValueOrZero() > startUnix {
apiPokemon := pb.PokemonDetails{
Expand Down
4 changes: 2 additions & 2 deletions decoder/fort_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (ft *FortTracker) GetFortInfo(fortId string) *FortTrackerInfo {
// clearGymWithLock marks a gym as deleted while holding the object-level mutex
func clearGymWithLock(ctx context.Context, dbDetails db.DbDetails, gymId string, cellId uint64, removeFromTracker bool) {
// Load gym through cache (will load from DB if not cached)
gym, unlock, err := getGymRecordForUpdate(ctx, dbDetails, gymId)
gym, unlock, err := getGymRecordForUpdate(ctx, dbDetails, gymId, "clearGymWithLock")
if err != nil {
log.Errorf("FortTracker: failed to load gym %s - %s", gymId, err)
return
Expand Down Expand Up @@ -550,7 +550,7 @@ func clearGymWithLock(ctx context.Context, dbDetails db.DbDetails, gymId string,
// clearPokestopWithLock marks a pokestop as deleted while holding the object-level mutex
func clearPokestopWithLock(ctx context.Context, dbDetails db.DbDetails, stopId string, cellId uint64, removeFromTracker bool) {
// Load pokestop through cache (will load from DB if not cached)
pokestop, unlock, err := getPokestopRecordForUpdate(ctx, dbDetails, stopId)
pokestop, unlock, err := getPokestopRecordForUpdate(ctx, dbDetails, stopId, "clearPokestopWithLock")
if err != nil {
log.Errorf("FortTracker: failed to load pokestop %s - %s", stopId, err)
return
Expand Down
27 changes: 15 additions & 12 deletions decoder/gmo_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa
for _, fort := range p {
fortId := fort.Data.FortId
if fort.Data.FortType == pogo.FortType_CHECKPOINT && scanParameters.ProcessPokestops {
pokestop, unlock, err := getOrCreatePokestopRecord(ctx, db, fortId)
pokestop, unlock, err := getOrCreatePokestopRecord(ctx, db, fortId, "UpdateFortBatch")
if err != nil {
log.Errorf("getOrCreatePokestopRecord: %s", err)
continue
Expand All @@ -37,13 +37,13 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa
// To avoid deadlock, we do this after releasing the pokestop lock.
if isNewRecord && DoesGymExist(ctx, db, fortId) {
// Get shared fields from gym (with gym lock only)
gym, gymUnlock, _ := GetGymRecordReadOnly(ctx, db, fortId)
gym, gymUnlock, _ := GetGymRecordReadOnly(ctx, db, fortId, "UpdateFortBatch.pokestopSharedFields")
if gym != nil {
sharedFields := gym.GetSharedFields()
gymUnlock()

// Re-acquire pokestop lock to apply shared fields
pokestop, unlock, err = getPokestopRecordForUpdate(ctx, db, fortId)
pokestop, unlock, err = getPokestopRecordForUpdate(ctx, db, fortId, "UpdateFortBatch.sharedFields")
if err != nil {
log.Errorf("getPokestopRecordForUpdate (shared fields): %s", err)
} else if pokestop != nil {
Expand All @@ -61,7 +61,10 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa

if incidents != nil {
for _, incidentProto := range incidents {
incident, unlock, err := getOrCreateIncidentRecord(ctx, db, incidentProto.IncidentId, fortId)
if incidentProto.IncidentId == "" {
continue
}
incident, unlock, err := getOrCreateIncidentRecord(ctx, db, incidentProto.IncidentId, fortId, "UpdateFortBatch")
if err != nil {
log.Errorf("getOrCreateIncidentRecord: %s", err)
continue
Expand All @@ -74,7 +77,7 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa
}

if fort.Data.FortType == pogo.FortType_GYM && scanParameters.ProcessGyms {
gym, gymUnlock, err := getOrCreateGymRecord(ctx, db, fortId)
gym, gymUnlock, err := getOrCreateGymRecord(ctx, db, fortId, "UpdateFortBatch")
if err != nil {
log.Errorf("getOrCreateGymRecord: %s", err)
continue
Expand All @@ -90,13 +93,13 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa
// To avoid deadlock, we do this after releasing the gym lock.
if isNewRecord && DoesPokestopExist(ctx, db, fortId) {
// Get shared fields from pokestop (with pokestop lock only)
pokestop, unlock, _ := getPokestopRecordReadOnly(ctx, db, fortId)
pokestop, unlock, _ := getPokestopRecordReadOnly(ctx, db, fortId, "UpdateFortBatch.gymSharedFields")
if pokestop != nil {
sharedFields := pokestop.GetSharedFields()
unlock()

// Re-acquire gym lock to apply shared fields
gym, gymUnlock, err = getGymRecordForUpdate(ctx, db, fortId)
gym, gymUnlock, err = getGymRecordForUpdate(ctx, db, fortId, "UpdateFortBatch.sharedFields")
if err != nil {
log.Errorf("getGymRecordForUpdate (shared fields): %s", err)
} else if gym != nil {
Expand All @@ -113,7 +116,7 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa
func UpdateStationBatch(ctx context.Context, db db.DbDetails, scanParameters ScanParameters, p []RawStationData) {
for _, stationProto := range p {
stationId := stationProto.Data.Id
station, unlock, err := getOrCreateStationRecord(ctx, db, stationId)
station, unlock, err := getOrCreateStationRecord(ctx, db, stationId, "UpdateStationBatch")
if err != nil {
log.Errorf("getOrCreateStationRecord: %s", err)
continue
Expand All @@ -137,7 +140,7 @@ func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters Sca
spawnpointUpdateFromWild(ctx, db, wild.Data, wild.Timestamp)

if scanParameters.ProcessWild {
pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId)
pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId, "UpdatePokemonBatch.wild")
if err != nil {
log.Errorf("getOrCreatePokemonRecord: %s", err)
continue
Expand All @@ -157,7 +160,7 @@ func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters Sca
encounterId := nearby.Data.EncounterId

if nearby.Data.FortId != "" || scanParameters.ProcessNearbyCell {
pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId)
pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId, "UpdatePokemonBatch.nearby")
if err != nil {
log.Printf("getOrCreatePokemonRecord: %s", err)
continue
Expand All @@ -177,7 +180,7 @@ func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters Sca
for _, mapPokemon := range mapPokemonList {
encounterId := mapPokemon.Data.EncounterId

pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId)
pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId, "UpdatePokemonBatch.map")
if err != nil {
log.Printf("getOrCreatePokemonRecord: %s", err)
continue
Expand All @@ -200,7 +203,7 @@ func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters Sca
func UpdateClientWeatherBatch(ctx context.Context, db db.DbDetails, p []*pogo.ClientWeatherProto, timestampMs int64, account string) (updates []WeatherUpdate) {
hourKey := timestampMs / time.Hour.Milliseconds()
for _, weatherProto := range p {
weather, unlock, err := getOrCreateWeatherRecord(ctx, db, weatherProto.S2CellId)
weather, unlock, err := getOrCreateWeatherRecord(ctx, db, weatherProto.S2CellId, "UpdateClientWeatherBatch")
if err != nil {
log.Printf("getOrCreateWeatherRecord: %s", err)
continue
Expand Down
11 changes: 5 additions & 6 deletions decoder/gym.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package decoder

import (
"fmt"
"sync"

"github.com/guregu/null/v6"

Expand Down Expand Up @@ -58,7 +57,7 @@ type GymData struct {
// Gym struct.
// REMINDER! Keep hasChangesGym updated after making changes
type Gym struct {
mu sync.Mutex `db:"-"` // Object-level mutex
mu TrackedMutex[string] `db:"-"` // Object-level mutex with contention tracking

GymData // Embedded data fields (all db columns)

Expand Down Expand Up @@ -129,14 +128,14 @@ func (gym *Gym) snapshotOldValues() {
}
}

// Lock acquires the Gym's mutex
func (gym *Gym) Lock() {
gym.mu.Lock()
// Lock acquires the Gym's mutex with caller tracking
func (gym *Gym) Lock(caller string) {
gym.mu.Lock(caller, "Gym", gym.Id)
}

// Unlock releases the Gym's mutex
func (gym *Gym) Unlock() {
gym.mu.Unlock()
gym.mu.Unlock("Gym", gym.Id)
}

// --- Set methods with dirty tracking ---
Expand Down
10 changes: 5 additions & 5 deletions decoder/gym_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func UpdateGymRecordWithFortDetailsOutProto(ctx context.Context, db db.DbDetails, fort *pogo.FortDetailsOutProto) string {
gym, unlock, err := getOrCreateGymRecord(ctx, db, fort.Id)
gym, unlock, err := getOrCreateGymRecord(ctx, db, fort.Id, "UpdateGymFromFortDetails")
if err != nil {
return err.Error()
}
Expand All @@ -26,7 +26,7 @@ func UpdateGymRecordWithFortDetailsOutProto(ctx context.Context, db db.DbDetails
}

func UpdateGymRecordWithGymInfoProto(ctx context.Context, db db.DbDetails, gymInfo *pogo.GymGetInfoOutProto) string {
gym, unlock, err := getOrCreateGymRecord(ctx, db, gymInfo.GymStatusAndDefenders.PokemonFortProto.FortId)
gym, unlock, err := getOrCreateGymRecord(ctx, db, gymInfo.GymStatusAndDefenders.PokemonFortProto.FortId, "UpdateGymFromGymInfo")
if err != nil {
return err.Error()
}
Expand All @@ -40,7 +40,7 @@ func UpdateGymRecordWithGymInfoProto(ctx context.Context, db db.DbDetails, gymIn
}

func UpdateGymRecordWithGetMapFortsOutProto(ctx context.Context, db db.DbDetails, mapFort *pogo.GetMapFortsOutProto_FortProto) (bool, string) {
gym, unlock, err := getGymRecordForUpdate(ctx, db, mapFort.Id)
gym, unlock, err := getGymRecordForUpdate(ctx, db, mapFort.Id, "UpdateGymFromGetMapForts")
if err != nil {
return false, err.Error()
}
Expand All @@ -57,7 +57,7 @@ func UpdateGymRecordWithGetMapFortsOutProto(ctx context.Context, db db.DbDetails
}

func UpdateGymRecordWithRsvpProto(ctx context.Context, db db.DbDetails, req *pogo.RaidDetails, resp *pogo.GetEventRsvpsOutProto) string {
gym, unlock, err := getGymRecordForUpdate(ctx, db, req.FortId)
gym, unlock, err := getGymRecordForUpdate(ctx, db, req.FortId, "UpdateGymWithRsvp")
if err != nil {
return err.Error()
}
Expand All @@ -76,7 +76,7 @@ func UpdateGymRecordWithRsvpProto(ctx context.Context, db db.DbDetails, req *pog
}

func ClearGymRsvp(ctx context.Context, db db.DbDetails, fortId string) string {
gym, unlock, err := getGymRecordForUpdate(ctx, db, fortId)
gym, unlock, err := getGymRecordForUpdate(ctx, db, fortId, "ClearGymRsvp")
if err != nil {
return err.Error()
}
Expand Down
18 changes: 9 additions & 9 deletions decoder/gym_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func DoesGymExist(ctx context.Context, db db.DbDetails, fortId string) bool {

// PeekGymRecord - cache-only lookup, no DB fallback, returns locked.
// Caller MUST call returned unlock function if non-nil.
func PeekGymRecord(fortId string) (*Gym, func(), error) {
func PeekGymRecord(fortId string, caller string) (*Gym, func(), error) {
if item := gymCache.Get(fortId); item != nil {
gym := item.Value()
gym.Lock()
gym.Lock(caller)
return gym, func() { gym.Unlock() }, nil
}
return nil, nil, nil
Expand All @@ -65,11 +65,11 @@ func PeekGymRecord(fortId string) (*Gym, func(), error) {
// GetGymRecordReadOnly acquires lock but does NOT take snapshot.
// Use for read-only checks. Will cause a backing database lookup.
// Caller MUST call returned unlock function if non-nil.
func GetGymRecordReadOnly(ctx context.Context, db db.DbDetails, fortId string) (*Gym, func(), error) {
func GetGymRecordReadOnly(ctx context.Context, db db.DbDetails, fortId string, caller string) (*Gym, func(), error) {
// Check cache first
if item := gymCache.Get(fortId); item != nil {
gym := item.Value()
gym.Lock()
gym.Lock(caller)
return gym, func() { gym.Unlock() }, nil
}

Expand All @@ -94,15 +94,15 @@ func GetGymRecordReadOnly(ctx context.Context, db db.DbDetails, fortId string) (
})

gym := existingGym.Value()
gym.Lock()
gym.Lock(caller)
return gym, func() { gym.Unlock() }, nil
}

// getGymRecordForUpdate acquires lock AND takes snapshot for webhook comparison.
// Use when modifying the Gym.
// Caller MUST call returned unlock function if non-nil.
func getGymRecordForUpdate(ctx context.Context, db db.DbDetails, fortId string) (*Gym, func(), error) {
gym, unlock, err := GetGymRecordReadOnly(ctx, db, fortId)
func getGymRecordForUpdate(ctx context.Context, db db.DbDetails, fortId string, caller string) (*Gym, func(), error) {
gym, unlock, err := GetGymRecordReadOnly(ctx, db, fortId, caller)
if err != nil || gym == nil {
return nil, nil, err
}
Expand All @@ -112,14 +112,14 @@ func getGymRecordForUpdate(ctx context.Context, db db.DbDetails, fortId string)

// getOrCreateGymRecord gets existing or creates new, locked with snapshot.
// Caller MUST call returned unlock function.
func getOrCreateGymRecord(ctx context.Context, db db.DbDetails, fortId string) (*Gym, func(), error) {
func getOrCreateGymRecord(ctx context.Context, db db.DbDetails, fortId string, caller string) (*Gym, func(), error) {
// Create new Gym atomically - function only called if key doesn't exist
gymItem, _ := gymCache.GetOrSetFunc(fortId, func() *Gym {
return &Gym{GymData: GymData{Id: fortId}, newRecord: true}
})

gym := gymItem.Value()
gym.Lock()
gym.Lock(caller)

if gym.newRecord {
// We should attempt to load from database
Expand Down
Loading
Loading