From b3a6fb306e0722e255f6b630467253bcc0ad5098 Mon Sep 17 00:00:00 2001 From: James Berry Date: Wed, 18 Mar 2026 01:31:35 +0000 Subject: [PATCH 1/4] feat: always-on TrackedMutex for lock contention instrumentation Replace sync.Mutex with TrackedMutex on all entity structs to detect deadlocks and lock contention at runtime. Fast path uses TryLock with ~25ns overhead. On contention, it logs the holder and waiter identities and the wait duration. It also warns if any lock is held longer than 5 seconds. All get*Record* and peek*Record* functions now accept a caller string parameter that propagates through to Lock(), enabling precise identification of which code path holds or is waiting for a lock in contention logs. Co-Authored-By: Claude Opus 4.6 (1M context) --- decoder/api_fort.go | 12 ++++---- decoder/api_pokemon.go | 4 +-- decoder/api_pokemon_scan_v1.go | 2 +- decoder/api_pokemon_scan_v2.go | 4 +-- decoder/api_pokemon_scan_v3.go | 4 +-- decoder/fort_tracker.go | 4 +-- decoder/gmo_decode.go | 24 ++++++++-------- decoder/gym.go | 11 ++++---- decoder/gym_process.go | 10 +++---- decoder/gym_state.go | 18 ++++++------ decoder/incident.go | 11 ++++---- decoder/incident_process.go | 4 +-- decoder/incident_state.go | 22 +++++++-------- decoder/pokemon.go | 12 ++++---- decoder/pokemon_decode.go | 10 +++---- decoder/pokemon_process.go | 6 ++-- decoder/pokemon_state.go | 22 +++++++-------- decoder/pokestop.go | 11 ++++---- decoder/pokestop_process.go | 10 +++---- decoder/pokestop_state.go | 22 +++++++-------- decoder/routes.go | 11 ++++---- decoder/routes_process.go | 2 +- decoder/routes_state.go | 18 ++++++------ decoder/spawnpoint.go | 29 +++++++++---------- decoder/station.go | 11 ++++---- decoder/station_process.go | 4 +-- decoder/station_state.go | 18 ++++++------ decoder/tappable.go | 12 ++++---- decoder/tappable_decode.go | 2 +- decoder/tappable_process.go | 2 +- decoder/tappable_state.go | 14 +++++----- decoder/tracked_mutex.go | 51 ++++++++++++++++++++++++++++++++++ decoder/weather.go | 30 
++++++++++---------- decoder/weather_iv.go | 2 +- routes.go | 10 +++---- 35 files changed, 251 insertions(+), 188 deletions(-) create mode 100644 decoder/tracked_mutex.go diff --git a/decoder/api_fort.go b/decoder/api_fort.go index dedec767..c6cd33f0 100644 --- a/decoder/api_fort.go +++ b/decoder/api_fort.go @@ -281,7 +281,7 @@ func GymScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDetails) *Ap start := time.Now() for _, key := range returnKeys { - gym, unlock, err := GetGymRecordReadOnly(context.Background(), dbDetails, key) + gym, unlock, err := GetGymRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanGym") if err == nil && gym != nil { gymCopy := buildGymResult(gym) results = append(results, &gymCopy) @@ -306,7 +306,7 @@ func PokestopScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDetails start := time.Now() for _, key := range returnKeys { - pokestop, unlock, err := getPokestopRecordReadOnly(context.Background(), dbDetails, key) + pokestop, unlock, err := getPokestopRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanpokemon") if err == nil && pokestop != nil { pokestopCopy := buildPokestopResult(pokestop) results = append(results, &pokestopCopy) @@ -331,7 +331,7 @@ func StationScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDetails) start := time.Now() for _, key := range returnKeys { - station, unlock, err := getStationRecordReadOnly(context.Background(), dbDetails, key) + station, unlock, err := getStationRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanStation") if err == nil && station != nil { stationCopy := buildStationResult(station) results = append(results, &stationCopy) @@ -356,7 +356,7 @@ func FortCombinedScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDet gyms := make([]*ApiGymResult, 0, len(gymKeys)) for _, key := range gymKeys { - gym, unlock, err := GetGymRecordReadOnly(context.Background(), dbDetails, key) + gym, unlock, err := 
GetGymRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanGymPokemon") if err == nil && gym != nil { gymCopy := buildGymResult(gym) gyms = append(gyms, &gymCopy) @@ -368,7 +368,7 @@ func FortCombinedScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDet pokestops := make([]*ApiPokestopResult, 0, len(pokestopKeys)) for _, key := range pokestopKeys { - pokestop, unlock, err := getPokestopRecordReadOnly(context.Background(), dbDetails, key) + pokestop, unlock, err := getPokestopRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanpokemonPokemon") if err == nil && pokestop != nil { pokestopCopy := buildPokestopResult(pokestop) pokestops = append(pokestops, &pokestopCopy) @@ -380,7 +380,7 @@ func FortCombinedScanEndpoint(retrieveParameters ApiFortScan, dbDetails db.DbDet stations := make([]*ApiStationResult, 0, len(stationKeys)) for _, key := range stationKeys { - station, unlock, err := getStationRecordReadOnly(context.Background(), dbDetails, key) + station, unlock, err := getStationRecordReadOnly(context.Background(), dbDetails, key, "API.GetScanStationPokemon") if err == nil && station != nil { stationCopy := buildStationResult(station) stations = append(stations, &stationCopy) diff --git a/decoder/api_pokemon.go b/decoder/api_pokemon.go index d9843b83..2ff49ff7 100644 --- a/decoder/api_pokemon.go +++ b/decoder/api_pokemon.go @@ -132,7 +132,7 @@ func SearchPokemon(request ApiPokemonSearch) ([]*ApiPokemonResult, error) { apiResults := make([]*ApiPokemonResult, 0, len(results)) for _, encounterId := range results { - pokemon, unlock, _ := peekPokemonRecordReadOnly(encounterId) + pokemon, unlock, _ := peekPokemonRecordReadOnly(encounterId, "API.Pokemon") if pokemon != nil { apiPokemon := buildApiPokemonResult(pokemon) apiResults = append(apiResults, &apiPokemon) @@ -146,7 +146,7 @@ func SearchPokemon(request ApiPokemonSearch) ([]*ApiPokemonResult, error) { // Get one result func GetOnePokemon(pokemonId uint64) *ApiPokemonResult { - 
item, unlock, _ := peekPokemonRecordReadOnly(pokemonId) + item, unlock, _ := peekPokemonRecordReadOnly(pokemonId, "API.PokemonById") if item != nil { apiPokemon := buildApiPokemonResult(item) defer unlock() diff --git a/decoder/api_pokemon_scan_v1.go b/decoder/api_pokemon_scan_v1.go index a3de7d4b..1e4ef5f9 100644 --- a/decoder/api_pokemon_scan_v1.go +++ b/decoder/api_pokemon_scan_v1.go @@ -227,7 +227,7 @@ func GetPokemonInArea(retrieveParameters ApiPokemonScan) []*ApiPokemonResult { results := make([]*ApiPokemonResult, 0, len(returnKeys)) for _, key := range returnKeys { - pokemon, unlock, _ := peekPokemonRecordReadOnly(key) + pokemon, unlock, _ := peekPokemonRecordReadOnly(key, "API.ScanPokemon.v1") if pokemon != nil { apiPokemon := buildApiPokemonResult(pokemon) unlock() diff --git a/decoder/api_pokemon_scan_v2.go b/decoder/api_pokemon_scan_v2.go index 4603c036..bcdb4bbb 100644 --- a/decoder/api_pokemon_scan_v2.go +++ b/decoder/api_pokemon_scan_v2.go @@ -103,7 +103,7 @@ func GetPokemonInArea2(retrieveParameters ApiPokemonScan2) []*ApiPokemonResult { startUnix := start.Unix() for _, key := range returnKeys { - pokemon, unlock, _ := peekPokemonRecordReadOnly(key) + pokemon, unlock, _ := peekPokemonRecordReadOnly(key, "API.ScanPokemon.v2") if pokemon != nil { if pokemon.ExpireTimestamp.ValueOrZero() > startUnix { apiPokemon := buildApiPokemonResult(pokemon) @@ -183,7 +183,7 @@ func GrpcGetPokemonInArea2(retrieveParameters *pb.PokemonScanRequest) []*pb.Poke startUnix := start.Unix() for _, key := range returnKeys { - pokemon, unlock, _ := peekPokemonRecordReadOnly(key) + pokemon, unlock, _ := peekPokemonRecordReadOnly(key, "API.ScanPokemon.v2.pokemon") if pokemon != nil { if pokemon.ExpireTimestamp.ValueOrZero() > startUnix { apiPokemon := pb.PokemonDetails{ diff --git a/decoder/api_pokemon_scan_v3.go b/decoder/api_pokemon_scan_v3.go index a53b7aeb..bb4cb4ea 100644 --- a/decoder/api_pokemon_scan_v3.go +++ b/decoder/api_pokemon_scan_v3.go @@ -110,7 +110,7 @@ func 
GetPokemonInArea3(retrieveParameters ApiPokemonScan3) *PokemonScan3Result { startUnix := start.Unix() for _, key := range returnKeys { - pokemon, unlock, _ := peekPokemonRecordReadOnly(key) + pokemon, unlock, _ := peekPokemonRecordReadOnly(key, "API.ScanPokemon.v3") if pokemon != nil { if pokemon.ExpireTimestamp.ValueOrZero() > startUnix { apiPokemon := buildApiPokemonResult(pokemon) @@ -200,7 +200,7 @@ func GrpcGetPokemonInArea3(retrieveParameters *pb.PokemonScanRequestV3) ([]*pb.P startUnix := start.Unix() for _, key := range returnKeys { - pokemon, unlock, _ := peekPokemonRecordReadOnly(key) + pokemon, unlock, _ := peekPokemonRecordReadOnly(key, "API.ScanPokemon.v3.pokemon") if pokemon != nil { if pokemon.ExpireTimestamp.ValueOrZero() > startUnix { apiPokemon := pb.PokemonDetails{ diff --git a/decoder/fort_tracker.go b/decoder/fort_tracker.go index 338253ab..3ae25745 100644 --- a/decoder/fort_tracker.go +++ b/decoder/fort_tracker.go @@ -515,7 +515,7 @@ func (ft *FortTracker) GetFortInfo(fortId string) *FortTrackerInfo { // clearGymWithLock marks a gym as deleted while holding the object-level mutex func clearGymWithLock(ctx context.Context, dbDetails db.DbDetails, gymId string, cellId uint64, removeFromTracker bool) { // Load gym through cache (will load from DB if not cached) - gym, unlock, err := getGymRecordForUpdate(ctx, dbDetails, gymId) + gym, unlock, err := getGymRecordForUpdate(ctx, dbDetails, gymId, "clearGymWithLock") if err != nil { log.Errorf("FortTracker: failed to load gym %s - %s", gymId, err) return @@ -550,7 +550,7 @@ func clearGymWithLock(ctx context.Context, dbDetails db.DbDetails, gymId string, // clearPokestopWithLock marks a pokestop as deleted while holding the object-level mutex func clearPokestopWithLock(ctx context.Context, dbDetails db.DbDetails, stopId string, cellId uint64, removeFromTracker bool) { // Load pokestop through cache (will load from DB if not cached) - pokestop, unlock, err := getPokestopRecordForUpdate(ctx, dbDetails, 
stopId) + pokestop, unlock, err := getPokestopRecordForUpdate(ctx, dbDetails, stopId, "clearPokestopWithLock") if err != nil { log.Errorf("FortTracker: failed to load pokestop %s - %s", stopId, err) return diff --git a/decoder/gmo_decode.go b/decoder/gmo_decode.go index 2bcdf1f9..f2fa8b09 100644 --- a/decoder/gmo_decode.go +++ b/decoder/gmo_decode.go @@ -21,7 +21,7 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa for _, fort := range p { fortId := fort.Data.FortId if fort.Data.FortType == pogo.FortType_CHECKPOINT && scanParameters.ProcessPokestops { - pokestop, unlock, err := getOrCreatePokestopRecord(ctx, db, fortId) + pokestop, unlock, err := getOrCreatePokestopRecord(ctx, db, fortId, "UpdateFortBatch") if err != nil { log.Errorf("getOrCreatePokestopRecord: %s", err) continue @@ -37,13 +37,13 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa // To avoid deadlock, we do this after releasing the pokestop lock. if isNewRecord && DoesGymExist(ctx, db, fortId) { // Get shared fields from gym (with gym lock only) - gym, gymUnlock, _ := GetGymRecordReadOnly(ctx, db, fortId) + gym, gymUnlock, _ := GetGymRecordReadOnly(ctx, db, fortId, "UpdateFortBatch.pokestopSharedFields") if gym != nil { sharedFields := gym.GetSharedFields() gymUnlock() // Re-acquire pokestop lock to apply shared fields - pokestop, unlock, err = getPokestopRecordForUpdate(ctx, db, fortId) + pokestop, unlock, err = getPokestopRecordForUpdate(ctx, db, fortId, "UpdateFortBatch.sharedFields") if err != nil { log.Errorf("getPokestopRecordForUpdate (shared fields): %s", err) } else if pokestop != nil { @@ -61,7 +61,7 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa if incidents != nil { for _, incidentProto := range incidents { - incident, unlock, err := getOrCreateIncidentRecord(ctx, db, incidentProto.IncidentId, fortId) + incident, unlock, err := getOrCreateIncidentRecord(ctx, db, 
incidentProto.IncidentId, fortId, "UpdateFortBatch") if err != nil { log.Errorf("getOrCreateIncidentRecord: %s", err) continue @@ -74,7 +74,7 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa } if fort.Data.FortType == pogo.FortType_GYM && scanParameters.ProcessGyms { - gym, gymUnlock, err := getOrCreateGymRecord(ctx, db, fortId) + gym, gymUnlock, err := getOrCreateGymRecord(ctx, db, fortId, "UpdateFortBatch") if err != nil { log.Errorf("getOrCreateGymRecord: %s", err) continue @@ -90,13 +90,13 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa // To avoid deadlock, we do this after releasing the gym lock. if isNewRecord && DoesPokestopExist(ctx, db, fortId) { // Get shared fields from pokestop (with pokestop lock only) - pokestop, unlock, _ := getPokestopRecordReadOnly(ctx, db, fortId) + pokestop, unlock, _ := getPokestopRecordReadOnly(ctx, db, fortId, "UpdateFortBatch.gymSharedFields") if pokestop != nil { sharedFields := pokestop.GetSharedFields() unlock() // Re-acquire gym lock to apply shared fields - gym, gymUnlock, err = getGymRecordForUpdate(ctx, db, fortId) + gym, gymUnlock, err = getGymRecordForUpdate(ctx, db, fortId, "UpdateFortBatch.sharedFields") if err != nil { log.Errorf("getGymRecordForUpdate (shared fields): %s", err) } else if gym != nil { @@ -113,7 +113,7 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa func UpdateStationBatch(ctx context.Context, db db.DbDetails, scanParameters ScanParameters, p []RawStationData) { for _, stationProto := range p { stationId := stationProto.Data.Id - station, unlock, err := getOrCreateStationRecord(ctx, db, stationId) + station, unlock, err := getOrCreateStationRecord(ctx, db, stationId, "UpdateStationBatch") if err != nil { log.Errorf("getOrCreateStationRecord: %s", err) continue @@ -137,7 +137,7 @@ func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters Sca spawnpointUpdateFromWild(ctx, db, 
wild.Data, wild.Timestamp) if scanParameters.ProcessWild { - pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId) + pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId, "UpdatePokemonBatch.wild") if err != nil { log.Errorf("getOrCreatePokemonRecord: %s", err) continue @@ -157,7 +157,7 @@ func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters Sca encounterId := nearby.Data.EncounterId if nearby.Data.FortId != "" || scanParameters.ProcessNearbyCell { - pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId) + pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId, "UpdatePokemonBatch.nearby") if err != nil { log.Printf("getOrCreatePokemonRecord: %s", err) continue @@ -177,7 +177,7 @@ func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters Sca for _, mapPokemon := range mapPokemonList { encounterId := mapPokemon.Data.EncounterId - pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId) + pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId, "UpdatePokemonBatch.map") if err != nil { log.Printf("getOrCreatePokemonRecord: %s", err) continue @@ -200,7 +200,7 @@ func UpdatePokemonBatch(ctx context.Context, db db.DbDetails, scanParameters Sca func UpdateClientWeatherBatch(ctx context.Context, db db.DbDetails, p []*pogo.ClientWeatherProto, timestampMs int64, account string) (updates []WeatherUpdate) { hourKey := timestampMs / time.Hour.Milliseconds() for _, weatherProto := range p { - weather, unlock, err := getOrCreateWeatherRecord(ctx, db, weatherProto.S2CellId) + weather, unlock, err := getOrCreateWeatherRecord(ctx, db, weatherProto.S2CellId, "UpdateClientWeatherBatch") if err != nil { log.Printf("getOrCreateWeatherRecord: %s", err) continue diff --git a/decoder/gym.go b/decoder/gym.go index c072f87f..e15822b4 100644 --- a/decoder/gym.go +++ b/decoder/gym.go @@ -2,7 +2,6 @@ package decoder import ( "fmt" - "sync" 
"github.com/guregu/null/v6" @@ -58,7 +57,7 @@ type GymData struct { // Gym struct. // REMINDER! Keep hasChangesGym updated after making changes type Gym struct { - mu sync.Mutex `db:"-"` // Object-level mutex + mu TrackedMutex `db:"-"` // Object-level mutex with contention tracking GymData // Embedded data fields (all db columns) @@ -129,9 +128,11 @@ func (gym *Gym) snapshotOldValues() { } } -// Lock acquires the Gym's mutex -func (gym *Gym) Lock() { - gym.mu.Lock() +// Lock acquires the Gym's mutex with caller tracking +func (gym *Gym) Lock(caller string) { + gym.mu.entityType = "Gym" + gym.mu.entityId = gym.Id + gym.mu.Lock(caller) } // Unlock releases the Gym's mutex diff --git a/decoder/gym_process.go b/decoder/gym_process.go index 1c34104c..db382810 100644 --- a/decoder/gym_process.go +++ b/decoder/gym_process.go @@ -11,7 +11,7 @@ import ( ) func UpdateGymRecordWithFortDetailsOutProto(ctx context.Context, db db.DbDetails, fort *pogo.FortDetailsOutProto) string { - gym, unlock, err := getOrCreateGymRecord(ctx, db, fort.Id) + gym, unlock, err := getOrCreateGymRecord(ctx, db, fort.Id, "UpdateGymFromFortDetails") if err != nil { return err.Error() } @@ -26,7 +26,7 @@ func UpdateGymRecordWithFortDetailsOutProto(ctx context.Context, db db.DbDetails } func UpdateGymRecordWithGymInfoProto(ctx context.Context, db db.DbDetails, gymInfo *pogo.GymGetInfoOutProto) string { - gym, unlock, err := getOrCreateGymRecord(ctx, db, gymInfo.GymStatusAndDefenders.PokemonFortProto.FortId) + gym, unlock, err := getOrCreateGymRecord(ctx, db, gymInfo.GymStatusAndDefenders.PokemonFortProto.FortId, "UpdateGymFromGymInfo") if err != nil { return err.Error() } @@ -40,7 +40,7 @@ func UpdateGymRecordWithGymInfoProto(ctx context.Context, db db.DbDetails, gymIn } func UpdateGymRecordWithGetMapFortsOutProto(ctx context.Context, db db.DbDetails, mapFort *pogo.GetMapFortsOutProto_FortProto) (bool, string) { - gym, unlock, err := getGymRecordForUpdate(ctx, db, mapFort.Id) + gym, unlock, err := 
getGymRecordForUpdate(ctx, db, mapFort.Id, "UpdateGymFromGetMapForts") if err != nil { return false, err.Error() } @@ -57,7 +57,7 @@ func UpdateGymRecordWithGetMapFortsOutProto(ctx context.Context, db db.DbDetails } func UpdateGymRecordWithRsvpProto(ctx context.Context, db db.DbDetails, req *pogo.RaidDetails, resp *pogo.GetEventRsvpsOutProto) string { - gym, unlock, err := getGymRecordForUpdate(ctx, db, req.FortId) + gym, unlock, err := getGymRecordForUpdate(ctx, db, req.FortId, "UpdateGymWithRsvp") if err != nil { return err.Error() } @@ -76,7 +76,7 @@ func UpdateGymRecordWithRsvpProto(ctx context.Context, db db.DbDetails, req *pog } func ClearGymRsvp(ctx context.Context, db db.DbDetails, fortId string) string { - gym, unlock, err := getGymRecordForUpdate(ctx, db, fortId) + gym, unlock, err := getGymRecordForUpdate(ctx, db, fortId, "ClearGymRsvp") if err != nil { return err.Error() } diff --git a/decoder/gym_state.go b/decoder/gym_state.go index ff395751..cee1f1b9 100644 --- a/decoder/gym_state.go +++ b/decoder/gym_state.go @@ -53,10 +53,10 @@ func DoesGymExist(ctx context.Context, db db.DbDetails, fortId string) bool { // PeekGymRecord - cache-only lookup, no DB fallback, returns locked. // Caller MUST call returned unlock function if non-nil. -func PeekGymRecord(fortId string) (*Gym, func(), error) { +func PeekGymRecord(fortId string, caller string) (*Gym, func(), error) { if item := gymCache.Get(fortId); item != nil { gym := item.Value() - gym.Lock() + gym.Lock(caller) return gym, func() { gym.Unlock() }, nil } return nil, nil, nil @@ -65,11 +65,11 @@ func PeekGymRecord(fortId string) (*Gym, func(), error) { // GetGymRecordReadOnly acquires lock but does NOT take snapshot. // Use for read-only checks. Will cause a backing database lookup. // Caller MUST call returned unlock function if non-nil. 
-func GetGymRecordReadOnly(ctx context.Context, db db.DbDetails, fortId string) (*Gym, func(), error) { +func GetGymRecordReadOnly(ctx context.Context, db db.DbDetails, fortId string, caller string) (*Gym, func(), error) { // Check cache first if item := gymCache.Get(fortId); item != nil { gym := item.Value() - gym.Lock() + gym.Lock(caller) return gym, func() { gym.Unlock() }, nil } @@ -94,15 +94,15 @@ func GetGymRecordReadOnly(ctx context.Context, db db.DbDetails, fortId string) ( }) gym := existingGym.Value() - gym.Lock() + gym.Lock(caller) return gym, func() { gym.Unlock() }, nil } // getGymRecordForUpdate acquires lock AND takes snapshot for webhook comparison. // Use when modifying the Gym. // Caller MUST call returned unlock function if non-nil. -func getGymRecordForUpdate(ctx context.Context, db db.DbDetails, fortId string) (*Gym, func(), error) { - gym, unlock, err := GetGymRecordReadOnly(ctx, db, fortId) +func getGymRecordForUpdate(ctx context.Context, db db.DbDetails, fortId string, caller string) (*Gym, func(), error) { + gym, unlock, err := GetGymRecordReadOnly(ctx, db, fortId, caller) if err != nil || gym == nil { return nil, nil, err } @@ -112,14 +112,14 @@ func getGymRecordForUpdate(ctx context.Context, db db.DbDetails, fortId string) // getOrCreateGymRecord gets existing or creates new, locked with snapshot. // Caller MUST call returned unlock function. 
-func getOrCreateGymRecord(ctx context.Context, db db.DbDetails, fortId string) (*Gym, func(), error) { +func getOrCreateGymRecord(ctx context.Context, db db.DbDetails, fortId string, caller string) (*Gym, func(), error) { // Create new Gym atomically - function only called if key doesn't exist gymItem, _ := gymCache.GetOrSetFunc(fortId, func() *Gym { return &Gym{GymData: GymData{Id: fortId}, newRecord: true} }) gym := gymItem.Value() - gym.Lock() + gym.Lock(caller) if gym.newRecord { // We should attempt to load from database diff --git a/decoder/incident.go b/decoder/incident.go index a0cbd035..00ff5bb5 100644 --- a/decoder/incident.go +++ b/decoder/incident.go @@ -2,7 +2,6 @@ package decoder import ( "fmt" - "sync" "github.com/guregu/null/v6" ) @@ -30,7 +29,7 @@ type IncidentData struct { // Incident struct. // REMINDER! Dirty flag pattern - use setter methods to modify fields type Incident struct { - mu sync.Mutex `db:"-"` // Object-level mutex + mu TrackedMutex `db:"-"` // Object-level mutex with contention tracking IncidentData // Embedded data fields - can be copied for write-behind queue @@ -100,9 +99,11 @@ func (incident *Incident) IsNewRecord() bool { return incident.newRecord } -// Lock acquires the Incident's mutex -func (incident *Incident) Lock() { - incident.mu.Lock() +// Lock acquires the Incident's mutex with caller tracking +func (incident *Incident) Lock(caller string) { + incident.mu.entityType = "Incident" + incident.mu.entityId = incident.Id + incident.mu.Lock(caller) } // Unlock releases the Incident's mutex diff --git a/decoder/incident_process.go b/decoder/incident_process.go index 4550c7c9..cacce899 100644 --- a/decoder/incident_process.go +++ b/decoder/incident_process.go @@ -11,7 +11,7 @@ import ( ) func UpdateIncidentLineup(ctx context.Context, db db.DbDetails, protoReq *pogo.OpenInvasionCombatSessionProto, protoRes *pogo.OpenInvasionCombatSessionOutProto) string { - incident, unlock, err := getOrCreateIncidentRecord(ctx, db, 
protoReq.IncidentLookup.IncidentId, protoReq.IncidentLookup.FortId) + incident, unlock, err := getOrCreateIncidentRecord(ctx, db, protoReq.IncidentLookup.IncidentId, protoReq.IncidentLookup.FortId, "UpdateIncidentWithConfirmation") if err != nil { return fmt.Sprintf("getOrCreateIncidentRecord: %s", err) } @@ -27,7 +27,7 @@ func UpdateIncidentLineup(ctx context.Context, db db.DbDetails, protoReq *pogo.O } func ConfirmIncident(ctx context.Context, db db.DbDetails, proto *pogo.StartIncidentOutProto) string { - incident, unlock, err := getOrCreateIncidentRecord(ctx, db, proto.Incident.IncidentId, proto.Incident.FortId) + incident, unlock, err := getOrCreateIncidentRecord(ctx, db, proto.Incident.IncidentId, proto.Incident.FortId, "UpdateIncidentFromInvasion") if err != nil { return fmt.Sprintf("getOrCreateIncidentRecord: %s", err) } diff --git a/decoder/incident_state.go b/decoder/incident_state.go index a4fd5916..938c4370 100644 --- a/decoder/incident_state.go +++ b/decoder/incident_state.go @@ -27,10 +27,10 @@ func loadIncidentFromDatabase(ctx context.Context, db db.DbDetails, incidentId s // peekIncidentRecord - cache-only lookup, no DB fallback, returns locked. // Caller MUST call returned unlock function if non-nil. -func peekIncidentRecord(incidentId string) (*Incident, func(), error) { +func peekIncidentRecord(incidentId string, caller string) (*Incident, func(), error) { if item := incidentCache.Get(incidentId); item != nil { incident := item.Value() - incident.Lock() + incident.Lock(caller) return incident, func() { incident.Unlock() }, nil } return nil, nil, nil @@ -39,11 +39,11 @@ func peekIncidentRecord(incidentId string) (*Incident, func(), error) { // getIncidentRecordReadOnly acquires lock but does NOT take snapshot. // Use for read-only checks. Will cause a backing database lookup. // Caller MUST call returned unlock function if non-nil. 
-func getIncidentRecordReadOnly(ctx context.Context, db db.DbDetails, incidentId string) (*Incident, func(), error) { +func getIncidentRecordReadOnly(ctx context.Context, db db.DbDetails, incidentId string, caller string) (*Incident, func(), error) { // Check cache first if item := incidentCache.Get(incidentId); item != nil { incident := item.Value() - incident.Lock() + incident.Lock(caller) return incident, func() { incident.Unlock() }, nil } @@ -67,14 +67,14 @@ func getIncidentRecordReadOnly(ctx context.Context, db db.DbDetails, incidentId }) incident := existingIncident.Value() - incident.Lock() + incident.Lock(caller) return incident, func() { incident.Unlock() }, nil } // getIncidentRecordForUpdate acquires lock AND takes snapshot for webhook comparison. // Caller MUST call returned unlock function if non-nil. -func getIncidentRecordForUpdate(ctx context.Context, db db.DbDetails, incidentId string) (*Incident, func(), error) { - incident, unlock, err := getIncidentRecordReadOnly(ctx, db, incidentId) +func getIncidentRecordForUpdate(ctx context.Context, db db.DbDetails, incidentId string, caller string) (*Incident, func(), error) { + incident, unlock, err := getIncidentRecordReadOnly(ctx, db, incidentId, caller) if err != nil || incident == nil { return nil, nil, err } @@ -84,14 +84,14 @@ func getIncidentRecordForUpdate(ctx context.Context, db db.DbDetails, incidentId // getOrCreateIncidentRecord gets existing or creates new, locked with snapshot. // Caller MUST call returned unlock function. 
-func getOrCreateIncidentRecord(ctx context.Context, db db.DbDetails, incidentId string, pokestopId string) (*Incident, func(), error) { +func getOrCreateIncidentRecord(ctx context.Context, db db.DbDetails, incidentId string, pokestopId string, caller string) (*Incident, func(), error) { // Create new Incident atomically - function only called if key doesn't exist incidentItem, _ := incidentCache.GetOrSetFunc(incidentId, func() *Incident { return &Incident{IncidentData: IncidentData{Id: incidentId, PokestopId: pokestopId}, newRecord: true} }) incident := incidentItem.Value() - incident.Lock() + incident.Lock(caller) if incident.newRecord { // We should attempt to load from database @@ -147,7 +147,7 @@ func saveIncidentRecord(ctx context.Context, db db.DbDetails, incident *Incident var stopLat, stopLon float64 var stopCellId uint64 - stop, unlock, _ := getPokestopRecordReadOnly(ctx, db, incident.PokestopId) + stop, unlock, _ := getPokestopRecordReadOnly(ctx, db, incident.PokestopId, "saveIncidentRecord") if stop != nil { stopLat, stopLon = stop.Lat, stop.Lon stopCellId = uint64(stop.CellId.ValueOrZero()) @@ -220,7 +220,7 @@ func createIncidentWebhooks(ctx context.Context, db db.DbDetails, incident *Inci var stopLat, stopLon float64 var stopEnabled bool var stopCellId uint64 - stop, unlock, _ := getPokestopRecordReadOnly(ctx, db, incident.PokestopId) + stop, unlock, _ := getPokestopRecordReadOnly(ctx, db, incident.PokestopId, "createIncidentWebhooks") if stop != nil { pokestopName = stop.Name.ValueOrZero() stopLat, stopLon = stop.Lat, stop.Lon diff --git a/decoder/pokemon.go b/decoder/pokemon.go index 98551382..e3ddb2f4 100644 --- a/decoder/pokemon.go +++ b/decoder/pokemon.go @@ -2,7 +2,7 @@ package decoder import ( "fmt" - "sync" + "strconv" "golbat/grpc" @@ -63,7 +63,7 @@ type PokemonData struct { // // FirstSeenTimestamp: This field is used in IsNewRecord. It should only be set in savePokemonRecord. 
type Pokemon struct { - mu sync.Mutex `db:"-"` // Object-level mutex + mu TrackedMutex `db:"-"` // Object-level mutex with contention tracking PokemonData // Embedded data fields - can be copied for write-behind queue @@ -162,9 +162,11 @@ func (pokemon *Pokemon) snapshotOldValues() { } } -// Lock acquires the Pokemon's mutex -func (pokemon *Pokemon) Lock() { - pokemon.mu.Lock() +// Lock acquires the Pokemon's mutex with caller tracking +func (pokemon *Pokemon) Lock(caller string) { + pokemon.mu.entityType = "Pokemon" + pokemon.mu.entityId = strconv.FormatUint(uint64(pokemon.Id), 10) + pokemon.mu.Lock(caller) } // Unlock releases the Pokemon's mutex diff --git a/decoder/pokemon_decode.go b/decoder/pokemon_decode.go index 5da26d97..5c20da1b 100644 --- a/decoder/pokemon_decode.go +++ b/decoder/pokemon_decode.go @@ -164,7 +164,7 @@ func (pokemon *Pokemon) updateFromMap(ctx context.Context, db db.DbDetails, mapP spawnpointId := mapPokemon.SpawnpointId - pokestop, unlock, _ := getPokestopRecordReadOnly(ctx, db, spawnpointId) + pokestop, unlock, _ := getPokestopRecordReadOnly(ctx, db, spawnpointId, "updateFromMap") if pokestop == nil { // Unrecognised pokestop return @@ -228,7 +228,7 @@ func (pokemon *Pokemon) updateFromNearby(ctx context.Context, db db.DbDetails, n default: return } - pokestop, unlock, _ := getPokestopRecordReadOnly(ctx, db, pokestopId) + pokestop, unlock, _ := getPokestopRecordReadOnly(ctx, db, pokestopId, "updateFromNearby") if pokestop == nil { // Unrecognised pokestop, rollback changes overrideLatLon = pokemon.isNewRecord() @@ -294,7 +294,7 @@ func (pokemon *Pokemon) setExpireTimestampFromSpawnpoint(ctx context.Context, db } pokemon.ExpireTimestampVerified = false - spawnPoint, unlock, _ := getSpawnpointRecord(ctx, db, spawnId) + spawnPoint, unlock, _ := getSpawnpointRecord(ctx, db, spawnId, "setExpireTimestampFromSpawnpoint") if spawnPoint != nil && spawnPoint.DespawnSec.Valid { despawnSecond := int(spawnPoint.DespawnSec.ValueOrZero()) unlock() @@ 
-696,7 +696,7 @@ func (pokemon *Pokemon) addEncounterPokemon(ctx context.Context, db db.DbDetails Form: int32(proto.PokemonDisplay.Form), } if scan.CellWeather == int32(pogo.GameplayWeatherProto_NONE) { - weather, unlock, err := peekWeatherRecord(weatherCellIdFromLatLon(pokemon.Lat, pokemon.Lon)) + weather, unlock, err := peekWeatherRecord(weatherCellIdFromLatLon(pokemon.Lat, pokemon.Lon), "addEncounterPokemon") if weather == nil || !weather.GameplayCondition.Valid { log.Warnf("Failed to obtain weather for Pokemon %d: %s", pokemon.Id, err) } else { @@ -935,7 +935,7 @@ func (pokemon *Pokemon) recomputeCpIfNeeded(ctx context.Context, db db.DbDetails cellId := weatherCellIdFromLatLon(pokemon.Lat, pokemon.Lon) cellWeather, found := weather[cellId] if !found { - record, unlock, err := getWeatherRecordReadOnly(ctx, db, cellId) + record, unlock, err := getWeatherRecordReadOnly(ctx, db, cellId, "recomputeCpIfNeeded") if err != nil || record == nil || !record.GameplayCondition.Valid { log.Warnf("[POKEMON] Failed to obtain weather for Pokemon %d: %s", pokemon.Id, err) } else { diff --git a/decoder/pokemon_process.go b/decoder/pokemon_process.go index ee6f460c..ffdc09a0 100644 --- a/decoder/pokemon_process.go +++ b/decoder/pokemon_process.go @@ -19,7 +19,7 @@ func UpdatePokemonRecordWithEncounterProto(ctx context.Context, db db.DbDetails, encounterId := encounter.Pokemon.EncounterId - pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId) + pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId, "UpdatePokemonFromEncounter") if err != nil { log.Errorf("Error pokemon [%d]: %s", encounterId, err) return fmt.Sprintf("Error finding pokemon %s", err) @@ -42,7 +42,7 @@ func UpdatePokemonRecordWithDiskEncounterProto(ctx context.Context, db db.DbDeta encounterId := uint64(encounter.Pokemon.PokemonDisplay.DisplayId) - pokemon, unlock, err := getPokemonRecordForUpdate(ctx, db, encounterId) + pokemon, unlock, err := getPokemonRecordForUpdate(ctx, db, 
encounterId, "UpdatePokemonFromDiskEncounter") if err != nil { log.Errorf("Error pokemon [%d]: %s", encounterId, err) return fmt.Sprintf("Error finding pokemon %s", err) @@ -70,7 +70,7 @@ func UpdatePokemonRecordWithDiskEncounterProto(ctx context.Context, db db.DbDeta func UpdatePokemonRecordWithTappableEncounter(ctx context.Context, db db.DbDetails, request *pogo.ProcessTappableProto, encounter *pogo.TappableEncounterProto, username string, timestampMs int64) string { encounterId := request.GetEncounterId() - pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId) + pokemon, unlock, err := getOrCreatePokemonRecord(ctx, db, encounterId, "UpdatePokemonFromTappableEncounter") if err != nil { log.Errorf("Error pokemon [%d]: %s", encounterId, err) return fmt.Sprintf("Error finding pokemon %s", err) diff --git a/decoder/pokemon_state.go b/decoder/pokemon_state.go index b054e1d3..9e620b44 100644 --- a/decoder/pokemon_state.go +++ b/decoder/pokemon_state.go @@ -33,10 +33,10 @@ const pokemonSelectColumns = `id, pokemon_id, lat, lon, spawn_id, expire_timesta // peekPokemonRecordReadOnly acquires lock, does NOT take snapshot. // Use for read-only checks which will not cause a backing database lookup // Caller must use returned unlock function -func peekPokemonRecordReadOnly(encounterId uint64) (*Pokemon, func(), error) { +func peekPokemonRecordReadOnly(encounterId uint64, caller string) (*Pokemon, func(), error) { if item := pokemonCache.Get(encounterId); item != nil { pokemon := item.Value() - pokemon.Lock() + pokemon.Lock(caller) return pokemon, func() { pokemon.Unlock() }, nil } @@ -55,16 +55,16 @@ func loadPokemonFromDatabase(ctx context.Context, db db.DbDetails, encounterId u // getPokemonRecordReadOnly acquires lock but does NOT take snapshot. // Use for read-only checks, but will cause a backing database lookup // Caller MUST call returned unlock function. 
-func getPokemonRecordReadOnly(ctx context.Context, db db.DbDetails, encounterId uint64) (*Pokemon, func(), error) { +func getPokemonRecordReadOnly(ctx context.Context, db db.DbDetails, encounterId uint64, caller string) (*Pokemon, func(), error) { // If we are in-memory only, this is identical to peek if config.Config.PokemonMemoryOnly { - return peekPokemonRecordReadOnly(encounterId) + return peekPokemonRecordReadOnly(encounterId, caller) } // Check cache first if item := pokemonCache.Get(encounterId); item != nil { pokemon := item.Value() - pokemon.Lock() + pokemon.Lock(caller) return pokemon, func() { pokemon.Unlock() }, nil } @@ -87,15 +87,15 @@ func getPokemonRecordReadOnly(ctx context.Context, db db.DbDetails, encounterId }) pokemon := existingPokemon.Value() - pokemon.Lock() + pokemon.Lock(caller) return pokemon, func() { pokemon.Unlock() }, nil } // getPokemonRecordForUpdate acquires lock AND takes snapshot for webhook comparison. // Use when modifying the Pokemon. // Caller MUST call returned unlock function. -func getPokemonRecordForUpdate(ctx context.Context, db db.DbDetails, encounterId uint64) (*Pokemon, func(), error) { - pokemon, unlock, err := getPokemonRecordReadOnly(ctx, db, encounterId) +func getPokemonRecordForUpdate(ctx context.Context, db db.DbDetails, encounterId uint64, caller string) (*Pokemon, func(), error) { + pokemon, unlock, err := getPokemonRecordReadOnly(ctx, db, encounterId, caller) if err != nil || pokemon == nil { return nil, nil, err } @@ -105,14 +105,14 @@ func getPokemonRecordForUpdate(ctx context.Context, db db.DbDetails, encounterId // getOrCreatePokemonRecord gets existing or creates new, locked with snapshot. // Caller MUST call returned unlock function. 
-func getOrCreatePokemonRecord(ctx context.Context, db db.DbDetails, encounterId uint64) (*Pokemon, func(), error) { +func getOrCreatePokemonRecord(ctx context.Context, db db.DbDetails, encounterId uint64, caller string) (*Pokemon, func(), error) { // Create new Pokemon atomically - function only called if key doesn't exist pokemonItem, _ := pokemonCache.GetOrSetFunc(encounterId, func() *Pokemon { return &Pokemon{PokemonData: PokemonData{Id: Uint64Str(encounterId)}, newRecord: true} }) pokemon := pokemonItem.Value() - pokemon.Lock() + pokemon.Lock(caller) if config.Config.PokemonMemoryOnly { pokemon.snapshotOldValues() @@ -456,7 +456,7 @@ func createPokemonWebhooks(ctx context.Context, db db.DbDetails, pokemon *Pokemo var pokestopName *string if pokemon.PokestopId.Valid { - pokestop, unlock, _ := getPokestopRecordReadOnly(ctx, db, pokemon.PokestopId.String) + pokestop, unlock, _ := getPokestopRecordReadOnly(ctx, db, pokemon.PokestopId.String, "createPokemonWebhooks") name := "Unknown" if pokestop != nil { name = pokestop.Name.ValueOrZero() diff --git a/decoder/pokestop.go b/decoder/pokestop.go index 74ee6ecb..9135a89f 100644 --- a/decoder/pokestop.go +++ b/decoder/pokestop.go @@ -2,7 +2,6 @@ package decoder import ( "fmt" - "sync" "github.com/guregu/null/v6" @@ -69,7 +68,7 @@ type PokestopData struct { // Pokestop struct. 
type Pokestop struct { - mu sync.Mutex `db:"-"` // Object-level mutex + mu TrackedMutex `db:"-"` // Object-level mutex with contention tracking PokestopData // Embedded data fields - can be copied for write-behind queue @@ -175,9 +174,11 @@ func (p *Pokestop) snapshotOldValues() { } } -// Lock acquires the Pokestop's mutex -func (p *Pokestop) Lock() { - p.mu.Lock() +// Lock acquires the Pokestop's mutex with caller tracking +func (p *Pokestop) Lock(caller string) { + p.mu.entityType = "Pokestop" + p.mu.entityId = p.Id + p.mu.Lock(caller) } // Unlock releases the Pokestop's mutex diff --git a/decoder/pokestop_process.go b/decoder/pokestop_process.go index ca196eeb..bd7ac4c8 100644 --- a/decoder/pokestop_process.go +++ b/decoder/pokestop_process.go @@ -14,7 +14,7 @@ import ( ) func UpdatePokestopRecordWithFortDetailsOutProto(ctx context.Context, db db.DbDetails, fort *pogo.FortDetailsOutProto) string { - pokestop, unlock, err := getOrCreatePokestopRecord(ctx, db, fort.Id) + pokestop, unlock, err := getOrCreatePokestopRecord(ctx, db, fort.Id, "UpdatePokestopFromFortDetails") if err != nil { log.Printf("Update pokestop %s", err) return fmt.Sprintf("Error %s", err) @@ -41,7 +41,7 @@ func UpdatePokestopWithQuest(ctx context.Context, db db.DbDetails, quest *pogo.F statsCollector.IncDecodeQuest("ok", haveArStr) - pokestop, unlock, err := getOrCreatePokestopRecord(ctx, db, quest.FortId) + pokestop, unlock, err := getOrCreatePokestopRecord(ctx, db, quest.FortId, "UpdatePokestopWithQuest") if err != nil { log.Printf("Update quest %s", err) return fmt.Sprintf("error %s", err) @@ -79,7 +79,7 @@ func GetQuestStatusWithGeofence(dbDetails db.DbDetails, geofence *geojson.Featur } func UpdatePokestopRecordWithGetMapFortsOutProto(ctx context.Context, db db.DbDetails, mapFort *pogo.GetMapFortsOutProto_FortProto) (bool, string) { - pokestop, unlock, err := getPokestopRecordForUpdate(ctx, db, mapFort.Id) + pokestop, unlock, err := getPokestopRecordForUpdate(ctx, db, mapFort.Id, 
"UpdatePokestopFromGetMapForts") if err != nil { log.Printf("Update pokestop %s", err) return false, fmt.Sprintf("Error %s", err) @@ -122,7 +122,7 @@ func UpdatePokestopWithContestData(ctx context.Context, db db.DbDetails, request contest := contestData.ContestIncident.Contests[0] - pokestop, unlock, err := getPokestopRecordForUpdate(ctx, db, fortId) + pokestop, unlock, err := getPokestopRecordForUpdate(ctx, db, fortId, "UpdatePokestopWithContestData") if err != nil { log.Printf("Get pokestop %s", err) return "Error getting pokestop" @@ -147,7 +147,7 @@ func getFortIdFromContest(id string) string { func UpdatePokestopWithPokemonSizeContestEntry(ctx context.Context, db db.DbDetails, request *pogo.GetPokemonSizeLeaderboardEntryProto, contestData *pogo.GetPokemonSizeLeaderboardEntryOutProto) string { fortId := getFortIdFromContest(request.GetContestId()) - pokestop, unlock, err := getPokestopRecordForUpdate(ctx, db, fortId) + pokestop, unlock, err := getPokestopRecordForUpdate(ctx, db, fortId, "UpdatePokestopWithContestEntry") if err != nil { log.Printf("Get pokestop %s", err) return "Error getting pokestop" diff --git a/decoder/pokestop_state.go b/decoder/pokestop_state.go index 1ed79c9c..419e39f3 100644 --- a/decoder/pokestop_state.go +++ b/decoder/pokestop_state.go @@ -43,10 +43,10 @@ func loadPokestopFromDatabase(ctx context.Context, db db.DbDetails, fortId strin // PeekPokestopRecord - cache-only lookup, no DB fallback, returns locked. // Caller MUST call returned unlock function if non-nil. 
-func PeekPokestopRecord(fortId string) (*Pokestop, func(), error) { +func PeekPokestopRecord(fortId string, caller string) (*Pokestop, func(), error) { if item := pokestopCache.Get(fortId); item != nil { pokestop := item.Value() - pokestop.Lock() + pokestop.Lock(caller) return pokestop, func() { pokestop.Unlock() }, nil } return nil, nil, nil @@ -72,11 +72,11 @@ func DoesPokestopExist(ctx context.Context, db db.DbDetails, fortId string) bool // getPokestopRecordReadOnly acquires lock but does NOT take snapshot. // Use for read-only checks. Will cause a backing database lookup. // Caller MUST call returned unlock function if non-nil. -func getPokestopRecordReadOnly(ctx context.Context, db db.DbDetails, fortId string) (*Pokestop, func(), error) { +func getPokestopRecordReadOnly(ctx context.Context, db db.DbDetails, fortId string, caller string) (*Pokestop, func(), error) { // Check cache first if item := pokestopCache.Get(fortId); item != nil { pokestop := item.Value() - pokestop.Lock() + pokestop.Lock(caller) return pokestop, func() { pokestop.Unlock() }, nil } @@ -102,15 +102,15 @@ func getPokestopRecordReadOnly(ctx context.Context, db db.DbDetails, fortId stri }) pokestop := existingPokestop.Value() - pokestop.Lock() + pokestop.Lock(caller) return pokestop, func() { pokestop.Unlock() }, nil } // getPokestopRecordForUpdate acquires lock AND takes snapshot for webhook comparison. // Use when modifying the Pokestop. // Caller MUST call returned unlock function if non-nil. 
-func getPokestopRecordForUpdate(ctx context.Context, db db.DbDetails, fortId string) (*Pokestop, func(), error) { - pokestop, unlock, err := getPokestopRecordReadOnly(ctx, db, fortId) +func getPokestopRecordForUpdate(ctx context.Context, db db.DbDetails, fortId string, caller string) (*Pokestop, func(), error) { + pokestop, unlock, err := getPokestopRecordReadOnly(ctx, db, fortId, caller) if err != nil || pokestop == nil { return nil, nil, err } @@ -120,14 +120,14 @@ func getPokestopRecordForUpdate(ctx context.Context, db db.DbDetails, fortId str // getOrCreatePokestopRecord gets existing or creates new, locked with snapshot. // Caller MUST call returned unlock function. -func getOrCreatePokestopRecord(ctx context.Context, db db.DbDetails, fortId string) (*Pokestop, func(), error) { +func getOrCreatePokestopRecord(ctx context.Context, db db.DbDetails, fortId string, caller string) (*Pokestop, func(), error) { // Create new Pokestop atomically - function only called if key doesn't exist pokestopItem, _ := pokestopCache.GetOrSetFunc(fortId, func() *Pokestop { return &Pokestop{PokestopData: PokestopData{Id: fortId}, newRecord: true} }) pokestop := pokestopItem.Value() - pokestop.Lock() + pokestop.Lock(caller) if pokestop.newRecord { // We should attempt to load from database @@ -502,7 +502,7 @@ func RemoveQuestsWithinGeofence(ctx context.Context, dbDetails db.DbDetails, geo clearedCount := 0 for _, id := range pokestopIds { - pokestop, unlock, err := getOrCreatePokestopRecord(ctx, dbDetails, id) + pokestop, unlock, err := getOrCreatePokestopRecord(ctx, dbDetails, id, "RemoveQuestsWithinGeofence") if err != nil { log.Errorf("RemoveQuestsWithinGeofence: failed to get pokestop %s: %v", id, err) continue @@ -594,7 +594,7 @@ func ExpireQuests(ctx context.Context, dbDetails db.DbDetails) (int, error) { // Process each pokestop once, clearing both quest types if needed for id := range allIds { - pokestop, unlock, err := getOrCreatePokestopRecord(ctx, dbDetails, id) + 
pokestop, unlock, err := getOrCreatePokestopRecord(ctx, dbDetails, id, "ExpireQuests") if err != nil { log.Errorf("ExpireQuests: failed to get pokestop %s: %v", id, err) continue diff --git a/decoder/routes.go b/decoder/routes.go index ba52db2f..bfabea01 100644 --- a/decoder/routes.go +++ b/decoder/routes.go @@ -2,7 +2,6 @@ package decoder import ( "fmt" - "sync" "github.com/guregu/null/v6" @@ -39,7 +38,7 @@ type RouteData struct { // Route struct. // REMINDER! Dirty flag pattern - use setter methods to modify fields type Route struct { - mu sync.Mutex `db:"-" json:"-"` // Object-level mutex + mu TrackedMutex `db:"-" json:"-"` // Object-level mutex with contention tracking RouteData // Embedded data fields - can be copied for write-behind queue @@ -70,9 +69,11 @@ func (r *Route) IsNewRecord() bool { return r.newRecord } -// Lock acquires the Route's mutex -func (r *Route) Lock() { - r.mu.Lock() +// Lock acquires the Route's mutex with caller tracking +func (r *Route) Lock(caller string) { + r.mu.entityType = "Route" + r.mu.entityId = r.Id + r.mu.Lock(caller) } // Unlock releases the Route's mutex diff --git a/decoder/routes_process.go b/decoder/routes_process.go index a0c5c83b..9f5c1891 100644 --- a/decoder/routes_process.go +++ b/decoder/routes_process.go @@ -8,7 +8,7 @@ import ( ) func UpdateRouteRecordWithSharedRouteProto(ctx context.Context, db db.DbDetails, sharedRouteProto *pogo.SharedRouteProto) error { - route, unlock, err := getOrCreateRouteRecord(ctx, db, sharedRouteProto.GetId()) + route, unlock, err := getOrCreateRouteRecord(ctx, db, sharedRouteProto.GetId(), "UpdateRouteRecord") if err != nil { return err } diff --git a/decoder/routes_state.go b/decoder/routes_state.go index 939df250..8710213d 100644 --- a/decoder/routes_state.go +++ b/decoder/routes_state.go @@ -21,10 +21,10 @@ func loadRouteFromDatabase(ctx context.Context, db db.DbDetails, routeId string, // peekRouteRecord - cache-only lookup, no DB fallback, returns locked. 
// Caller MUST call returned unlock function if non-nil. -func peekRouteRecord(routeId string) (*Route, func(), error) { +func peekRouteRecord(routeId string, caller string) (*Route, func(), error) { if item := routeCache.Get(routeId); item != nil { route := item.Value() - route.Lock() + route.Lock(caller) return route, func() { route.Unlock() }, nil } return nil, nil, nil @@ -33,11 +33,11 @@ func peekRouteRecord(routeId string) (*Route, func(), error) { // getRouteRecordReadOnly acquires lock but does NOT take snapshot. // Use for read-only checks. Will cause a backing database lookup. // Caller MUST call returned unlock function if non-nil. -func getRouteRecordReadOnly(ctx context.Context, db db.DbDetails, routeId string) (*Route, func(), error) { +func getRouteRecordReadOnly(ctx context.Context, db db.DbDetails, routeId string, caller string) (*Route, func(), error) { // Check cache first if item := routeCache.Get(routeId); item != nil { route := item.Value() - route.Lock() + route.Lock(caller) return route, func() { route.Unlock() }, nil } @@ -58,14 +58,14 @@ func getRouteRecordReadOnly(ctx context.Context, db db.DbDetails, routeId string }) route := existingRoute.Value() - route.Lock() + route.Lock(caller) return route, func() { route.Unlock() }, nil } // getRouteRecordForUpdate acquires lock AND takes snapshot for webhook comparison. // Caller MUST call returned unlock function if non-nil. 
-func getRouteRecordForUpdate(ctx context.Context, db db.DbDetails, routeId string) (*Route, func(), error) { - route, unlock, err := getRouteRecordReadOnly(ctx, db, routeId) +func getRouteRecordForUpdate(ctx context.Context, db db.DbDetails, routeId string, caller string) (*Route, func(), error) { + route, unlock, err := getRouteRecordReadOnly(ctx, db, routeId, caller) if err != nil || route == nil { return nil, nil, err } @@ -75,14 +75,14 @@ func getRouteRecordForUpdate(ctx context.Context, db db.DbDetails, routeId strin // getOrCreateRouteRecord gets existing or creates new, locked with snapshot. // Caller MUST call returned unlock function. -func getOrCreateRouteRecord(ctx context.Context, db db.DbDetails, routeId string) (*Route, func(), error) { +func getOrCreateRouteRecord(ctx context.Context, db db.DbDetails, routeId string, caller string) (*Route, func(), error) { // Create new Route atomically - function only called if key doesn't exist routeItem, _ := routeCache.GetOrSetFunc(routeId, func() *Route { return &Route{RouteData: RouteData{Id: routeId}, newRecord: true} }) route := routeItem.Value() - route.Lock() + route.Lock(caller) if route.newRecord { // We should attempt to load from database diff --git a/decoder/spawnpoint.go b/decoder/spawnpoint.go index fc21a6ff..3b0df822 100644 --- a/decoder/spawnpoint.go +++ b/decoder/spawnpoint.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "strconv" - "sync" "time" "golbat/db" @@ -35,7 +34,7 @@ type SpawnpointData struct { // Spawnpoint struct. // REMINDER! 
Dirty flag pattern - use setter methods to modify fields type Spawnpoint struct { - mu sync.Mutex `db:"-" json:"-"` // Object-level mutex + mu TrackedMutex `db:"-" json:"-"` // Object-level mutex with contention tracking SpawnpointData // Embedded data fields - can be copied for write-behind queue @@ -72,9 +71,11 @@ func (s *Spawnpoint) IsNewRecord() bool { return s.newRecord } -// Lock acquires the Spawnpoint's mutex -func (s *Spawnpoint) Lock() { - s.mu.Lock() +// Lock acquires the Spawnpoint's mutex with caller tracking +func (s *Spawnpoint) Lock(caller string) { + s.mu.entityType = "Spawnpoint" + s.mu.entityId = strconv.FormatInt(s.Id, 10) + s.mu.Lock(caller) } // Unlock releases the Spawnpoint's mutex @@ -172,10 +173,10 @@ func loadSpawnpointFromDatabase(ctx context.Context, db db.DbDetails, spawnpoint // peekSpawnpointRecord - cache-only lookup, no DB fallback, returns locked. // Caller MUST call returned unlock function if non-nil. -func peekSpawnpointRecord(spawnpointId int64) (*Spawnpoint, func(), error) { +func peekSpawnpointRecord(spawnpointId int64, caller string) (*Spawnpoint, func(), error) { if item := spawnpointCache.Get(spawnpointId); item != nil { spawnpoint := item.Value() - spawnpoint.Lock() + spawnpoint.Lock(caller) return spawnpoint, func() { spawnpoint.Unlock() }, nil } return nil, nil, nil @@ -183,11 +184,11 @@ func peekSpawnpointRecord(spawnpointId int64) (*Spawnpoint, func(), error) { // getSpawnpointRecord acquires lock. Will cause a backing database lookup. // Caller MUST call returned unlock function if non-nil. 
-func getSpawnpointRecord(ctx context.Context, db db.DbDetails, spawnpointId int64) (*Spawnpoint, func(), error) { +func getSpawnpointRecord(ctx context.Context, db db.DbDetails, spawnpointId int64, caller string) (*Spawnpoint, func(), error) { // Check cache first if item := spawnpointCache.Get(spawnpointId); item != nil { spawnpoint := item.Value() - spawnpoint.Lock() + spawnpoint.Lock(caller) return spawnpoint, func() { spawnpoint.Unlock() }, nil } @@ -208,20 +209,20 @@ func getSpawnpointRecord(ctx context.Context, db db.DbDetails, spawnpointId int6 }) spawnpoint := existingSpawnpoint.Value() - spawnpoint.Lock() + spawnpoint.Lock(caller) return spawnpoint, func() { spawnpoint.Unlock() }, nil } // getOrCreateSpawnpointRecord gets existing or creates new, locked. // Caller MUST call returned unlock function. -func getOrCreateSpawnpointRecord(ctx context.Context, db db.DbDetails, spawnpointId int64) (*Spawnpoint, func(), error) { +func getOrCreateSpawnpointRecord(ctx context.Context, db db.DbDetails, spawnpointId int64, caller string) (*Spawnpoint, func(), error) { // Create new Spawnpoint atomically - function only called if key doesn't exist spawnpointItem, _ := spawnpointCache.GetOrSetFunc(spawnpointId, func() *Spawnpoint { return &Spawnpoint{SpawnpointData: SpawnpointData{Id: spawnpointId}, newRecord: true} }) spawnpoint := spawnpointItem.Value() - spawnpoint.Lock() + spawnpoint.Lock(caller) if spawnpoint.newRecord { // We should attempt to load from database @@ -260,7 +261,7 @@ func spawnpointUpdateFromWild(ctx context.Context, db db.DbDetails, wildPokemon date := time.Unix(expireTimeStamp, 0) secondOfHour := date.Second() + date.Minute()*60 - spawnpoint, unlock, err := getOrCreateSpawnpointRecord(ctx, db, spawnId) + spawnpoint, unlock, err := getOrCreateSpawnpointRecord(ctx, db, spawnId, "spawnpointUpdateFromWild") if err != nil { log.Errorf("getOrCreateSpawnpointRecord: %s", err) return @@ -271,7 +272,7 @@ func spawnpointUpdateFromWild(ctx context.Context, 
db db.DbDetails, wildPokemon spawnpointUpdate(ctx, db, spawnpoint) unlock() } else { - spawnpoint, unlock, err := getOrCreateSpawnpointRecord(ctx, db, spawnId) + spawnpoint, unlock, err := getOrCreateSpawnpointRecord(ctx, db, spawnId, "spawnpointUpdateFromMap") if err != nil { log.Errorf("getOrCreateSpawnpointRecord: %s", err) return diff --git a/decoder/station.go b/decoder/station.go index 959c0b74..b21f2412 100644 --- a/decoder/station.go +++ b/decoder/station.go @@ -2,7 +2,6 @@ package decoder import ( "fmt" - "sync" "github.com/guregu/null/v6" ) @@ -44,7 +43,7 @@ type StationData struct { // Station struct. // REMINDER! Dirty flag pattern - use setter methods to modify fields type Station struct { - mu sync.Mutex `db:"-" json:"-"` // Object-level mutex + mu TrackedMutex `db:"-" json:"-"` // Object-level mutex with contention tracking StationData // Embedded data fields - can be copied for write-behind queue @@ -81,9 +80,11 @@ func (station *Station) IsNewRecord() bool { return station.newRecord } -// Lock acquires the Station's mutex -func (station *Station) Lock() { - station.mu.Lock() +// Lock acquires the Station's mutex with caller tracking +func (station *Station) Lock(caller string) { + station.mu.entityType = "Station" + station.mu.entityId = station.Id + station.mu.Lock(caller) } // Unlock releases the Station's mutex diff --git a/decoder/station_process.go b/decoder/station_process.go index 05cda289..b73594f6 100644 --- a/decoder/station_process.go +++ b/decoder/station_process.go @@ -13,7 +13,7 @@ import ( func ResetStationedPokemonWithStationDetailsNotFound(ctx context.Context, db db.DbDetails, request *pogo.GetStationedPokemonDetailsProto) string { stationId := request.StationId - station, unlock, err := getStationRecordForUpdate(ctx, db, stationId) + station, unlock, err := getStationRecordForUpdate(ctx, db, stationId, "ResetStationedPokemon") if err != nil { log.Printf("Get station %s", err) return "Error getting station" @@ -33,7 +33,7 @@ func 
ResetStationedPokemonWithStationDetailsNotFound(ctx context.Context, db db. func UpdateStationWithStationDetails(ctx context.Context, db db.DbDetails, request *pogo.GetStationedPokemonDetailsProto, stationDetails *pogo.GetStationedPokemonDetailsOutProto) string { stationId := request.StationId - station, unlock, err := getStationRecordForUpdate(ctx, db, stationId) + station, unlock, err := getStationRecordForUpdate(ctx, db, stationId, "UpdateStationWithDetails") if err != nil { log.Printf("Get station %s", err) return "Error getting station" diff --git a/decoder/station_state.go b/decoder/station_state.go index 5ac0f06c..194d0b93 100644 --- a/decoder/station_state.go +++ b/decoder/station_state.go @@ -57,10 +57,10 @@ func loadStationFromDatabase(ctx context.Context, db db.DbDetails, stationId str // peekStationRecord - cache-only lookup, no DB fallback, returns locked. // Caller MUST call returned unlock function if non-nil. -func peekStationRecord(stationId string) (*Station, func(), error) { +func peekStationRecord(stationId string, caller string) (*Station, func(), error) { if item := stationCache.Get(stationId); item != nil { station := item.Value() - station.Lock() + station.Lock(caller) return station, func() { station.Unlock() }, nil } return nil, nil, nil @@ -69,11 +69,11 @@ func peekStationRecord(stationId string) (*Station, func(), error) { // getStationRecordReadOnly acquires lock but does NOT take snapshot. // Use for read-only checks. Will cause a backing database lookup. // Caller MUST call returned unlock function if non-nil. 
-func getStationRecordReadOnly(ctx context.Context, db db.DbDetails, stationId string) (*Station, func(), error) { +func getStationRecordReadOnly(ctx context.Context, db db.DbDetails, stationId string, caller string) (*Station, func(), error) { // Check cache first if item := stationCache.Get(stationId); item != nil { station := item.Value() - station.Lock() + station.Lock(caller) return station, func() { station.Unlock() }, nil } @@ -97,14 +97,14 @@ func getStationRecordReadOnly(ctx context.Context, db db.DbDetails, stationId st }) station := existingStation.Value() - station.Lock() + station.Lock(caller) return station, func() { station.Unlock() }, nil } // getStationRecordForUpdate acquires lock AND takes snapshot for webhook comparison. // Caller MUST call returned unlock function if non-nil. -func getStationRecordForUpdate(ctx context.Context, db db.DbDetails, stationId string) (*Station, func(), error) { - station, unlock, err := getStationRecordReadOnly(ctx, db, stationId) +func getStationRecordForUpdate(ctx context.Context, db db.DbDetails, stationId string, caller string) (*Station, func(), error) { + station, unlock, err := getStationRecordReadOnly(ctx, db, stationId, caller) if err != nil || station == nil { return nil, nil, err } @@ -114,14 +114,14 @@ func getStationRecordForUpdate(ctx context.Context, db db.DbDetails, stationId s // getOrCreateStationRecord gets existing or creates new, locked with snapshot. // Caller MUST call returned unlock function. 
-func getOrCreateStationRecord(ctx context.Context, db db.DbDetails, stationId string) (*Station, func(), error) { +func getOrCreateStationRecord(ctx context.Context, db db.DbDetails, stationId string, caller string) (*Station, func(), error) { // Create new Station atomically - function only called if key doesn't exist stationItem, _ := stationCache.GetOrSetFunc(stationId, func() *Station { return &Station{StationData: StationData{Id: stationId}, newRecord: true} }) station := stationItem.Value() - station.Lock() + station.Lock(caller) if station.newRecord { // We should attempt to load from database diff --git a/decoder/tappable.go b/decoder/tappable.go index 9a9bde6b..42c0d76d 100644 --- a/decoder/tappable.go +++ b/decoder/tappable.go @@ -2,7 +2,7 @@ package decoder import ( "fmt" - "sync" + "strconv" "github.com/guregu/null/v6" ) @@ -27,7 +27,7 @@ type TappableData struct { // Tappable struct. // REMINDER! Dirty flag pattern - use setter methods to modify fields type Tappable struct { - mu sync.Mutex `db:"-"` // Object-level mutex + mu TrackedMutex `db:"-"` // Object-level mutex with contention tracking TappableData // Embedded data fields - can be copied for write-behind queue @@ -51,9 +51,11 @@ func (ta *Tappable) IsNewRecord() bool { return ta.newRecord } -// Lock acquires the Tappable's mutex -func (ta *Tappable) Lock() { - ta.mu.Lock() +// Lock acquires the Tappable's mutex with caller tracking +func (ta *Tappable) Lock(caller string) { + ta.mu.entityType = "Tappable" + ta.mu.entityId = strconv.FormatUint(ta.Id, 10) + ta.mu.Lock(caller) } // Unlock releases the Tappable's mutex diff --git a/decoder/tappable_decode.go b/decoder/tappable_decode.go index 2b6f65e9..a76aed92 100644 --- a/decoder/tappable_decode.go +++ b/decoder/tappable_decode.go @@ -80,7 +80,7 @@ func (ta *Tappable) updateFromProcessTappableProto(ctx context.Context, db db.Db func (ta *Tappable) setExpireTimestamp(ctx context.Context, db db.DbDetails, timestampMs int64) { 
ta.SetExpireTimestampVerified(false) if spawnId := ta.SpawnId.ValueOrZero(); spawnId != 0 { - spawnPoint, unlock, _ := getSpawnpointRecord(ctx, db, spawnId) + spawnPoint, unlock, _ := getSpawnpointRecord(ctx, db, spawnId, "updateFromTappableEncounter") if spawnPoint != nil && spawnPoint.DespawnSec.Valid { despawnSecond := int(spawnPoint.DespawnSec.ValueOrZero()) unlock() diff --git a/decoder/tappable_process.go b/decoder/tappable_process.go index 1001ac5e..ce71a0c3 100644 --- a/decoder/tappable_process.go +++ b/decoder/tappable_process.go @@ -13,7 +13,7 @@ import ( func UpdateTappable(ctx context.Context, db db.DbDetails, request *pogo.ProcessTappableProto, tappableDetails *pogo.ProcessTappableOutProto, timestampMs int64) string { id := request.GetEncounterId() - tappable, unlock, err := getOrCreateTappableRecord(ctx, db, id) + tappable, unlock, err := getOrCreateTappableRecord(ctx, db, id, "UpdateTappableRecord") if err != nil { log.Printf("getOrCreateTappableRecord: %s", err) return "Error getting tappable" diff --git a/decoder/tappable_state.go b/decoder/tappable_state.go index 9cbc2c58..dffc5a70 100644 --- a/decoder/tappable_state.go +++ b/decoder/tappable_state.go @@ -24,10 +24,10 @@ func loadTappableFromDatabase(ctx context.Context, db db.DbDetails, id uint64, t // PeekTappableRecord - cache-only lookup, no DB fallback, returns locked. // Caller MUST call returned unlock function if non-nil. -func PeekTappableRecord(id uint64) (*Tappable, func(), error) { +func PeekTappableRecord(id uint64, caller string) (*Tappable, func(), error) { if item := tappableCache.Get(id); item != nil { tappable := item.Value() - tappable.Lock() + tappable.Lock(caller) return tappable, func() { tappable.Unlock() }, nil } return nil, nil, nil @@ -36,11 +36,11 @@ func PeekTappableRecord(id uint64) (*Tappable, func(), error) { // getTappableRecordReadOnly acquires lock but does NOT take snapshot. // Use for read-only checks. Will cause a backing database lookup. 
// Caller MUST call returned unlock function if non-nil. -func getTappableRecordReadOnly(ctx context.Context, db db.DbDetails, id uint64) (*Tappable, func(), error) { +func getTappableRecordReadOnly(ctx context.Context, db db.DbDetails, id uint64, caller string) (*Tappable, func(), error) { // Check cache first if item := tappableCache.Get(id); item != nil { tappable := item.Value() - tappable.Lock() + tappable.Lock(caller) return tappable, func() { tappable.Unlock() }, nil } @@ -61,20 +61,20 @@ func getTappableRecordReadOnly(ctx context.Context, db db.DbDetails, id uint64) }) tappable := existingTappable.Value() - tappable.Lock() + tappable.Lock(caller) return tappable, func() { tappable.Unlock() }, nil } // getOrCreateTappableRecord gets existing or creates new, locked. // Caller MUST call returned unlock function. -func getOrCreateTappableRecord(ctx context.Context, db db.DbDetails, id uint64) (*Tappable, func(), error) { +func getOrCreateTappableRecord(ctx context.Context, db db.DbDetails, id uint64, caller string) (*Tappable, func(), error) { // Create new Tappable atomically - function only called if key doesn't exist tappableItem, _ := tappableCache.GetOrSetFunc(id, func() *Tappable { return &Tappable{TappableData: TappableData{Id: id}, newRecord: true} }) tappable := tappableItem.Value() - tappable.Lock() + tappable.Lock(caller) if tappable.newRecord { // We should attempt to load from database diff --git a/decoder/tracked_mutex.go b/decoder/tracked_mutex.go new file mode 100644 index 00000000..1735dc5a --- /dev/null +++ b/decoder/tracked_mutex.go @@ -0,0 +1,51 @@ +package decoder + +import ( + "sync" + "time" + + log "github.com/sirupsen/logrus" +) + +// TrackedMutex wraps sync.Mutex with contention detection and holder tracking. +// Fast path cost: TryLock() + time.Now() + string pointer store ≈ 25ns. 
+type TrackedMutex struct { + mu sync.Mutex + holder string // caller that holds the lock + acquiredAt time.Time // when lock was acquired + entityType string // "Pokestop", "Gym", etc. + entityId string // the entity ID +} + +// Lock attempts to acquire the mutex. If the lock is contended, it logs the +// current holder and the waiting caller, then blocks until acquired. +func (m *TrackedMutex) Lock(caller string) { + if m.mu.TryLock() { + m.holder = caller + m.acquiredAt = time.Now() + return + } + // Contention path + holder := m.holder + heldFor := time.Since(m.acquiredAt) + log.Warnf("[LOCK_CONTENTION] %s id=%s waiter=%s holder=%s held_for=%s", + m.entityType, m.entityId, caller, holder, heldFor) + start := time.Now() + m.mu.Lock() + log.Warnf("[LOCK_ACQUIRED] %s id=%s caller=%s waited=%s (holder was %s)", + m.entityType, m.entityId, caller, time.Since(start), holder) + m.holder = caller + m.acquiredAt = time.Now() +} + +// Unlock releases the mutex. If the lock was held for more than 5 seconds, it +// logs a warning. +func (m *TrackedMutex) Unlock() { + held := time.Since(m.acquiredAt) + if held > 5*time.Second { + log.Warnf("[LOCK_HELD_LONG] %s id=%s holder=%s held_for=%s", + m.entityType, m.entityId, m.holder, held) + } + m.holder = "" + m.mu.Unlock() +} diff --git a/decoder/weather.go b/decoder/weather.go index b75a64a4..6c8427c2 100644 --- a/decoder/weather.go +++ b/decoder/weather.go @@ -4,7 +4,7 @@ import ( "context" "database/sql" "errors" - "sync" + "strconv" "golbat/db" "golbat/pogo" @@ -19,7 +19,7 @@ import ( // Weather struct. // REMINDER! 
Dirty flag pattern - use setter methods to modify fields type Weather struct { - mu sync.Mutex `db:"-" json:"-"` // Object-level mutex + mu TrackedMutex `db:"-" json:"-"` // Object-level mutex with contention tracking Id int64 `db:"id"` Latitude float64 `db:"latitude"` @@ -83,9 +83,11 @@ func (weather *Weather) IsNewRecord() bool { return weather.newRecord } -// Lock acquires the Weather's mutex -func (weather *Weather) Lock() { - weather.mu.Lock() +// Lock acquires the Weather's mutex with caller tracking +func (weather *Weather) Lock(caller string) { + weather.mu.entityType = "Weather" + weather.mu.entityId = strconv.FormatInt(weather.Id, 10) + weather.mu.Lock(caller) } // Unlock releases the Weather's mutex @@ -214,10 +216,10 @@ func loadWeatherFromDatabase(ctx context.Context, db db.DbDetails, weatherId int // peekWeatherRecord - cache-only lookup, no DB fallback, returns locked. // Caller MUST call returned unlock function if non-nil. -func peekWeatherRecord(weatherId int64) (*Weather, func(), error) { +func peekWeatherRecord(weatherId int64, caller string) (*Weather, func(), error) { if item := weatherCache.Get(weatherId); item != nil { weather := item.Value() - weather.Lock() + weather.Lock(caller) return weather, func() { weather.Unlock() }, nil } return nil, nil, nil @@ -226,11 +228,11 @@ func peekWeatherRecord(weatherId int64) (*Weather, func(), error) { // getWeatherRecordReadOnly acquires lock but does NOT take snapshot. // Use for read-only checks. Will cause a backing database lookup. // Caller MUST call returned unlock function if non-nil. 
-func getWeatherRecordReadOnly(ctx context.Context, db db.DbDetails, weatherId int64) (*Weather, func(), error) { +func getWeatherRecordReadOnly(ctx context.Context, db db.DbDetails, weatherId int64, caller string) (*Weather, func(), error) { // Check cache first if item := weatherCache.Get(weatherId); item != nil { weather := item.Value() - weather.Lock() + weather.Lock(caller) return weather, func() { weather.Unlock() }, nil } @@ -251,14 +253,14 @@ func getWeatherRecordReadOnly(ctx context.Context, db db.DbDetails, weatherId in }) weather := existingWeather.Value() - weather.Lock() + weather.Lock(caller) return weather, func() { weather.Unlock() }, nil } // getWeatherRecordForUpdate acquires lock AND takes snapshot for webhook comparison. // Caller MUST call returned unlock function if non-nil. -func getWeatherRecordForUpdate(ctx context.Context, db db.DbDetails, weatherId int64) (*Weather, func(), error) { - weather, unlock, err := getWeatherRecordReadOnly(ctx, db, weatherId) +func getWeatherRecordForUpdate(ctx context.Context, db db.DbDetails, weatherId int64, caller string) (*Weather, func(), error) { + weather, unlock, err := getWeatherRecordReadOnly(ctx, db, weatherId, caller) if err != nil || weather == nil { return nil, nil, err } @@ -268,14 +270,14 @@ func getWeatherRecordForUpdate(ctx context.Context, db db.DbDetails, weatherId i // getOrCreateWeatherRecord gets existing or creates new, locked with snapshot. // Caller MUST call returned unlock function. 
-func getOrCreateWeatherRecord(ctx context.Context, db db.DbDetails, weatherId int64) (*Weather, func(), error) { +func getOrCreateWeatherRecord(ctx context.Context, db db.DbDetails, weatherId int64, caller string) (*Weather, func(), error) { // Create new Weather atomically - function only called if key doesn't exist weatherItem, _ := weatherCache.GetOrSetFunc(weatherId, func() *Weather { return &Weather{Id: weatherId, newRecord: true} }) weather := weatherItem.Value() - weather.Lock() + weather.Lock(caller) if weather.newRecord { // We should attempt to load from database diff --git a/decoder/weather_iv.go b/decoder/weather_iv.go index 059e1262..f765de0d 100644 --- a/decoder/weather_iv.go +++ b/decoder/weather_iv.go @@ -85,7 +85,7 @@ func ProactiveIVSwitch(ctx context.Context, db db.DbDetails, weatherUpdate Weath return true } - pokemon, unlock, _ := peekPokemonRecordReadOnly(pokemonId) + pokemon, unlock, _ := peekPokemonRecordReadOnly(pokemonId, "ProactiveIVSwitch") if pokemon != nil { pokemonLocked++ if pokemonLookup.PokemonLookup.PokemonId == pokemon.PokemonId && (pokemon.IsDitto || int64(pokemonLookup.PokemonLookup.Form) == pokemon.Form.ValueOrZero()) && int64(newWeather) != pokemon.Weather.ValueOrZero() && pokemon.ExpireTimestamp.ValueOrZero() >= startUnix && pokemon.Updated.ValueOrZero() < timestamp { diff --git a/routes.go b/routes.go index e6a1dbd7..fd3c9a10 100644 --- a/routes.go +++ b/routes.go @@ -490,7 +490,7 @@ func GetPokestop(c *gin.Context) { fortId := c.Param("fort_id") //ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - pokestop, unlock, err := decoder.PeekPokestopRecord(fortId) + pokestop, unlock, err := decoder.PeekPokestopRecord(fortId, "API.GetPokestop") if unlock != nil { defer unlock() } @@ -513,7 +513,7 @@ func GetGym(c *gin.Context) { gymId := c.Param("gym_id") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - gym, unlock, err := decoder.GetGymRecordReadOnly(ctx, dbDetails, gymId) + gym, 
unlock, err := decoder.GetGymRecordReadOnly(ctx, dbDetails, gymId, "API.GetGym") if unlock != nil { defer unlock() } @@ -583,7 +583,7 @@ func GetGyms(c *gin.Context) { out := make([]decoder.ApiGymResult, 0, len(ids)) for _, id := range ids { - g, unlock, err := decoder.GetGymRecordReadOnly(ctx, dbDetails, id) + g, unlock, err := decoder.GetGymRecordReadOnly(ctx, dbDetails, id, "API.GetGyms") if err != nil { if unlock != nil { unlock() @@ -715,7 +715,7 @@ func SearchGyms(c *gin.Context) { if id == "" { continue } - g, unlock, err := decoder.GetGymRecordReadOnly(ctx, dbDetails, id) + g, unlock, err := decoder.GetGymRecordReadOnly(ctx, dbDetails, id, "API.GetFortTracker") if err != nil { if unlock != nil { unlock() @@ -824,7 +824,7 @@ func GetTappable(c *gin.Context) { c.Status(http.StatusBadRequest) return } - tappable, unlock, err := decoder.PeekTappableRecord(tappableId) + tappable, unlock, err := decoder.PeekTappableRecord(tappableId, "API.GetTappable") if unlock != nil { defer unlock() } From fd60664717373928e804e171e91e259a4ef154fb Mon Sep 17 00:00:00 2001 From: James Berry Date: Wed, 18 Mar 2026 10:49:58 +0000 Subject: [PATCH 2/4] refactor: make TrackedMutex generic, use atomics, skip blank incident IDs - TrackedMutex[K] is now generic over the entity ID type; string formatting (%v) only runs on the cold contention/warning paths - holder and acquiredAt use atomic.Value / atomic.Int64 so reads from the contention path are race-detector safe - Replace Init+sync.Once with direct parameters on Lock/Unlock for simplicity and smaller struct size - Consolidate repeated time.Now() calls into local variables - Skip incidents with blank IncidentId in UpdateFortBatch to avoid cache key collisions and spurious contention Co-Authored-By: Claude Opus 4.6 (1M context) --- decoder/gmo_decode.go | 3 +++ decoder/gym.go | 8 +++--- decoder/incident.go | 8 +++--- decoder/pokemon.go | 9 +++---- decoder/pokestop.go | 8 +++--- decoder/routes.go | 8 +++--- decoder/spawnpoint.go | 8 
+++--- decoder/station.go | 8 +++--- decoder/tappable.go | 9 +++---- decoder/tracked_mutex.go | 57 +++++++++++++++++++++++----------------- decoder/weather.go | 9 +++---- 11 files changed, 63 insertions(+), 72 deletions(-) diff --git a/decoder/gmo_decode.go b/decoder/gmo_decode.go index f2fa8b09..97ce0dd9 100644 --- a/decoder/gmo_decode.go +++ b/decoder/gmo_decode.go @@ -61,6 +61,9 @@ func UpdateFortBatch(ctx context.Context, db db.DbDetails, scanParameters ScanPa if incidents != nil { for _, incidentProto := range incidents { + if incidentProto.IncidentId == "" { + continue + } incident, unlock, err := getOrCreateIncidentRecord(ctx, db, incidentProto.IncidentId, fortId, "UpdateFortBatch") if err != nil { log.Errorf("getOrCreateIncidentRecord: %s", err) diff --git a/decoder/gym.go b/decoder/gym.go index e15822b4..7d24a0b7 100644 --- a/decoder/gym.go +++ b/decoder/gym.go @@ -57,7 +57,7 @@ type GymData struct { // Gym struct. // REMINDER! Keep hasChangesGym updated after making changes type Gym struct { - mu TrackedMutex `db:"-"` // Object-level mutex with contention tracking + mu TrackedMutex[string] `db:"-"` // Object-level mutex with contention tracking GymData // Embedded data fields (all db columns) @@ -130,14 +130,12 @@ func (gym *Gym) snapshotOldValues() { // Lock acquires the Gym's mutex with caller tracking func (gym *Gym) Lock(caller string) { - gym.mu.entityType = "Gym" - gym.mu.entityId = gym.Id - gym.mu.Lock(caller) + gym.mu.Lock(caller, "Gym", gym.Id) } // Unlock releases the Gym's mutex func (gym *Gym) Unlock() { - gym.mu.Unlock() + gym.mu.Unlock("Gym", gym.Id) } // --- Set methods with dirty tracking --- diff --git a/decoder/incident.go b/decoder/incident.go index 00ff5bb5..1a42c10c 100644 --- a/decoder/incident.go +++ b/decoder/incident.go @@ -29,7 +29,7 @@ type IncidentData struct { // Incident struct. // REMINDER! 
Dirty flag pattern - use setter methods to modify fields type Incident struct { - mu TrackedMutex `db:"-"` // Object-level mutex with contention tracking + mu TrackedMutex[string] `db:"-"` // Object-level mutex with contention tracking IncidentData // Embedded data fields - can be copied for write-behind queue @@ -101,14 +101,12 @@ func (incident *Incident) IsNewRecord() bool { // Lock acquires the Incident's mutex with caller tracking func (incident *Incident) Lock(caller string) { - incident.mu.entityType = "Incident" - incident.mu.entityId = incident.Id - incident.mu.Lock(caller) + incident.mu.Lock(caller, "Incident", incident.Id) } // Unlock releases the Incident's mutex func (incident *Incident) Unlock() { - incident.mu.Unlock() + incident.mu.Unlock("Incident", incident.Id) } // snapshotOldValues saves current values for webhook comparison diff --git a/decoder/pokemon.go b/decoder/pokemon.go index e3ddb2f4..4f299d1a 100644 --- a/decoder/pokemon.go +++ b/decoder/pokemon.go @@ -2,7 +2,6 @@ package decoder import ( "fmt" - "strconv" "golbat/grpc" @@ -63,7 +62,7 @@ type PokemonData struct { // // FirstSeenTimestamp: This field is used in IsNewRecord. It should only be set in savePokemonRecord. 
type Pokemon struct { - mu TrackedMutex `db:"-"` // Object-level mutex with contention tracking + mu TrackedMutex[uint64] `db:"-"` // Object-level mutex with contention tracking PokemonData // Embedded data fields - can be copied for write-behind queue @@ -164,14 +163,12 @@ func (pokemon *Pokemon) snapshotOldValues() { // Lock acquires the Pokemon's mutex with caller tracking func (pokemon *Pokemon) Lock(caller string) { - pokemon.mu.entityType = "Pokemon" - pokemon.mu.entityId = strconv.FormatUint(uint64(pokemon.Id), 10) - pokemon.mu.Lock(caller) + pokemon.mu.Lock(caller, "Pokemon", uint64(pokemon.Id)) } // Unlock releases the Pokemon's mutex func (pokemon *Pokemon) Unlock() { - pokemon.mu.Unlock() + pokemon.mu.Unlock("Pokemon", uint64(pokemon.Id)) } // --- Set methods with dirty tracking --- diff --git a/decoder/pokestop.go b/decoder/pokestop.go index 9135a89f..7579781c 100644 --- a/decoder/pokestop.go +++ b/decoder/pokestop.go @@ -68,7 +68,7 @@ type PokestopData struct { // Pokestop struct. type Pokestop struct { - mu TrackedMutex `db:"-"` // Object-level mutex with contention tracking + mu TrackedMutex[string] `db:"-"` // Object-level mutex with contention tracking PokestopData // Embedded data fields - can be copied for write-behind queue @@ -176,14 +176,12 @@ func (p *Pokestop) snapshotOldValues() { // Lock acquires the Pokestop's mutex with caller tracking func (p *Pokestop) Lock(caller string) { - p.mu.entityType = "Pokestop" - p.mu.entityId = p.Id - p.mu.Lock(caller) + p.mu.Lock(caller, "Pokestop", p.Id) } // Unlock releases the Pokestop's mutex func (p *Pokestop) Unlock() { - p.mu.Unlock() + p.mu.Unlock("Pokestop", p.Id) } // --- Set methods with dirty tracking --- diff --git a/decoder/routes.go b/decoder/routes.go index bfabea01..ff8037c6 100644 --- a/decoder/routes.go +++ b/decoder/routes.go @@ -38,7 +38,7 @@ type RouteData struct { // Route struct. // REMINDER! 
Dirty flag pattern - use setter methods to modify fields type Route struct { - mu TrackedMutex `db:"-" json:"-"` // Object-level mutex with contention tracking + mu TrackedMutex[string] `db:"-" json:"-"` // Object-level mutex with contention tracking RouteData // Embedded data fields - can be copied for write-behind queue @@ -71,14 +71,12 @@ func (r *Route) IsNewRecord() bool { // Lock acquires the Route's mutex with caller tracking func (r *Route) Lock(caller string) { - r.mu.entityType = "Route" - r.mu.entityId = r.Id - r.mu.Lock(caller) + r.mu.Lock(caller, "Route", r.Id) } // Unlock releases the Route's mutex func (r *Route) Unlock() { - r.mu.Unlock() + r.mu.Unlock("Route", r.Id) } // snapshotOldValues saves current values for webhook comparison diff --git a/decoder/spawnpoint.go b/decoder/spawnpoint.go index 3b0df822..2adf65d0 100644 --- a/decoder/spawnpoint.go +++ b/decoder/spawnpoint.go @@ -34,7 +34,7 @@ type SpawnpointData struct { // Spawnpoint struct. // REMINDER! Dirty flag pattern - use setter methods to modify fields type Spawnpoint struct { - mu TrackedMutex `db:"-" json:"-"` // Object-level mutex with contention tracking + mu TrackedMutex[int64] `db:"-" json:"-"` // Object-level mutex with contention tracking SpawnpointData // Embedded data fields - can be copied for write-behind queue @@ -73,14 +73,12 @@ func (s *Spawnpoint) IsNewRecord() bool { // Lock acquires the Spawnpoint's mutex with caller tracking func (s *Spawnpoint) Lock(caller string) { - s.mu.entityType = "Spawnpoint" - s.mu.entityId = strconv.FormatInt(s.Id, 10) - s.mu.Lock(caller) + s.mu.Lock(caller, "Spawnpoint", s.Id) } // Unlock releases the Spawnpoint's mutex func (s *Spawnpoint) Unlock() { - s.mu.Unlock() + s.mu.Unlock("Spawnpoint", s.Id) } // --- Set methods with dirty tracking --- diff --git a/decoder/station.go b/decoder/station.go index b21f2412..02fb9ed3 100644 --- a/decoder/station.go +++ b/decoder/station.go @@ -43,7 +43,7 @@ type StationData struct { // Station struct. 
// REMINDER! Dirty flag pattern - use setter methods to modify fields type Station struct { - mu TrackedMutex `db:"-" json:"-"` // Object-level mutex with contention tracking + mu TrackedMutex[string] `db:"-" json:"-"` // Object-level mutex with contention tracking StationData // Embedded data fields - can be copied for write-behind queue @@ -82,14 +82,12 @@ func (station *Station) IsNewRecord() bool { // Lock acquires the Station's mutex with caller tracking func (station *Station) Lock(caller string) { - station.mu.entityType = "Station" - station.mu.entityId = station.Id - station.mu.Lock(caller) + station.mu.Lock(caller, "Station", station.Id) } // Unlock releases the Station's mutex func (station *Station) Unlock() { - station.mu.Unlock() + station.mu.Unlock("Station", station.Id) } // snapshotOldValues saves current values for webhook comparison diff --git a/decoder/tappable.go b/decoder/tappable.go index 42c0d76d..591be874 100644 --- a/decoder/tappable.go +++ b/decoder/tappable.go @@ -2,7 +2,6 @@ package decoder import ( "fmt" - "strconv" "github.com/guregu/null/v6" ) @@ -27,7 +26,7 @@ type TappableData struct { // Tappable struct. // REMINDER! 
Dirty flag pattern - use setter methods to modify fields type Tappable struct { - mu TrackedMutex `db:"-"` // Object-level mutex with contention tracking + mu TrackedMutex[uint64] `db:"-"` // Object-level mutex with contention tracking TappableData // Embedded data fields - can be copied for write-behind queue @@ -53,14 +52,12 @@ func (ta *Tappable) IsNewRecord() bool { // Lock acquires the Tappable's mutex with caller tracking func (ta *Tappable) Lock(caller string) { - ta.mu.entityType = "Tappable" - ta.mu.entityId = strconv.FormatUint(ta.Id, 10) - ta.mu.Lock(caller) + ta.mu.Lock(caller, "Tappable", ta.Id) } // Unlock releases the Tappable's mutex func (ta *Tappable) Unlock() { - ta.mu.Unlock() + ta.mu.Unlock("Tappable", ta.Id) } // --- Set methods with dirty tracking --- diff --git a/decoder/tracked_mutex.go b/decoder/tracked_mutex.go index 1735dc5a..39d95db1 100644 --- a/decoder/tracked_mutex.go +++ b/decoder/tracked_mutex.go @@ -2,50 +2,59 @@ package decoder import ( "sync" + "sync/atomic" "time" log "github.com/sirupsen/logrus" ) // TrackedMutex wraps sync.Mutex with contention detection and holder tracking. -// Fast path cost: TryLock() + time.Now() + string pointer store ≈ 25ns. -type TrackedMutex struct { +// Generic over K (the entity ID type) so callers pass the native ID directly +// and string formatting (%v) is deferred to the cold contention/warning paths. +// Fast path cost: TryLock() + time.Now() + atomic store ≈ 25ns. +type TrackedMutex[K any] struct { mu sync.Mutex - holder string // caller that holds the lock - acquiredAt time.Time // when lock was acquired - entityType string // "Pokestop", "Gym", etc. - entityId string // the entity ID + holder atomic.Value // string - caller that holds the lock + acquiredAt atomic.Int64 // UnixNano - when lock was acquired +} + +func (m *TrackedMutex[K]) loadHolder() string { + h, _ := m.holder.Load().(string) + return h } // Lock attempts to acquire the mutex.
If the lock is contended, it logs the // current holder and the waiting caller, then blocks until acquired. -func (m *TrackedMutex) Lock(caller string) { +// entityType and id are only used in log messages on the contention path. +func (m *TrackedMutex[K]) Lock(caller, entityType string, id K) { if m.mu.TryLock() { - m.holder = caller - m.acquiredAt = time.Now() + now := time.Now() + m.holder.Store(caller) + m.acquiredAt.Store(now.UnixNano()) return } - // Contention path - holder := m.holder - heldFor := time.Since(m.acquiredAt) - log.Warnf("[LOCK_CONTENTION] %s id=%s waiter=%s holder=%s held_for=%s", - m.entityType, m.entityId, caller, holder, heldFor) + // Contention path — holder/acquiredAt use atomics for race-detector safety. + holder := m.loadHolder() + heldFor := time.Since(time.Unix(0, m.acquiredAt.Load())) + log.Warnf("[LOCK_CONTENTION] %s id=%v waiter=%s holder=%s held_for=%s", + entityType, id, caller, holder, heldFor) start := time.Now() m.mu.Lock() - log.Warnf("[LOCK_ACQUIRED] %s id=%s caller=%s waited=%s (holder was %s)", - m.entityType, m.entityId, caller, time.Since(start), holder) - m.holder = caller - m.acquiredAt = time.Now() + now := time.Now() + log.Warnf("[LOCK_ACQUIRED] %s id=%v caller=%s waited=%s (holder was %s)", + entityType, id, caller, now.Sub(start), holder) + m.holder.Store(caller) + m.acquiredAt.Store(now.UnixNano()) } // Unlock releases the mutex. If the lock was held for more than 5 seconds, it -// logs a warning. -func (m *TrackedMutex) Unlock() { - held := time.Since(m.acquiredAt) +// logs a warning. entityType and id are only used when the threshold is exceeded. 
+func (m *TrackedMutex[K]) Unlock(entityType string, id K) { + held := time.Since(time.Unix(0, m.acquiredAt.Load())) if held > 5*time.Second { - log.Warnf("[LOCK_HELD_LONG] %s id=%s holder=%s held_for=%s", - m.entityType, m.entityId, m.holder, held) + log.Warnf("[LOCK_HELD_LONG] %s id=%v holder=%s held_for=%s", + entityType, id, m.loadHolder(), held) } - m.holder = "" + m.holder.Store("") m.mu.Unlock() } diff --git a/decoder/weather.go b/decoder/weather.go index 6c8427c2..0ff376a9 100644 --- a/decoder/weather.go +++ b/decoder/weather.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "errors" - "strconv" "golbat/db" "golbat/pogo" @@ -19,7 +18,7 @@ import ( // Weather struct. // REMINDER! Dirty flag pattern - use setter methods to modify fields type Weather struct { - mu TrackedMutex `db:"-" json:"-"` // Object-level mutex with contention tracking + mu TrackedMutex[int64] `db:"-" json:"-"` // Object-level mutex with contention tracking Id int64 `db:"id"` Latitude float64 `db:"latitude"` @@ -85,14 +84,12 @@ func (weather *Weather) IsNewRecord() bool { // Lock acquires the Weather's mutex with caller tracking func (weather *Weather) Lock(caller string) { - weather.mu.entityType = "Weather" - weather.mu.entityId = strconv.FormatInt(weather.Id, 10) - weather.mu.Lock(caller) + weather.mu.Lock(caller, "Weather", weather.Id) } // Unlock releases the Weather's mutex func (weather *Weather) Unlock() { - weather.mu.Unlock() + weather.mu.Unlock("Weather", weather.Id) } // snapshotOldValues saves current values for webhook comparison From ea69ed95c7c5b95b5e855787cebcb8dea2675662 Mon Sep 17 00:00:00 2001 From: James Berry Date: Wed, 18 Mar 2026 14:48:07 +0000 Subject: [PATCH 3/4] perf: reuse time.Now() in contention path instead of calling time.Since Co-Authored-By: Claude Opus 4.6 (1M context) --- decoder/tracked_mutex.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/decoder/tracked_mutex.go b/decoder/tracked_mutex.go index 39d95db1..2e4b6780 100644 
--- a/decoder/tracked_mutex.go +++ b/decoder/tracked_mutex.go @@ -35,10 +35,10 @@ func (m *TrackedMutex[K]) Lock(caller, entityType string, id K) { } // Contention path — holder/acquiredAt use atomics for race-detector safety. holder := m.loadHolder() - heldFor := time.Since(time.Unix(0, m.acquiredAt.Load())) + start := time.Now() + heldFor := start.Sub(time.Unix(0, m.acquiredAt.Load())) log.Warnf("[LOCK_CONTENTION] %s id=%v waiter=%s holder=%s held_for=%s", entityType, id, caller, holder, heldFor) - start := time.Now() m.mu.Lock() now := time.Now() log.Warnf("[LOCK_ACQUIRED] %s id=%v caller=%s waited=%s (holder was %s)", From dc4e92e78dd6013990149ee954ec47f5c2ce6f5c Mon Sep 17 00:00:00 2001 From: James Berry Date: Wed, 18 Mar 2026 16:15:17 +0000 Subject: [PATCH 4/4] perf: exponential backoff TryLock before logging contention Add 1-2-4-8-16-32-64-128ms backoff loop (~255ms total) before emitting LOCK_CONTENTION warnings. Most transient contention resolves silently; only waiters still blocked after the ~255ms backoff produce LOCK_CONTENTION log output. Co-Authored-By: Claude Opus 4.6 (1M context) --- decoder/tracked_mutex.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/decoder/tracked_mutex.go b/decoder/tracked_mutex.go index 2e4b6780..9d48f7b5 100644 --- a/decoder/tracked_mutex.go +++ b/decoder/tracked_mutex.go @@ -33,7 +33,18 @@ func (m *TrackedMutex[K]) Lock(caller, entityType string, id K) { m.acquiredAt.Store(now.UnixNano()) return } - // Contention path — holder/acquiredAt use atomics for race-detector safety. + // Contention path — exponential backoff TryLock before logging. + // Delays: 1, 2, 4, 8, 16, 32, 64, 128ms = 255ms total before warning. + for delay := time.Millisecond; delay <= 128*time.Millisecond; delay *= 2 { + time.Sleep(delay) + if m.mu.TryLock() { + now := time.Now() + m.holder.Store(caller) + m.acquiredAt.Store(now.UnixNano()) + return + } + } + // Still contended after ~255ms — log and block.
holder := m.loadHolder() start := time.Now() heldFor := start.Sub(time.Unix(0, m.acquiredAt.Load()))