Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions install/helm/agones/templates/crds/fleet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ spec:
allocatedReplicas:
type: integer
minimum: 0
allocations:
type: integer
minimum: 0
players:
type: object
nullable: true
Expand Down
3 changes: 3 additions & 0 deletions install/yaml/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6583,6 +6583,9 @@ spec:
allocatedReplicas:
type: integer
minimum: 0
allocations:
type: integer
minimum: 0
players:
type: object
nullable: true
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/agones/v1/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type FleetStatus struct {
ReservedReplicas int32 `json:"reservedReplicas"`
// AllocatedReplicas are the number of Allocated GameServer replicas
AllocatedReplicas int32 `json:"allocatedReplicas"`
// Allocations is a counter of the number of allocations observed.
Allocations int64 `json:"allocations"`
// [Stage:Alpha]
// [FeatureFlag:PlayerTracking]
// Players are the current total player capacity and count for this Fleet
Expand Down
9 changes: 9 additions & 0 deletions pkg/client/applyconfiguration/agones/v1/fleetstatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

151 changes: 149 additions & 2 deletions pkg/fleets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"context"
"encoding/json"
"fmt"
"maps"
"sync"
"sync/atomic"
"time"

"agones.dev/agones/pkg/apis/agones"
Expand Down Expand Up @@ -64,12 +67,14 @@ type Extensions struct {
type Controller struct {
baseLogger *logrus.Entry
crdGetter apiextclientv1.CustomResourceDefinitionInterface
gameServerSynced cache.InformerSynced
gameServerSetGetter getterv1.GameServerSetsGetter
gameServerSetLister listerv1.GameServerSetLister
gameServerSetSynced cache.InformerSynced
fleetGetter getterv1.FleetsGetter
fleetLister listerv1.FleetLister
fleetSynced cache.InformerSynced
allocs *allocTracker
workerqueue *workerqueue.WorkerQueue
recorder record.EventRecorder
}
Expand All @@ -82,6 +87,9 @@ func NewController(
agonesClient versioned.Interface,
agonesInformerFactory externalversions.SharedInformerFactory) *Controller {

gameServers := agonesInformerFactory.Agones().V1().GameServers()
gsInformer := gameServers.Informer()

gameServerSets := agonesInformerFactory.Agones().V1().GameServerSets()
gsSetInformer := gameServerSets.Informer()

Expand All @@ -90,12 +98,14 @@ func NewController(

c := &Controller{
crdGetter: extClient.ApiextensionsV1().CustomResourceDefinitions(),
gameServerSynced: gsInformer.HasSynced,
gameServerSetGetter: agonesClient.AgonesV1(),
gameServerSetLister: gameServerSets.Lister(),
gameServerSetSynced: gsSetInformer.HasSynced,
fleetGetter: agonesClient.AgonesV1(),
fleetLister: fleets.Lister(),
fleetSynced: fInformer.HasSynced,
allocs: newAllocTracker(),
}

c.baseLogger = runtime.NewLoggerWithType(c)
Expand All @@ -112,6 +122,11 @@ func NewController(
UpdateFunc: func(_, newObj interface{}) {
c.workerqueue.Enqueue(newObj)
},
DeleteFunc: func(obj interface{}) {
fleet := obj.(*agonesv1.Fleet)

c.allocs.remove(fleet.ObjectMeta.Namespace, fleet.ObjectMeta.Name)
},
})

_, _ = gsSetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -125,6 +140,25 @@ func NewController(
},
})

_, _ = gsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll wan tto sync GameServers at, since you are now watching their events:

https://github.com/nrwiersma/agones/blob/30678d321529ca2fd3bdd27f58b8bd2a4b5c8e96/pkg/fleets/controller.go#L255

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that. I decided not to in the end as it does not stop the controller from working. We do not handle Add on the EventHandler (just Update), so the sync would basically block for no reason. Will gladly add it if you still want though.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'd recommend it - I've seen some weirdness when not syncing -- it'll probably mean nothing much, but also means that if other people edit this later on, or do other controller things they aren't left wondering "when do I sync, or not sync" - better to always sync 👍🏻

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair. The global rule for me is, if it is in the processNextItem loop anywhere, it gets synced. But will add it in.

UpdateFunc: func(oldObj, newObj interface{}) {
oldGs := oldObj.(*agonesv1.GameServer)
newGs := newObj.(*agonesv1.GameServer)

if oldGs.Status.State == agonesv1.GameServerStateAllocated || newGs.Status.State != agonesv1.GameServerStateAllocated {
// Count only the transition of a GameServer into the Allocated state.
return
}
fleet, ok := newGs.Labels[agonesv1.FleetNameLabel]
if !ok || fleet == "" {
// The game server is not attached to a fleet. Nothing to do.
return
}

c.allocs.inc(newGs.Namespace, fleet, 1)
},
})

return c
}

Expand Down Expand Up @@ -221,11 +255,13 @@ func (c *Controller) Run(ctx context.Context, workers int) error {
}

c.baseLogger.Debug("Wait for cache sync")
if !cache.WaitForCacheSync(ctx.Done(), c.gameServerSetSynced, c.fleetSynced) {
if !cache.WaitForCacheSync(ctx.Done(), c.gameServerSynced, c.gameServerSetSynced, c.fleetSynced) {
return errors.New("failed to wait for caches to sync")
}

c.workerqueue.Run(ctx, workers)

c.flushAllocations()
return nil
}

Expand Down Expand Up @@ -723,8 +759,18 @@ func (c *Controller) updateFleetStatus(ctx context.Context, fleet *agonesv1.Flee
}
}

allocs := c.allocs.get(fleet.ObjectMeta.Namespace, fleet.ObjectMeta.Name)
fCopy.Status.Allocations += allocs

_, err = c.fleetGetter.Fleets(fCopy.ObjectMeta.Namespace).UpdateStatus(ctx, fCopy, metav1.UpdateOptions{})
return errors.Wrapf(err, "error updating status of fleet %s", fCopy.ObjectMeta.Name)
if err != nil {
return errors.Wrapf(err, "error updating status of fleet %s", fCopy.ObjectMeta.Name)
}

// The update was successful, the allocation count must be decremented to reflect this.
c.allocs.dec(fleet.ObjectMeta.Namespace, fleet.ObjectMeta.Name, allocs)

return nil
}

// filterGameServerSetByActive returns the active GameServerSet (or nil if it
Expand Down Expand Up @@ -760,6 +806,107 @@ func (c *Controller) filterGameServerSetByActive(fleet *agonesv1.Fleet, list []*
return active, rest
}

func (c *Controller) flushAllocations() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

c.allocs.forEach(func(ns, fleetName string, count int64) {
fCopy, err := c.fleetGetter.Fleets(ns).Get(ctx, fleetName, metav1.GetOptions{})
if err != nil {
return
}

fCopy.Status.Allocations += count

_, _ = c.fleetGetter.Fleets(ns).UpdateStatus(ctx, fCopy, metav1.UpdateOptions{})
})
}

type allocTracker struct {
counts map[string]*atomic.Int64
mu sync.RWMutex
}

func newAllocTracker() *allocTracker {
return &allocTracker{counts: map[string]*atomic.Int64{}}
}

func (t *allocTracker) get(ns, fleetName string) int64 {
key := cache.ObjectName{Namespace: ns, Name: fleetName}.String()

t.mu.RLock()
defer t.mu.RUnlock()

counts, ok := t.counts[key]
if !ok {
return 0
}
return counts.Load()
}

func (t *allocTracker) inc(ns, fleetName string, v int64) {
key := cache.ObjectName{Namespace: ns, Name: fleetName}.String()

t.mu.RLock()
count, ok := t.counts[key]
if ok {
count.Add(v)
t.mu.RUnlock()
return
}
t.mu.RUnlock()

t.mu.Lock()
defer t.mu.Unlock()

count, ok = t.counts[key]
if !ok {
count = &atomic.Int64{}
t.counts[key] = count
}
count.Add(v)
}

func (t *allocTracker) dec(ns, fleetName string, v int64) {
key := cache.ObjectName{Namespace: ns, Name: fleetName}.String()
v = -1 * v

t.mu.RLock()
defer t.mu.RUnlock()

// We do not decrement something that does not exist. This would
// lead to some fairly confusing results.
count, ok := t.counts[key]
if ok {
count.Add(v)
}
}

func (t *allocTracker) remove(ns, fleetName string) {
key := cache.ObjectName{Namespace: ns, Name: fleetName}.String()

t.mu.Lock()
defer t.mu.Unlock()

delete(t.counts, key)
}

func (t *allocTracker) forEach(fn func(ns, fleetName string, count int64)) {
t.mu.Lock()
counts := make(map[string]*atomic.Int64, len(t.counts))
maps.Copy(counts, t.counts)
t.mu.Unlock()

for key, count := range counts {
if count.Load() == 0 {
continue
}

ns, fleetName, _ := cache.SplitMetaNamespaceKey(key)
fn(ns, fleetName, count.Load())
}
}

// mergeCounters adds the contents of AggregatedCounterStatus c2 into c1.
func mergeCounters(c1, c2 map[string]agonesv1.AggregatedCounterStatus) map[string]agonesv1.AggregatedCounterStatus {
if c1 == nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/fleets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,37 @@ func TestControllerSyncFleet(t *testing.T) {
agtesting.AssertNoEvent(t, m.FakeRecorder.Events)
})

t.Run("fleets update allocation counter", func(t *testing.T) {
f := defaultFixture()
f.Spec.Strategy.Type = appsv1.RollingUpdateDeploymentStrategyType

c, m := newFakeController()
gsSet := f.GameServerSet()
gsSet.ObjectMeta.Name = "gsSet1"
gsSet.ObjectMeta.UID = "4321"
gsSet.Spec.Replicas = f.Spec.Replicas

m.AgonesClient.AddReactor("list", "fleets", func(_ k8stesting.Action) (bool, runtime.Object, error) {
return true, &agonesv1.FleetList{Items: []agonesv1.Fleet{*f}}, nil
})
m.AgonesClient.AddReactor("get", "fleets", func(_ k8stesting.Action) (bool, runtime.Object, error) {
return true, f, nil
})

m.AgonesClient.AddReactor("list", "gameserversets", func(_ k8stesting.Action) (bool, runtime.Object, error) {
return true, &agonesv1.GameServerSetList{Items: []agonesv1.GameServerSet{*gsSet}}, nil
})

ctx, cancel := agtesting.StartInformers(m, c.fleetSynced, c.gameServerSetSynced)
defer cancel()

c.allocs.inc("default", "fleet-1", 1)

err := c.syncFleet(ctx, "default/fleet-1")
assert.Nil(t, err)
assert.Equal(t, int64(1), f.Status.Allocations)
})

t.Run("error on getting fleet", func(t *testing.T) {
c, _ := newFakeController()
c.fleetLister = &fakeFleetListerWithErr{}
Expand Down
3 changes: 3 additions & 0 deletions pkg/metrics/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ func (c *Controller) recordGameServerStatusChanges(old, next interface{}) {
if newGs.Status.State != oldGs.Status.State {
RecordWithTags(context.Background(), []tag.Mutator{tag.Upsert(keyType, string(newGs.Status.State)),
tag.Upsert(keyFleetName, fleetName), tag.Upsert(keyNamespace, newGs.GetNamespace())}, gameServerTotalStats.M(1))
if newGs.Status.State == agonesv1.GameServerStateAllocated {
RecordWithTags(context.Background(), []tag.Mutator{tag.Upsert(keyFleetName, fleetName), tag.Upsert(keyNamespace, newGs.GetNamespace())}, gameServerAllocationsTotalStats.M(1))
}

// Calculate the duration of the current state
duration, err := c.calcDuration(oldGs, newGs)
Expand Down
45 changes: 27 additions & 18 deletions pkg/metrics/controller_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
fleetListsName = "fleet_lists"
gameServersCountName = "gameservers_count"
gameServersTotalName = "gameservers_total"
gameServersAllocationsTotalName = "gameservers_allocations_total"
gameServersPlayerConnectedTotalName = "gameserver_player_connected_total"
gameServersPlayerCapacityTotalName = "gameserver_player_capacity_total"
nodeCountName = "nodes_count"
Expand All @@ -47,24 +48,25 @@ var (
// fleetViews are metric views associated with Fleets
fleetViews = append([]string{fleetRolloutPercent, fleetReplicaCountName, gameServersCountName, gameServersTotalName, gameServersPlayerConnectedTotalName, gameServersPlayerCapacityTotalName, gameServerStateDurationName, fleetCountersName, fleetListsName}, fleetAutoscalerViews...)

stateDurationSeconds = []float64{0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384}
fleetRolloutPercentStats = stats.Int64("fleets/rollout_percent", "The current fleet rollout percentage", "1")
fleetsReplicasCountStats = stats.Int64("fleets/replicas_count", "The count of replicas per fleet", "1")
fasBufferLimitsCountStats = stats.Int64("fas/buffer_limits", "The buffer limits of autoscalers", "1")
fasBufferSizeStats = stats.Int64("fas/buffer_size", "The buffer size value of autoscalers", "1")
fasCurrentReplicasStats = stats.Int64("fas/current_replicas_count", "The current replicas cout as seen by autoscalers", "1")
fasDesiredReplicasStats = stats.Int64("fas/desired_replicas_count", "The desired replicas cout as seen by autoscalers", "1")
fasAbleToScaleStats = stats.Int64("fas/able_to_scale", "The fleet autoscaler can access the fleet to scale (0 indicates false, 1 indicates true)", "1")
fasLimitedStats = stats.Int64("fas/limited", "The fleet autoscaler is capped (0 indicates false, 1 indicates true)", "1")
fleetCountersStats = stats.Int64("fleets/counters", "Aggregated Counters counts and capacity across GameServers in the Fleet", "1")
fleetListsStats = stats.Int64("fleets/lists", "Aggregated Lists counts and capacity across GameServers in the Fleet", "1")
gameServerCountStats = stats.Int64("gameservers/count", "The count of gameservers", "1")
gameServerTotalStats = stats.Int64("gameservers/total", "The total of gameservers", "1")
gameServerPlayerConnectedTotal = stats.Int64("gameservers/player_connected", "The total number of players connected to gameservers", "1")
gameServerPlayerCapacityTotal = stats.Int64("gameservers/player_capacity", "The available player capacity for gameservers", "1")
nodesCountStats = stats.Int64("nodes/count", "The count of nodes in the cluster", "1")
gsPerNodesCountStats = stats.Int64("gameservers_node/count", "The count of gameservers per node in the cluster", "1")
gsStateDurationSec = stats.Float64("gameservers_state/duration", "The duration of gameservers to be in a particular state", stats.UnitSeconds)
stateDurationSeconds = []float64{0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384}
fleetRolloutPercentStats = stats.Int64("fleets/rollout_percent", "The current fleet rollout percentage", "1")
fleetsReplicasCountStats = stats.Int64("fleets/replicas_count", "The count of replicas per fleet", "1")
fasBufferLimitsCountStats = stats.Int64("fas/buffer_limits", "The buffer limits of autoscalers", "1")
fasBufferSizeStats = stats.Int64("fas/buffer_size", "The buffer size value of autoscalers", "1")
fasCurrentReplicasStats = stats.Int64("fas/current_replicas_count", "The current replicas cout as seen by autoscalers", "1")
fasDesiredReplicasStats = stats.Int64("fas/desired_replicas_count", "The desired replicas cout as seen by autoscalers", "1")
fasAbleToScaleStats = stats.Int64("fas/able_to_scale", "The fleet autoscaler can access the fleet to scale (0 indicates false, 1 indicates true)", "1")
fasLimitedStats = stats.Int64("fas/limited", "The fleet autoscaler is capped (0 indicates false, 1 indicates true)", "1")
fleetCountersStats = stats.Int64("fleets/counters", "Aggregated Counters counts and capacity across GameServers in the Fleet", "1")
fleetListsStats = stats.Int64("fleets/lists", "Aggregated Lists counts and capacity across GameServers in the Fleet", "1")
gameServerCountStats = stats.Int64("gameservers/count", "The count of gameservers", "1")
gameServerTotalStats = stats.Int64("gameservers/total", "The total of gameservers", "1")
gameServerAllocationsTotalStats = stats.Int64("gameservers/allocations_total", "The total of gameserver allocations", "1")
gameServerPlayerConnectedTotal = stats.Int64("gameservers/player_connected", "The total number of players connected to gameservers", "1")
gameServerPlayerCapacityTotal = stats.Int64("gameservers/player_capacity", "The available player capacity for gameservers", "1")
nodesCountStats = stats.Int64("nodes/count", "The count of nodes in the cluster", "1")
gsPerNodesCountStats = stats.Int64("gameservers_node/count", "The count of gameservers per node in the cluster", "1")
gsStateDurationSec = stats.Float64("gameservers_state/duration", "The duration of gameservers to be in a particular state", stats.UnitSeconds)

stateViews = []*view.View{
{
Expand Down Expand Up @@ -151,6 +153,13 @@ var (
Aggregation: view.Count(),
TagKeys: []tag.Key{keyType, keyFleetName, keyNamespace},
},
{
Name: gameServersAllocationsTotalName,
Measure: gameServerAllocationsTotalStats,
Description: "The total of gameserver allocations",
Aggregation: view.Count(),
TagKeys: []tag.Key{keyFleetName, keyNamespace},
},
{
Name: gameServersPlayerConnectedTotalName,
Measure: gameServerPlayerConnectedTotal,
Expand Down
Loading
Loading