diff --git a/README.md b/README.md index 6fad3be..659f755 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Go Reference](https://pkg.go.dev/badge/github.com/newcloudtechnologies/memlimiter.svg)](https://pkg.go.dev/github.com/newcloudtechnologies/memlimiter) [![Go Report Card](https://goreportcard.com/badge/github.com/newcloudtechnologies/memlimiter)](https://goreportcard.com/report/github.com/newcloudtechnologies/memlimiter) -![Coverage](https://img.shields.io/badge/Coverage-81.7%25-brightgreen) +![Coverage](https://img.shields.io/badge/Coverage-82.8%25-brightgreen) ![CI](https://github.com/newcloudtechnologies/memlimiter/actions/workflows/CI.yml/badge.svg) `memlimiter` helps a Go service avoid OOM by combining adaptive GC tuning and request throttling under memory pressure. @@ -10,6 +10,7 @@ It observes process memory (`RSS`) and Go heap pressure (`runtime.MemStats.NextGC`) and turns that into: - dynamic `debug.SetGCPercent` tuning, +- optional `debug.SetMemoryLimit` application on service start, - request shedding / backpressure via middleware. By default, stats come from: @@ -62,7 +63,9 @@ where: - $RSS_{limit}$ is a hard limit for service's physical memory (`RSS`) consumption (so that exceeding this limit will highly likely result in OOM); - $CGO$ is a total size of heap allocations made beyond `Cgo` borders (within `C`/`C++`/.... libraries). -A few notes about $CGO$ component. Allocations made outside of the Go allocator, of course, are not controlled by the Go runtime in any way. At the same time, the memory consumption limit is common for both Go and non-Go allocators. Therefore, if non-Go allocations grow, all we can do is shrink the memory budget for Go allocations (which is why we subtract $CGO$ from the denominator of the previous expression). If your service uses `Cgo`, you need to figure out how much memory is allocated “on the other side” – **otherwise MemLimiter won’t be able to save your service from OOM**. 
+A few notes about the $CGO$ component. Allocations made outside of the Go allocator, of course, are not controlled by the Go runtime in any way. At the same time, the memory consumption limit is common for both Go and non-Go allocators. Therefore, if non-Go allocations grow, all we can do is shrink the memory budget for Go allocations (which is why we subtract $CGO$ from the denominator of the previous expression). If your service uses `Cgo`, you need to figure out how much memory is allocated "on the other side" - **otherwise MemLimiter won't be able to save your service from OOM**. + +When the reported $CGO$ value reaches or exceeds $RSS_{limit}$, MemLimiter treats the Go allocation budget as exhausted and immediately switches to its conservative control mode. If the service doesn't use `Cgo`, the $Utilization$ formula is simplified to: $$Utilization = \frac {NextGC} {RSS_{limit}}$$ @@ -80,29 +83,29 @@ You can adjust the proportional component control signal strength using a coeffi The control signal is always saturated to prevent extremal values: $$ Output = \begin{cases} -\displaystyle 100 \ \ \ K_{p} \gt 100 \\ -\displaystyle 0 \ \ \ \ \ \ \ K_{p} \lt 100 \\ +\displaystyle 99 \ \ \ K_{p} \gt 99 \\ +\displaystyle 0 \ \ \ \ \ \ \ K_{p} \lt 0 \\ \displaystyle K_{p} \ \ \ \ otherwise \\ \end{cases}$$ Finally we convert the dimensionless quantity $Output$ into specific $GOGC$ (for the further use in [`debug.SetGCPercent`](https://pkg.go.dev/runtime/debug#SetGCPercent)) and $Throttling$ (percentage of suppressed requests) values, however, only if the $Utilization$ exceeds the specified limits: -$$ GC = \begin{cases} -\displaystyle Output \ \ \ Utilization \gt DangerZoneGC \\ -\displaystyle 100 \ \ \ \ \ \ \ \ \ \ otherwise \\ +$$ GOGC = \begin{cases} +\displaystyle max(MinGOGC, 100 - round(Output)) \ \ \ Utilization \ge DangerZoneGOGC \\ +\displaystyle 100 \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ otherwise \\ \end{cases}$$ $$ Throttling = \begin{cases} -\displaystyle Output \ \ \ Utilization \gt DangerZoneThrottling
\\ -\displaystyle 0 \ \ \ \ \ \ \ \ \ \ \ \ \ \ otherwise \\ +\displaystyle round(Output) \ \ \ Utilization \ge DangerZoneThrottling \\ +\displaystyle 0 \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ \ otherwise \\ \end{cases}$$ ## Architecture The MemLimiter comprises two main parts: -1. **Core** implementing the memory budget controller and backpressure subsystems. Core relies on actual statistics provided by `stats.ServiceStatsSubscription`. In a critical situation, core may gracefully terminate the application with `utils.ApplicationTerminator`. -2. **Middleware** providing request throttling feature for various web frameworks. Every time the server receives a request, it uses middleware to ask the MemLimiter’s core for permission to process this request. Currently, only `GRPC` is supported, but `Middleware` is an easily extensible interface, and PRs are welcome. +1. **Core** implementing the memory budget controller and backpressure subsystems. Core relies on actual statistics provided by `stats.ServiceStatsSubscription`. +2. **Middleware** providing request throttling for various web frameworks. Every time the server receives a request, it uses the middleware to ask MemLimiter's core for permission to process this request. Currently, only `gRPC` is supported, but `Middleware` is an easily extensible interface, and PRs are welcome. 
![Architecture](docs/architecture.png) @@ -122,44 +125,110 @@ You must also provide your own `stats.ServiceStatsSubscription` and `stats.Servi ### Tuning -There are several key settings in MemLimiter [configuration](controller/nextgc/config.go): - -- `RSSLimit` -- `DangerZoneGC` -- `DangerZoneThrottling` -- `Period` -- `WindowSize` -- `Coefficient` ($C_{p}$) +There are several key settings in MemLimiter configuration (see [top-level config](config.go) and [controller config](controller/nextgc/config.go)): + +- `go_memory_limit` (optional, top-level) +- `controller_nextgc.rss_limit` +- `controller_nextgc.danger_zone_gogc` +- `controller_nextgc.danger_zone_throttling` +- `controller_nextgc.min_gogc` +- `controller_nextgc.period` +- `controller_nextgc.component_proportional.window_size` +- `controller_nextgc.component_proportional.coefficient` ($C_{p}$) + +Example: + +```json +{ + "go_memory_limit": "800M", + "controller_nextgc": { + "rss_limit": "1G", + "danger_zone_gogc": 50, + "danger_zone_throttling": 90, + "min_gogc": 10, + "period": "100ms", + "component_proportional": { + "coefficient": 1, + "window_size": 20 + } + } +} +``` You have to pick them empirically for your service. The settings must correspond to the business logic features of a particular service and to the workload expected. -We made a series of performance tests with [Allocator][test/allocator] - an example service which does nothing but allocations that reside in memory for some time. We used different settings, applied the same load and tracked the RSS of a process. +We made a series of performance tests with [Allocator](test/allocator) - an example service which does nothing but allocations that reside in memory for some time. We used different settings, applied the same load and tracked runtime behavior. -Settings ranges: +Current `make allocator-analyze` scenario matrix: +- One unlimited baseline (`memlimiter` disabled). +- One limited baseline without Go soft limit (`go_memory_limit = 0`). 
+- Several limited cases with `go_memory_limit = 800MiB`, including a stricter safety floor (`min_gogc = 30`) case. + +Common settings in this matrix: - $RSS_{limit} = {1G}$ -- $DangerZoneGC = 50%$ -- $DangerZoneThrottling = 90%$ +- $DangerZoneGOGC = 50\%$ +- $DangerZoneThrottling = 90\%$ - $Period = 100ms$ - $WindowSize = 20$ -- $C_{p} \in \\{0, 0.5, 1, 5, 10, 50, 100\\}$ -These plots may give you some inspiration on how $C_{p}$ value affects the physical memory consumption other things being equal: +Scenario-specific values: +- $go\_memory\_limit \in \{0, 800MiB\}$ +- $MinGOGC \in \{10, 30\}$ +- $C_{p} \in \{0.5, 5, 10, 50\}$ + +Load profile (same for all scenarios): +- $RPS = 120$ +- $AllocationSize = 1MiB$ +- $PauseDuration = 6s$ +- $RequestTimeout = 1m$ +- $LoadDuration = 60s$ + +Current analyzer run outputs are generated under `/tmp/allocator/allocator_/` (images below are curated examples from `docs/`): ![Control params](docs/control_params.png) -And the summary plot with RSS consumption dependence on $C_{p}$ value: +And the summary RSS plot across tested scenarios: + +![RSS](docs/rss.png) + +Observed OOM behavior in this run: +- Without MemLimiter (`unlimited=true`), the process terminates at ~16s under the 1GiB container limit. +- With MemLimiter enabled, all limited scenarios sustain the full 60s load window. + +Additional plots for new controls (`go_memory_limit` and `min_gogc`) are generated by `make allocator-analyze` in the same run directory. Curated examples are stored under `docs/`: + +`gogc_floor_hits.png`: + +![GOGC floor hits](docs/gogc_floor_hits.png) + +What it means: +- It shows, per scenario, the share of samples where `GOGC` is clamped by `min_gogc`. +- Higher values mean the safety floor is actively preventing `GOGC` from dropping to overly aggressive values. +- In this run, the strict case (`C_p=50`, `min_gogc=30`) hits the floor for ~78% of samples. 
+ +`memory_limits_overlay.png`: + +![Memory limits overlay](docs/memory_limits_overlay.png) + +What it means: +- It shows `RSS` and `Go runtime memory` (tracked as `MemStats.Sys - MemStats.HeapReleased`) with configured limits over time. +- `go_memory_limit` is a soft limit, so short-term overshoot is possible under bursty/high-allocation load. +- If overshoot is large and persistent, allocation pressure is stronger than GC control for this workload. +- If `RSS` stays high while `Go runtime memory` is low, pressure likely comes from non-Go allocations (`Cgo`/external memory), so better external accounting and/or stronger throttling is needed. -![RSS](docs/rss_hl.png) +General observations from these experiments: +- `go_memory_limit=800MiB` adds extra GC pressure as a soft target; in this stress test it is not a hard ceiling for `Go runtime memory`. +- `min_gogc` protects against extreme GC aggressiveness by clamping the controller output during red-zone periods. +- A stricter floor (`min_gogc=30`) with aggressive `C_p=50` shifts control toward stronger throttling (up to 99%) instead of further GC tightening. -The general conclusion is that: -- The higher the $C_{p}$ is, the lower the $RSS$ consumption. -- Too low and too high $C_{p}$ values cause self-oscillation of control parameters. -- Disabling MemLimiter causes OOM. +Runtime settings changed by MemLimiter are restored on `Service.Quit()`: +- `GOGC` (`debug.SetGCPercent`) +- `go_memory_limit` (if configured via `debug.SetMemoryLimit`) ## TODO - Extend middleware.Middleware to support more frameworks. -- Add GOGC limitations to prevent death spirals. - Support popular Cgo allocators like Jemalloc or TCMalloc, parse their stats to provide information about Cgo memory consumption. Your PRs are welcome! 
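The saturation and conversion formulas in the README hunk above can be sketched in a few lines of Go. This is a minimal illustration only: the function names (`saturate`, `controlValues`) and the hard-coded thresholds are assumptions for the example, not the library's API; the real logic lives in `controller/nextgc` and reads `DangerZoneGOGC`, `DangerZoneThrottling`, and `MinGOGC` from configuration.

```go
package main

import (
	"fmt"
	"math"
)

// Illustrative constants mirroring the README formulas; the real values come
// from the MemLimiter configuration, not from compile-time constants.
const (
	defaultGOGC = 100
	minGOGC     = 10
)

// Utilization thresholds from the example config above, expressed as fractions.
const (
	dangerZoneGOGC       = 0.50
	dangerZoneThrottling = 0.90
)

// saturate clamps the proportional component's output to [0, 99],
// as in the Output piecewise formula.
func saturate(kp float64) float64 {
	switch {
	case kp > 99:
		return 99
	case kp < 0:
		return 0
	default:
		return kp
	}
}

// controlValues converts utilization and the saturated output into GOGC and
// a throttling percentage, following the README's piecewise formulas.
func controlValues(utilization, output float64) (gogc, throttling int) {
	gogc = defaultGOGC
	if utilization >= dangerZoneGOGC {
		gogc = defaultGOGC - int(math.Round(output))
		if gogc < minGOGC {
			gogc = minGOGC // MinGOGC safety floor in the "red zone"
		}
	}

	if utilization >= dangerZoneThrottling {
		throttling = int(math.Round(output))
	}

	return gogc, throttling
}

func main() {
	out := saturate(120) // clamped to 99
	gogc, throttling := controlValues(0.95, out)
	fmt.Println(gogc, throttling) // prints "10 99"
}
```

At 95% utilization both danger zones are active, so the sketch clamps GOGC to the floor and sheds 99% of requests — the same "GC first, throttle later" ordering the config comments recommend.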
diff --git a/backpressure/interface.go b/backpressure/interface.go index dd10556..ae6f209 100644 --- a/backpressure/interface.go +++ b/backpressure/interface.go @@ -28,4 +28,6 @@ type Operator interface { AllowRequest() bool // GetStats returns statistics of Backpressure subsystem. GetStats() (*stats.BackpressureStats, error) + // Quit gracefully terminates backpressure subsystem and restores runtime settings. + Quit() } diff --git a/backpressure/mock.go b/backpressure/mock.go index 2d611ac..6ca834f 100644 --- a/backpressure/mock.go +++ b/backpressure/mock.go @@ -23,3 +23,25 @@ func (m *OperatorMock) SetControlParameters(value *stats.ControlParameters) erro return args.Error(0) } + +func (m *OperatorMock) AllowRequest() bool { + args := m.Called() + + return args.Bool(0) +} + +func (m *OperatorMock) GetStats() (*stats.BackpressureStats, error) { + args := m.Called() + + raw := args.Get(0) + if raw == nil { + return nil, args.Error(1) + } + + //nolint:forcetypeassert // Mocked method. + return raw.(*stats.BackpressureStats), args.Error(1) +} + +func (m *OperatorMock) Quit() { + m.Called() +} diff --git a/backpressure/operator.go b/backpressure/operator.go index 0b0085c..fa74e51 100644 --- a/backpressure/operator.go +++ b/backpressure/operator.go @@ -23,6 +23,8 @@ type operatorImpl struct { notificationChan chan<- *stats.MemLimiterStats lastControlParameters atomic.Value + initialGOGC atomic.Int64 + initialGOGCStored atomic.Bool logger logr.Logger } @@ -85,7 +87,10 @@ func (b *operatorImpl) SetControlParameters(value *stats.ControlParameters) erro } // Tune GC pace. - debug.SetGCPercent(value.GOGC) + oldGOGC := debug.SetGCPercent(value.GOGC) + if b.initialGOGCStored.CompareAndSwap(false, true) { + b.initialGOGC.Store(int64(oldGOGC)) + } b.logger.Info("control parameters changed", value.ToKeysAndValues()...) 
@@ -110,3 +115,10 @@ func (b *operatorImpl) SetControlParameters(value *stats.ControlParameters) erro return nil } + +// Quit gracefully terminates backpressure subsystem. +func (b *operatorImpl) Quit() { + if b.initialGOGCStored.Load() { + debug.SetGCPercent(int(b.initialGOGC.Load())) + } +} diff --git a/backpressure/operator_test.go b/backpressure/operator_test.go index f9bd216..b39a70f 100644 --- a/backpressure/operator_test.go +++ b/backpressure/operator_test.go @@ -7,6 +7,7 @@ package backpressure import ( + "runtime/debug" "testing" "github.com/go-logr/logr/testr" @@ -32,3 +33,24 @@ func TestOperator(t *testing.T) { require.Equal(t, params, notification.Backpressure.ControlParameters) } + +func TestOperatorQuitRestoresGOGC(t *testing.T) { + const expectedInitialGOGC = 73 + + originalBeforeTest := debug.SetGCPercent(expectedInitialGOGC) + defer debug.SetGCPercent(originalBeforeTest) + + logger := testr.New(t) + op := NewOperator(logger) + + err := op.SetControlParameters(&stats.ControlParameters{ + GOGC: 21, + ThrottlingPercentage: NoThrottling, + }) + require.NoError(t, err) + + op.Quit() + + prev := debug.SetGCPercent(expectedInitialGOGC) + require.Equal(t, expectedInitialGOGC, prev) +} diff --git a/config.go b/config.go index d2a03b8..56a7e96 100644 --- a/config.go +++ b/config.go @@ -8,12 +8,17 @@ package memlimiter import ( "errors" + "math" "github.com/newcloudtechnologies/memlimiter/controller/nextgc" + "github.com/newcloudtechnologies/memlimiter/utils/config/bytes" ) // Config - high-level MemLimiter config. type Config struct { + // GoMemoryLimit optionally sets Go runtime soft memory limit via debug.SetMemoryLimit. + // Zero means disabled. 
+ GoMemoryLimit bytes.Bytes `json:"go_memory_limit"` // ControllerNextGC - NextGC-based controller ControllerNextGC *nextgc.ControllerConfig `json:"controller_nextgc"` //nolint:tagliatelle // TODO: @@ -32,5 +37,9 @@ func (c *Config) Prepare() error { return errors.New("empty ControllerNextGC") } + if c.GoMemoryLimit.Value > uint64(math.MaxInt64) { + return errors.New("GoMemoryLimit exceeds int64 range") + } + return nil } diff --git a/config_test.go b/config_test.go index 970b85a..2e5f4fc 100644 --- a/config_test.go +++ b/config_test.go @@ -7,8 +7,11 @@ package memlimiter import ( + "math" "testing" + "github.com/newcloudtechnologies/memlimiter/controller/nextgc" + "github.com/newcloudtechnologies/memlimiter/utils/config/bytes" "github.com/stretchr/testify/require" ) @@ -22,4 +25,20 @@ func TestConfig(t *testing.T) { c := &Config{ControllerNextGC: nil} require.Error(t, c.Prepare()) }) + + t.Run("go memory limit in range", func(t *testing.T) { + c := &Config{ + ControllerNextGC: &nextgc.ControllerConfig{}, + GoMemoryLimit: bytes.Bytes{Value: uint64(math.MaxInt64)}, + } + require.NoError(t, c.Prepare()) + }) + + t.Run("go memory limit out of range", func(t *testing.T) { + c := &Config{ + ControllerNextGC: &nextgc.ControllerConfig{}, + GoMemoryLimit: bytes.Bytes{Value: uint64(math.MaxInt64) + 1}, + } + require.Error(t, c.Prepare()) + }) } diff --git a/controller/nextgc/budget_test.go b/controller/nextgc/budget_test.go new file mode 100644 index 0000000..a4c7404 --- /dev/null +++ b/controller/nextgc/budget_test.go @@ -0,0 +1,68 @@ +/* + * Copyright (c) New Cloud Technologies, Ltd. 2013-2022. 
+ * Author: Vitaly Isaev + * License: https://github.com/newcloudtechnologies/memlimiter/blob/master/LICENSE + */ + +package nextgc + +import ( + "testing" + + configbytes "github.com/newcloudtechnologies/memlimiter/utils/config/bytes" + "github.com/stretchr/testify/require" +) + +func TestComputeGoAllocLimit(t *testing.T) { + tests := []struct { + name string + rssLimit uint64 + cgoAllocs uint64 + expected uint64 + expectedOkay bool + }{ + { + name: "regular budget", + rssLimit: 1000, + cgoAllocs: 100, + expected: 900, + expectedOkay: true, + }, + { + name: "budget exhausted exactly", + rssLimit: 1000, + cgoAllocs: 1000, + expected: 1, + expectedOkay: false, + }, + { + name: "budget exhausted overflow risk", + rssLimit: 1000, + cgoAllocs: 1200, + expected: 1, + expectedOkay: false, + }, + { + name: "empty rss limit", + rssLimit: 0, + cgoAllocs: 0, + expected: 0, + expectedOkay: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &controllerImpl{ + cfg: &ControllerConfig{ + RSSLimit: configbytes.Bytes{Value: tt.rssLimit}, + }, + } + + actual, ok := c.computeGoAllocLimit(tt.cgoAllocs) + + require.Equal(t, tt.expected, actual) + require.Equal(t, tt.expectedOkay, ok) + }) + } +} diff --git a/controller/nextgc/component_p.go b/controller/nextgc/component_p.go index b01be5f..8a09b12 100644 --- a/controller/nextgc/component_p.go +++ b/controller/nextgc/component_p.go @@ -72,6 +72,10 @@ func (c *componentP) value(utilization float64) (float64, error) { // valueRaw returns the raw proportional component's output. 
func (c *componentP) valueRaw(utilization float64) (float64, error) { + if math.IsNaN(utilization) { + return math.NaN(), fmt.Errorf("value is undefined if memory usage = %v", utilization) + } + if utilization < 0 { return math.NaN(), fmt.Errorf("value is undefined if memory usage = %v", utilization) } diff --git a/controller/nextgc/component_p_test.go b/controller/nextgc/component_p_test.go index 9fde4b5..1f8cfaf 100644 --- a/controller/nextgc/component_p_test.go +++ b/controller/nextgc/component_p_test.go @@ -7,6 +7,7 @@ package nextgc import ( + "math" "testing" "github.com/go-logr/logr/testr" @@ -15,11 +16,23 @@ import ( func TestComponentP(t *testing.T) { logger := testr.New(t) - cmp := &componentP{logger: logger} + cmp := newComponentP(logger, &ComponentProportionalConfig{ + Coefficient: 1, + }) _, err := cmp.value(-1) require.Error(t, err) + _, err = cmp.value(math.NaN()) + require.Error(t, err) + _, err = cmp.value(2) require.NoError(t, err) + + out, err := cmp.value(math.Inf(1)) + require.NoError(t, err) + require.InDelta(t, float64(100), out, 1e-9) + + _, err = cmp.value(math.Inf(-1)) + require.Error(t, err) } diff --git a/controller/nextgc/config.go b/controller/nextgc/config.go index a07d510..0b65777 100644 --- a/controller/nextgc/config.go +++ b/controller/nextgc/config.go @@ -9,10 +9,14 @@ package nextgc import ( "errors" + "github.com/newcloudtechnologies/memlimiter/backpressure" "github.com/newcloudtechnologies/memlimiter/utils/config/bytes" "github.com/newcloudtechnologies/memlimiter/utils/config/duration" ) +// defaultMinGOGC is the default minimum GOGC value used in the "red zone". +const defaultMinGOGC = 10 + // ControllerConfig - controller configuration. type ControllerConfig struct { // RSSLimit - physical memory (RSS) consumption hard limit for a process. @@ -21,12 +25,17 @@ type ControllerConfig struct { // set more conservative parameters for GC. // Possible values are in range (0; 100). 
DangerZoneGOGC uint32 `json:"danger_zone_gogc"` - // DangerZoneGOGC - RSS utilization threshold that triggers controller to + // DangerZoneThrottling - RSS utilization threshold that triggers controller to // throttle incoming requests. // Possible values are in range (0; 100). + // It's recommended to keep it greater than or equal to DangerZoneGOGC so that + // the service first intensifies GC and starts throttling only later. DangerZoneThrottling uint32 `json:"danger_zone_throttling"` // Period - the periodicity of control parameters computation. Period duration.Duration `json:"period"` + // MinGOGC - minimal allowed GOGC value used in the "red zone". + // Zero means default safe value. + MinGOGC int `json:"min_gogc"` // ComponentProportional - controller's proportional component configuration ComponentProportional *ComponentProportionalConfig `json:"component_proportional"` // TODO: @@ -35,22 +44,82 @@ type ControllerConfig struct { // Prepare - config validator. func (c *ControllerConfig) Prepare() error { + if err := c.validateRSSLimit(); err != nil { + return err + } + + if err := c.validateDangerZoneGOGC(); err != nil { + return err + } + + if err := c.validateDangerZoneThrottling(); err != nil { + return err + } + + if err := c.validatePeriod(); err != nil { + return err + } + + c.applyDefaults() + + if err := c.validateMinGOGC(); err != nil { + return err + } + + if err := c.validateComponentProportional(); err != nil { + return err + } + + return nil +} + +func (c *ControllerConfig) validateRSSLimit() error { if c.RSSLimit.Value == 0 { return errors.New("empty RSSLimit") } - if c.DangerZoneGOGC == 0 || c.DangerZoneGOGC > 100 { - return errors.New("invalid DangerZoneGOGC value (must belong to [0; 100])") + return nil +} + +func (c *ControllerConfig) validateDangerZoneGOGC() error { + if c.DangerZoneGOGC == 0 || c.DangerZoneGOGC >= 100 { + return errors.New("invalid DangerZoneGOGC value (must belong to (0; 100))") } - if c.DangerZoneThrottling == 0 || 
c.DangerZoneThrottling > 100 { - return errors.New("invalid DangerZoneThrottling value (must belong to [0; 100])") + return nil +} + +func (c *ControllerConfig) validateDangerZoneThrottling() error { + if c.DangerZoneThrottling == 0 || c.DangerZoneThrottling >= 100 { + return errors.New("invalid DangerZoneThrottling value (must belong to (0; 100))") } + return nil +} + +func (c *ControllerConfig) validatePeriod() error { if c.Period.Duration == 0 { return errors.New("empty Period") } + return nil +} + +func (c *ControllerConfig) applyDefaults() { + if c.MinGOGC == 0 { + c.MinGOGC = defaultMinGOGC + } +} + +func (c *ControllerConfig) validateMinGOGC() error { + if c.MinGOGC < 1 || c.MinGOGC > backpressure.DefaultGOGC { + return errors.New("invalid MinGOGC value") + } + + return nil +} + +func (c *ControllerConfig) validateComponentProportional() error { if c.ComponentProportional == nil { return errors.New("empty ComponentProportional") } diff --git a/controller/nextgc/config_test.go b/controller/nextgc/config_test.go index 23040b4..2560bb0 100644 --- a/controller/nextgc/config_test.go +++ b/controller/nextgc/config_test.go @@ -9,6 +9,7 @@ package nextgc import ( "testing" + "github.com/newcloudtechnologies/memlimiter/backpressure" "github.com/newcloudtechnologies/memlimiter/utils/config/bytes" "github.com/newcloudtechnologies/memlimiter/utils/config/duration" "github.com/stretchr/testify/require" @@ -28,6 +29,14 @@ func TestComponentConfig(t *testing.T) { require.Error(t, c.Prepare()) }) + t.Run("bad danger zone GOGC equal to 100", func(t *testing.T) { + c := &ControllerConfig{ + RSSLimit: bytes.Bytes{Value: 1}, + DangerZoneGOGC: 100, + } + require.Error(t, c.Prepare()) + }) + t.Run("bad danger zone throttling", func(t *testing.T) { c := &ControllerConfig{ RSSLimit: bytes.Bytes{Value: 1}, @@ -37,6 +46,15 @@ func TestComponentConfig(t *testing.T) { require.Error(t, c.Prepare()) }) + t.Run("bad danger zone throttling equal to 100", func(t *testing.T) { + c := 
&ControllerConfig{ + RSSLimit: bytes.Bytes{Value: 1}, + DangerZoneGOGC: 50, + DangerZoneThrottling: 100, + } + require.Error(t, c.Prepare()) + }) + t.Run("bad period", func(t *testing.T) { c := &ControllerConfig{ RSSLimit: bytes.Bytes{Value: 1}, @@ -56,6 +74,84 @@ func TestComponentConfig(t *testing.T) { } require.Error(t, c.Prepare()) }) + + t.Run("default MinGOGC", func(t *testing.T) { + c := &ControllerConfig{ + RSSLimit: bytes.Bytes{Value: 1}, + DangerZoneGOGC: 50, + DangerZoneThrottling: 90, + Period: duration.Duration{Duration: 1}, + ComponentProportional: &ComponentProportionalConfig{ + Coefficient: 1, + }, + } + + require.NoError(t, c.Prepare()) + require.Equal(t, defaultMinGOGC, c.MinGOGC) + }) + + t.Run("invalid MinGOGC less than one", func(t *testing.T) { + c := &ControllerConfig{ + RSSLimit: bytes.Bytes{Value: 1}, + DangerZoneGOGC: 50, + DangerZoneThrottling: 90, + Period: duration.Duration{Duration: 1}, + MinGOGC: -1, + ComponentProportional: &ComponentProportionalConfig{ + Coefficient: 1, + }, + } + + require.Error(t, c.Prepare()) + }) + + t.Run("invalid MinGOGC greater than default GOGC", func(t *testing.T) { + c := &ControllerConfig{ + RSSLimit: bytes.Bytes{Value: 1}, + DangerZoneGOGC: 50, + DangerZoneThrottling: 90, + Period: duration.Duration{Duration: 1}, + MinGOGC: backpressure.DefaultGOGC + 1, + ComponentProportional: &ComponentProportionalConfig{ + Coefficient: 1, + }, + } + + require.Error(t, c.Prepare()) + }) + + t.Run("valid custom MinGOGC", func(t *testing.T) { + const customMinGOGC = 25 + + c := &ControllerConfig{ + RSSLimit: bytes.Bytes{Value: 1}, + DangerZoneGOGC: 50, + DangerZoneThrottling: 90, + Period: duration.Duration{Duration: 1}, + MinGOGC: customMinGOGC, + ComponentProportional: &ComponentProportionalConfig{ + Coefficient: 1, + }, + } + + require.NoError(t, c.Prepare()) + require.Equal(t, customMinGOGC, c.MinGOGC) + }) + + t.Run("danger zone order is not strictly validated", func(t *testing.T) { + c := &ControllerConfig{ + 
RSSLimit: bytes.Bytes{Value: 1}, + DangerZoneGOGC: 90, + DangerZoneThrottling: 50, + Period: duration.Duration{Duration: 1}, + MinGOGC: 10, + ComponentProportional: &ComponentProportionalConfig{ + Coefficient: 1, + }, + } + + require.NoError(t, c.Prepare()) + }) } func TestComponentProportionalConfig(t *testing.T) { diff --git a/controller/nextgc/controller.go b/controller/nextgc/controller.go index aa5aff7..6632822 100644 --- a/controller/nextgc/controller.go +++ b/controller/nextgc/controller.go @@ -179,17 +179,43 @@ func (c *controllerImpl) updateUtilization(serviceStats stats.ServiceStats) { } } - c.goAllocLimit = c.cfg.RSSLimit.Value - cgoAllocs + goAllocLimit, budgetOK := c.computeGoAllocLimit(cgoAllocs) + c.goAllocLimit = goAllocLimit // Memory utilization is defined as the relation of NextGC value to the Go allocation limit. // If NextGC becomes higher than the allocation limit, the GC will never run, because // OOM will happen first. That's why we need to push away Go process from the allocation limit. + if !budgetOK { + // If non-Go allocations already exhausted the RSS budget, force controller to + // apply conservative parameters without producing infinities/NaN in stats output. + c.utilization = exhaustedBudgetUtilization + c.rss = serviceStats.RSS() + + return + } + c.utilization = float64(serviceStats.NextGC()) / float64(c.goAllocLimit) // Just for the history, save actual RSS value c.rss = serviceStats.RSS() } +// computeGoAllocLimit computes Go allocations budget from total RSS limit and cgo consumption. +// bool result indicates whether the resulting budget is valid and non-exhausted. +func (c *controllerImpl) computeGoAllocLimit(cgoAllocs uint64) (uint64, bool) { + rssLimit := c.cfg.RSSLimit.Value + + if rssLimit == 0 { + return 0, false + } + + if cgoAllocs >= rssLimit { + return 1, false + } + + return rssLimit - cgoAllocs, true +} + // updateControlValues updates the controller control values. 
func (c *controllerImpl) updateControlValues() error { var err error @@ -225,7 +251,14 @@ func (c *controllerImpl) updateControlParameters() { c.controlParameters.ControllerStats = c.aggregateStats() } -const percents = 100 +const ( + // percents is a constant for converting float64 to uint32. + percents = 100 + // exhaustedBudgetUtilization is a finite marker value greater than 1 used + // when cgo allocations fully consume RSS budget. + // This avoids Inf/NaN in stats output and guarantees "red zone" behavior. + exhaustedBudgetUtilization = 1.01 +) // updateControlParameterGOGC updates the controller control parameter GOGC. func (c *controllerImpl) updateControlParameterGOGC() { @@ -238,7 +271,18 @@ func (c *controllerImpl) updateControlParameterGOGC() { // Control parameters are more conservative in the "red zone". roundedValue := uint32(math.Round(c.sumValue)) - c.controlParameters.GOGC = int(backpressure.DefaultGOGC - roundedValue) + gogc := int(backpressure.DefaultGOGC - roundedValue) + + minGOGC := c.cfg.MinGOGC + if minGOGC == 0 { + minGOGC = defaultMinGOGC + } + + if gogc < minGOGC { + gogc = minGOGC + } + + c.controlParameters.GOGC = gogc } // updateControlParameterThrottling updates the controller control parameter throttling. diff --git a/controller/nextgc/controller_gogc_test.go b/controller/nextgc/controller_gogc_test.go new file mode 100644 index 0000000..e11a24e --- /dev/null +++ b/controller/nextgc/controller_gogc_test.go @@ -0,0 +1,81 @@ +/* + * Copyright (c) New Cloud Technologies, Ltd. 2013-2022. 
+ * Author: Vitaly Isaev + * License: https://github.com/newcloudtechnologies/memlimiter/blob/master/LICENSE + */ + +package nextgc + +import ( + "testing" + + "github.com/newcloudtechnologies/memlimiter/backpressure" + "github.com/newcloudtechnologies/memlimiter/stats" + "github.com/stretchr/testify/require" +) + +func TestUpdateControlParameterGOGC(t *testing.T) { + t.Run("clamp to custom MinGOGC", func(t *testing.T) { + c := &controllerImpl{ + sumValue: 99, + utilization: 0.9, + cfg: &ControllerConfig{ + DangerZoneGOGC: 50, + MinGOGC: 10, + }, + controlParameters: &stats.ControlParameters{}, + } + + c.updateControlParameterGOGC() + + require.Equal(t, 10, c.controlParameters.GOGC) + }) + + t.Run("clamp to default MinGOGC when MinGOGC is zero", func(t *testing.T) { + c := &controllerImpl{ + sumValue: 99, + utilization: 0.9, + cfg: &ControllerConfig{ + DangerZoneGOGC: 50, + MinGOGC: 0, + }, + controlParameters: &stats.ControlParameters{}, + } + + c.updateControlParameterGOGC() + + require.Equal(t, defaultMinGOGC, c.controlParameters.GOGC) + }) + + t.Run("green zone keeps default GOGC", func(t *testing.T) { + c := &controllerImpl{ + sumValue: 99, + utilization: 0.1, + cfg: &ControllerConfig{ + DangerZoneGOGC: 50, + MinGOGC: 10, + }, + controlParameters: &stats.ControlParameters{}, + } + + c.updateControlParameterGOGC() + + require.Equal(t, backpressure.DefaultGOGC, c.controlParameters.GOGC) + }) + + t.Run("value above MinGOGC is not clamped", func(t *testing.T) { + c := &controllerImpl{ + sumValue: 22, + utilization: 0.9, + cfg: &ControllerConfig{ + DangerZoneGOGC: 50, + MinGOGC: 10, + }, + controlParameters: &stats.ControlParameters{}, + } + + c.updateControlParameterGOGC() + + require.Equal(t, 78, c.controlParameters.GOGC) + }) +} diff --git a/docs/control_params.png b/docs/control_params.png index 35684aa..1124721 100644 Binary files a/docs/control_params.png and b/docs/control_params.png differ diff --git a/docs/gogc_floor_hits.png b/docs/gogc_floor_hits.png new 
file mode 100644 index 0000000..58bde94 Binary files /dev/null and b/docs/gogc_floor_hits.png differ diff --git a/docs/memory_limits_overlay.png b/docs/memory_limits_overlay.png new file mode 100644 index 0000000..359ea2e Binary files /dev/null and b/docs/memory_limits_overlay.png differ diff --git a/docs/rss.png b/docs/rss.png index 5ef8f6d..96c078c 100644 Binary files a/docs/rss.png and b/docs/rss.png differ diff --git a/make-workflows.md b/make-workflows.md index 3878d87..58f35fd 100644 --- a/make-workflows.md +++ b/make-workflows.md @@ -103,6 +103,8 @@ Expected output directory: Expected generated files: - `control_params.png` +- `gogc_floor_hits.png` +- `memory_limits_overlay.png` - `rss.png` - Per-case directories with: - `server_config.json` diff --git a/middleware/grpc.go b/middleware/grpc.go index 3e4075f..64d2f94 100644 --- a/middleware/grpc.go +++ b/middleware/grpc.go @@ -25,16 +25,21 @@ type GRPC interface { MakeStreamServerInterceptor() grpc.StreamServerInterceptor } +// grpcImpl implements the GRPC interface. type grpcImpl struct { backpressureOperator backpressure.Operator logger logr.Logger } +// unknownGRPCMethod is returned when the gRPC method name cannot be determined. +const unknownGRPCMethod = "" + +// MakeUnaryServerInterceptor returns a unary server interceptor. func (g *grpcImpl) MakeUnaryServerInterceptor() grpc.UnaryServerInterceptor { return func( ctx context.Context, req any, - _ *grpc.UnaryServerInfo, + info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (any, error) { allowed := g.backpressureOperator.AllowRequest() @@ -47,17 +52,18 @@ func (g *grpcImpl) MakeUnaryServerInterceptor() grpc.UnaryServerInterceptor { logger = g.logger } - logger.Info("request has been throttled") + logger.Info("request has been throttled", "grpc_method", g.grpcMethodFromUnaryInfo(info)) return nil, status.Error(codes.ResourceExhausted, "request has been throttled") } } +// MakeStreamServerInterceptor returns a stream server interceptor. 
 func (g *grpcImpl) MakeStreamServerInterceptor() grpc.StreamServerInterceptor {
 	return func(
 		srv any,
 		ss grpc.ServerStream,
-		_ *grpc.StreamServerInfo,
+		info *grpc.StreamServerInfo,
 		handler grpc.StreamHandler,
 	) error {
 		allowed := g.backpressureOperator.AllowRequest()
@@ -70,8 +76,36 @@ func (g *grpcImpl) MakeStreamServerInterceptor() grpc.StreamServerInterceptor {
 			logger = g.logger
 		}
 
-		logger.Info("request has been throttled")
+		logger.Info("request has been throttled", "grpc_method", g.grpcMethodFromStreamInfo(info))
 
 		return status.Error(codes.ResourceExhausted, "request has been throttled")
 	}
 }
+
+// grpcMethodFromUnaryInfo returns the GRPC method from the unary server info.
+func (g *grpcImpl) grpcMethodFromUnaryInfo(info *grpc.UnaryServerInfo) string {
+	if info == nil {
+		return unknownGRPCMethod
+	}
+
+	method := info.FullMethod
+	if method == "" {
+		return unknownGRPCMethod
+	}
+
+	return method
+}
+
+// grpcMethodFromStreamInfo returns the GRPC method from the stream server info.
+func (g *grpcImpl) grpcMethodFromStreamInfo(info *grpc.StreamServerInfo) string {
+	if info == nil {
+		return unknownGRPCMethod
+	}
+
+	method := info.FullMethod
+	if method == "" {
+		return unknownGRPCMethod
+	}
+
+	return method
+}
diff --git a/middleware/grpc_test.go b/middleware/grpc_test.go
new file mode 100644
index 0000000..2293105
--- /dev/null
+++ b/middleware/grpc_test.go
@@ -0,0 +1,169 @@
+/*
+ * Copyright (c) New Cloud Technologies, Ltd. 2013-2022.
+ * Author: Vitaly Isaev
+ * License: https://github.com/newcloudtechnologies/memlimiter/blob/master/LICENSE
+ */
+
+package middleware
+
+import (
+	"context"
+	"testing"
+
+	"github.com/go-logr/logr"
+	"github.com/newcloudtechnologies/memlimiter/stats"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
+)
+
+type logRecord struct {
+	msg string
+	kv  []any
+}
+
+type captureSink struct {
+	record *logRecord
+	prefix []any
+}
+
+var _ logr.LogSink = (*captureSink)(nil)
+
+func newCaptureSink() *captureSink {
+	return &captureSink{
+		record: &logRecord{},
+	}
+}
+
+func (s *captureSink) Init(_ logr.RuntimeInfo) {}
+
+func (s *captureSink) Enabled(_ int) bool { return true }
+
+func (s *captureSink) Info(_ int, msg string, keysAndValues ...any) {
+	all := append([]any{}, s.prefix...)
+	all = append(all, keysAndValues...)
+
+	s.record.msg = msg
+	s.record.kv = all
+}
+
+func (s *captureSink) Error(_ error, _ string, _ ...any) {}
+
+func (s *captureSink) WithName(_ string) logr.LogSink { return s }
+
+func (s *captureSink) WithValues(keysAndValues ...any) logr.LogSink {
+	prefix := append([]any{}, s.prefix...)
+	prefix = append(prefix, keysAndValues...)
+
+	return &captureSink{
+		record: s.record,
+		prefix: prefix,
+	}
+}
+
+type backpressureOperatorStub struct {
+	allow bool
+}
+
+func (b *backpressureOperatorStub) SetControlParameters(_ *stats.ControlParameters) error { return nil }
+
+func (b *backpressureOperatorStub) AllowRequest() bool { return b.allow }
+
+func (b *backpressureOperatorStub) GetStats() (*stats.BackpressureStats, error) {
+	return &stats.BackpressureStats{}, nil
+}
+
+func (b *backpressureOperatorStub) Quit() {}
+
+type serverStreamStub struct{}
+
+func (s *serverStreamStub) SetHeader(_ metadata.MD) error { return nil }
+
+func (s *serverStreamStub) SendHeader(_ metadata.MD) error { return nil }
+
+func (s *serverStreamStub) SetTrailer(_ metadata.MD) {}
+
+func (s *serverStreamStub) Context() context.Context { return context.Background() }
+
+func (s *serverStreamStub) SendMsg(_ any) error { return nil }
+
+func (s *serverStreamStub) RecvMsg(_ any) error { return nil }
+
+func TestUnaryServerInterceptorLogsMethodOnThrottling(t *testing.T) {
+	sink := newCaptureSink()
+	g := &grpcImpl{
+		backpressureOperator: &backpressureOperatorStub{allow: false},
+		logger:               logr.New(sink),
+	}
+
+	interceptor := g.MakeUnaryServerInterceptor()
+
+	handlerCalled := false
+	_, err := interceptor(
+		context.Background(),
+		"struct{}{}",
+		&grpc.UnaryServerInfo{FullMethod: "/test.Service/Unary"},
+		func(_ context.Context, _ any) (any, error) {
+			handlerCalled = true
+
+			return "ok", nil
+		},
+	)
+
+	require.False(t, handlerCalled)
+	require.Error(t, err)
+
+	st, ok := status.FromError(err)
+	require.True(t, ok)
+	require.Equal(t, codes.ResourceExhausted, st.Code())
+
+	method, ok := keyValueByName(sink.record.kv, "grpc_method")
+	require.True(t, ok)
+	require.Equal(t, "/test.Service/Unary", method)
+}
+
+func TestStreamServerInterceptorLogsMethodOnThrottling(t *testing.T) {
+	sink := newCaptureSink()
+	g := &grpcImpl{
+		backpressureOperator: &backpressureOperatorStub{allow: false},
+		logger:               logr.New(sink),
+	}
+
+	interceptor := g.MakeStreamServerInterceptor()
+
+	handlerCalled := false
+	err := interceptor(
+		struct{}{},
+		&serverStreamStub{},
+		&grpc.StreamServerInfo{FullMethod: "/test.Service/Stream"},
+		func(_ any, _ grpc.ServerStream) error {
+			handlerCalled = true
+
+			return nil
+		},
+	)
+
+	require.False(t, handlerCalled)
+	require.Error(t, err)
+
+	st, ok := status.FromError(err)
+	require.True(t, ok)
+	require.Equal(t, codes.ResourceExhausted, st.Code())
+
+	method, ok := keyValueByName(sink.record.kv, "grpc_method")
+	require.True(t, ok)
+	require.Equal(t, "/test.Service/Stream", method)
+}
+
+func keyValueByName(kv []any, key string) (any, bool) {
+	for i := 0; i+1 < len(kv); i += 2 {
+		k, ok := kv[i].(string)
+		if ok && k == key {
+			return kv[i+1], true
+		}
+	}
+
+	return nil, false
+}
diff --git a/service_impl.go b/service_impl.go
index ff3f208..446e0eb 100644
--- a/service_impl.go
+++ b/service_impl.go
@@ -9,6 +9,8 @@ package memlimiter
 import (
 	"errors"
 	"fmt"
+	"math"
+	"runtime/debug"
 
 	"github.com/go-logr/logr"
 	"github.com/newcloudtechnologies/memlimiter/backpressure"
@@ -26,6 +28,8 @@ type serviceImpl struct {
 	backpressureOperator backpressure.Operator
 	statsSubscription    stats.ServiceStatsSubscription
 	controller           controller.Controller
+	restoreGoMemoryLimit bool
+	oldGoMemoryLimit     int64
 	logger               logr.Logger
 }
 
@@ -52,6 +56,11 @@ func (s *serviceImpl) Quit() {
 	s.logger.Info("terminating MemLimiter service")
 	s.controller.Quit()
 	s.statsSubscription.Quit()
+	s.backpressureOperator.Quit()
+
+	if s.restoreGoMemoryLimit {
+		debug.SetMemoryLimit(s.oldGoMemoryLimit)
+	}
 }
 
 // newServiceImpl - main entrypoint for MemLimiter.
@@ -69,6 +78,20 @@
 		return nil, errors.New("nil tracker subscription passed")
 	}
 
+	var (
+		restoreGoMemoryLimit bool
+		oldGoMemoryLimit     int64
+	)
+
+	if cfg.GoMemoryLimit.Value > 0 {
+		if cfg.GoMemoryLimit.Value > uint64(math.MaxInt64) {
+			return nil, errors.New("go memory limit exceeds int64 range")
+		}
+
+		oldGoMemoryLimit = debug.SetMemoryLimit(int64(cfg.GoMemoryLimit.Value))
+		restoreGoMemoryLimit = true
+	}
+
 	logger.Info("starting MemLimiter service")
 
 	c, err := nextgc.NewControllerFromConfig(
@@ -78,6 +101,10 @@
 		backpressureOperator,
 	)
 	if err != nil {
+		if restoreGoMemoryLimit {
+			debug.SetMemoryLimit(oldGoMemoryLimit)
+		}
+
 		return nil, fmt.Errorf("new controller from config: %w", err)
 	}
 
@@ -86,6 +113,8 @@
 		backpressureOperator: backpressureOperator,
 		statsSubscription:    statsSubscription,
 		controller:           c,
+		restoreGoMemoryLimit: restoreGoMemoryLimit,
+		oldGoMemoryLimit:     oldGoMemoryLimit,
 		logger:               logger,
 	}, nil
 }
diff --git a/service_impl_test.go b/service_impl_test.go
new file mode 100644
index 0000000..5aebc35
--- /dev/null
+++ b/service_impl_test.go
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) New Cloud Technologies, Ltd. 2013-2022.
+ * Author: Vitaly Isaev
+ * License: https://github.com/newcloudtechnologies/memlimiter/blob/master/LICENSE
+ */
+
+package memlimiter
+
+import (
+	"runtime/debug"
+	"testing"
+	"time"
+
+	"github.com/go-logr/logr/testr"
+	"github.com/newcloudtechnologies/memlimiter/backpressure"
+	"github.com/newcloudtechnologies/memlimiter/controller"
+	"github.com/newcloudtechnologies/memlimiter/controller/nextgc"
+	"github.com/newcloudtechnologies/memlimiter/stats"
+	"github.com/newcloudtechnologies/memlimiter/utils/config/bytes"
+	"github.com/newcloudtechnologies/memlimiter/utils/config/duration"
+	"github.com/stretchr/testify/require"
+)
+
+var (
+	_ controller.Controller          = (*controllerStub)(nil)
+	_ stats.ServiceStatsSubscription = (*serviceStatsSubscriptionStub)(nil)
+	_ backpressure.Operator          = (*backpressureOperatorStub)(nil)
+)
+
+type controllerStub struct {
+	quitCalled bool
+}
+
+type serviceStatsSubscriptionStub struct {
+	quitCalled bool
+}
+
+type backpressureOperatorStub struct {
+	quitCalled bool
+}
+
+func (c *controllerStub) GetStats() (*stats.ControllerStats, error) {
+	return &stats.ControllerStats{}, nil
+}
+
+func (c *controllerStub) Quit() { c.quitCalled = true }
+
+func (s *serviceStatsSubscriptionStub) Updates() <-chan stats.ServiceStats { return nil }
+
+func (s *serviceStatsSubscriptionStub) Quit() { s.quitCalled = true }
+
+func (b *backpressureOperatorStub) SetControlParameters(_ *stats.ControlParameters) error { return nil }
+
+func (b *backpressureOperatorStub) AllowRequest() bool { return true }
+
+func (b *backpressureOperatorStub) GetStats() (*stats.BackpressureStats, error) {
+	return &stats.BackpressureStats{}, nil
+}
+
+func (b *backpressureOperatorStub) Quit() { b.quitCalled = true }
+
+func TestServiceImplQuit(t *testing.T) {
+	logger := testr.New(t)
+
+	c := &controllerStub{}
+	ss := &serviceStatsSubscriptionStub{}
+	bp := &backpressureOperatorStub{}
+
+	s := &serviceImpl{
+		controller:           c,
+		statsSubscription:    ss,
+		backpressureOperator: bp,
+		logger:               logger,
+	}
+
+	s.Quit()
+
+	require.True(t, c.quitCalled)
+	require.True(t, ss.quitCalled)
+	require.True(t, bp.quitCalled)
+}
+
+func TestNewServiceImplGoMemoryLimitLifecycle(t *testing.T) {
+	logger := testr.New(t)
+
+	const (
+		initialLimit  int64  = 512 << 20
+		configuredMem uint64 = 256 << 20
+	)
+
+	previousBeforeTest := debug.SetMemoryLimit(initialLimit)
+	defer debug.SetMemoryLimit(previousBeforeTest)
+
+	require.Equal(t, initialLimit, debug.SetMemoryLimit(-1))
+
+	cfg := &Config{
+		GoMemoryLimit: bytes.Bytes{Value: configuredMem},
+		ControllerNextGC: &nextgc.ControllerConfig{
+			RSSLimit:             bytes.Bytes{Value: 1 << 30},
+			DangerZoneGOGC:       50,
+			DangerZoneThrottling: 90,
+			Period:               duration.Duration{Duration: time.Hour},
+			ComponentProportional: &nextgc.ComponentProportionalConfig{
+				Coefficient: 1,
+			},
+		},
+	}
+
+	service, err := newServiceImpl(
+		logger,
+		cfg,
+		&serviceStatsSubscriptionStub{},
+		&backpressureOperatorStub{},
+	)
+	require.NoError(t, err)
+
+	require.Equal(t, int64(configuredMem), debug.SetMemoryLimit(-1))
+
+	service.Quit()
+
+	require.Equal(t, initialLimit, debug.SetMemoryLimit(-1))
+}
diff --git a/test/allocator/analyze/compare.py b/test/allocator/analyze/compare.py
index 78db468..3c4f755 100755
--- a/test/allocator/analyze/compare.py
+++ b/test/allocator/analyze/compare.py
@@ -25,10 +25,10 @@ class PerfConfigRenderer:
     __t: Final = '''
 {
     "endpoint": "localhost:1988",
-    "rps": 100,
+    "rps": 120,
     "load_duration": "{{ load_duration }}",
     "allocation_size": "1M",
-    "pause_duration": "5s",
+    "pause_duration": "6s",
     "request_timeout": "1m"
 }
 '''
@@ -52,10 +52,12 @@
 {
 {% if not unlimited %}
   "memlimiter": {
+    "go_memory_limit": "{{ go_memory_limit }}",
     "controller_nextgc": {
       "rss_limit": "{{ rss_limit }}",
       "danger_zone_gogc": 50,
       "danger_zone_throttling": 90,
+      "min_gogc": {{ min_gogc }},
       "period": "100ms",
       "component_proportional": {
         "coefficient": {{ coefficient }},
@@ -85,6 +87,8 @@ def render(self,
         out = self.__template.render(
             unlimited=params.unlimited,
             rss_limit=params.rss_limit_str,
+            go_memory_limit=params.go_memory_limit_str,
+            min_gogc=params.min_gogc,
             coefficient=params.coefficient,
         )
 
@@ -196,6 +200,8 @@ def main():
 
     render.control_params_subplots(reports, Path(root_dir, "control_params.png"))
     render.rss_pivot(reports, Path(root_dir, 'rss.png'))
+    render.gogc_floor_hits(reports, Path(root_dir, 'gogc_floor_hits.png'))
+    render.memory_limits_overlay(reports, Path(root_dir, 'memory_limits_overlay.png'))
 
 
 if __name__ == '__main__':
diff --git a/test/allocator/analyze/render.py b/test/allocator/analyze/render.py
index efbe06d..8362317 100755
--- a/test/allocator/analyze/render.py
+++ b/test/allocator/analyze/render.py
@@ -2,12 +2,14 @@
 # Author: Vitaly Isaev
 # License: https://github.com/newcloudtechnologies/memlimiter/blob/master/LICENSE
 import os
+import math
 from typing import List
 
 import humanize as humanize
 import matplotlib.pyplot as plt
 import matplotlib.ticker
 import numpy as np
+import pandas as pd
 
 from report import Report
@@ -17,63 +19,92 @@
 def bytes_major_formatter(x, pos):
     return humanize.naturalsize(int(x), binary=True).replace(".0", "")
 
 
-def control_params_subplots(reports: List[Report], path: os.PathLike):
-    ncols = 2
-    nrows = 3
-    if len(reports) != ncols * nrows:
-        raise Exception("columns and rows mismatch")
+def _make_axes_grid(nplots: int, ncols: int = 2):
+    nrows = max(1, math.ceil(nplots / ncols))
+    fig, axes = plt.subplots(ncols=ncols, nrows=nrows, figsize=(12, 5 * nrows))
+
+    # Normalize axes shape for both 1xN and Nx1 cases.
+    flat_axes = np.atleast_1d(axes).reshape(-1)
+
+    for ax in flat_axes[nplots:]:
+        ax.set_visible(False)
+
+    return fig, flat_axes
+
+
+def _report_title(report: Report) -> str:
+    params = report.session.params
+    if params.unlimited:
+        if _terminated_early(report):
+            elapsed = _elapsed_duration_seconds(report)
+            return f"MemLimiter disabled (terminated at ~{elapsed:.0f}s)"
+        return "MemLimiter disabled"
+
+    go_limit = params.go_memory_limit_str
+    return f"Cp={params.coefficient_str}, MinGOGC={params.min_gogc}, GoLimit={go_limit}"
+
+
+def _expected_duration_seconds(report: Report) -> float:
+    return float(pd.to_timedelta(report.session.params.load_duration).total_seconds())
+
+
+def _elapsed_duration_seconds(report: Report) -> float:
+    if report.df.empty:
+        return 0.0
 
-    fig, axes = plt.subplots(ncols=2, nrows=3, figsize=(12, 15))
+    return float(report.df['elapsed_time'].max())
+
+
+def _terminated_early(report: Report, tolerance: float = 0.95) -> bool:
+    expected = _expected_duration_seconds(report)
+    if expected <= 0:
+        return False
+
+    return _elapsed_duration_seconds(report) < expected * tolerance
+
+
+def control_params_subplots(reports: List[Report], path: os.PathLike):
+    fig, axes = _make_axes_grid(len(reports), ncols=2)
 
     ls, labels = None, None
-    for i in range(nrows):
-        for j in range(ncols):
-            ix = i * ncols + j
-
-            report = reports[ix]
-            df = report.df
-            ax = axes[i][j]
-
-            ax.set_xlim(0, 60)
-
-            ax.set_xlabel('Time, seconds')
-
-            # RSS consumption plot.
-            color = 'tab:red'
-            l0 = ax.plot(df['elapsed_time'], df['rss'], color=color, label='RSS')
-            ax.set_ylabel('RSS, bytes')
-            ax.set_ylim(0, 1024 * 1024 * 1024)
-            ax.set_yticks([ml * 1024 * 1024 for ml in (256, 512, 512 + 256, 1024)])
-            ax.yaxis.set_major_formatter(bytes_major_formatter)
-
-            # GOGC consumption plot.
-            color = 'tab:blue'
-            twin1 = ax.twinx()
-            l1 = twin1.plot(df['elapsed_time'], df['gogc'], color=color, label='GOGC')
-            twin1.set_ylabel('GOGC')
-            twin1.set_ylim(-5, 105)
-
-            # Throttling plot.
-            color = 'tab:green'
-            twin2 = ax.twinx()
-            twin2.spines.right.set_position(("axes", 1.2))
-            l2 = twin2.plot(df['elapsed_time'], df['throttling'], color=color, label='Throttling')
-            twin2.set_ylabel('Throttling')
-            twin2.set_ylim(-5, 105)
-
-            # Legend.
-            if not ls or not labels:
-                ls = l0 + l1 + l2
-                labels = [l.get_label() for l in ls]
-
-            # Title.
-            if report.session.params.unlimited:
-                title = "MemLimiter disabled"
-            else:
-                coefficient = report.session.params.coefficient_str
-                title = f'MemLimiter enabled, $C_{{p}} = {coefficient}$'
-            ax.title.set_text(title)
+    for ix, report in enumerate(reports):
+        df = report.df
+        ax = axes[ix]
+
+        ax.set_xlim(0, 60)
+
+        ax.set_xlabel('Time, seconds')
+
+        # RSS consumption plot.
+        color = 'tab:red'
+        l0 = ax.plot(df['elapsed_time'], df['rss'], color=color, label='RSS')
+        ax.set_ylabel('RSS, bytes')
+        ax.set_ylim(0, 1024 * 1024 * 1024)
+        ax.set_yticks([ml * 1024 * 1024 for ml in (256, 512, 512 + 256, 1024)])
+        ax.yaxis.set_major_formatter(bytes_major_formatter)
+
+        # GOGC consumption plot.
+        color = 'tab:blue'
+        twin1 = ax.twinx()
+        l1 = twin1.plot(df['elapsed_time'], df['gogc'], color=color, label='GOGC')
+        twin1.set_ylabel('GOGC')
+        twin1.set_ylim(-5, 105)
+
+        # Throttling plot.
+        color = 'tab:green'
+        twin2 = ax.twinx()
+        twin2.spines.right.set_position(("axes", 1.2))
+        l2 = twin2.plot(df['elapsed_time'], df['throttling'], color=color, label='Throttling')
+        twin2.set_ylabel('Throttling')
+        twin2.set_ylim(-5, 105)
+
+        # Legend.
+        if not ls or not labels:
+            ls = l0 + l1 + l2
+            labels = [l.get_label() for l in ls]
+
+        ax.title.set_text(_report_title(report))
 
     fig.legend(ls, labels)
     fig.tight_layout()
@@ -95,9 +126,16 @@ def rss_pivot(reports: List[Report], path: os.PathLike):
     for i in range(n):
         report = reports[n - 1 - i]
         if report.session.params.unlimited:
-            label = 'No limits'
+            if _terminated_early(report):
+                label = f'No limits (terminated at ~{_elapsed_duration_seconds(report):.0f}s)'
+            else:
+                label = 'No limits'
         else:
-            label = f'$C_{{p}} = {report.session.params.coefficient_str}$'
+            label = (
+                f'$C_{{p}}={report.session.params.coefficient_str}$, '
+                f'GML={report.session.params.go_memory_limit_str}, '
+                f'MinGOGC={report.session.params.min_gogc}'
+            )
 
         ax.plot(report.df['elapsed_time'], report.df['rss'], color=colors[i], label=label)
 
@@ -105,3 +143,82 @@
     ax.title.set_text('RSS consumption dependence on $C_{{p}}$')
     fig.tight_layout()
     fig.savefig(path, transparent=False)
+
+
+def gogc_floor_hits(reports: List[Report], path: os.PathLike):
+    active_reports = [report for report in reports if not report.session.params.unlimited]
+    if not active_reports:
+        raise Exception("no memlimiter-enabled reports")
+
+    labels = []
+    ratios = []
+    for report in active_reports:
+        params = report.session.params
+        gogc_series = report.df['gogc']
+        floor_hits = (gogc_series <= params.min_gogc).sum()
+        ratio = 100.0 * floor_hits / len(gogc_series)
+
+        labels.append(
+            f"Cp={params.coefficient_str}\n"
+            f"Min={params.min_gogc}\n"
+            f"GML={params.go_memory_limit_str}"
+        )
+        ratios.append(ratio)
+
+    fig, ax = plt.subplots(figsize=(12, 6))
+    bars = ax.bar(labels, ratios, color='tab:blue')
+    ax.set_ylim(0, 100)
+    ax.set_ylabel('Share of samples at GOGC floor, %')
+    ax.set_title('How often MinGOGC clamp is hit')
+    ax.grid(axis='y', alpha=0.3)
+
+    for bar, value in zip(bars, ratios):
+        ax.text(
+            bar.get_x() + bar.get_width() / 2,
+            value + 1,
+            f"{value:.1f}%",
+            ha='center',
+            va='bottom',
+        )
+
+    fig.tight_layout()
+    fig.savefig(path, transparent=False)
+
+
+def memory_limits_overlay(reports: List[Report], path: os.PathLike):
+    fig, axes = _make_axes_grid(len(reports), ncols=2)
+
+    for ix, report in enumerate(reports):
+        params = report.session.params
+        df = report.df
+        ax = axes[ix]
+
+        ax.set_xlim(0, 60)
+        ax.set_xlabel('Time, seconds')
+        ax.set_ylabel('Memory, bytes')
+        ax.yaxis.set_major_formatter(bytes_major_formatter)
+
+        ax.plot(df['elapsed_time'], df['rss'], color='tab:red', label='RSS')
+        ax.plot(df['elapsed_time'], df['go_runtime_bytes'], color='tab:purple', label='Go runtime memory')
+        ax.axhline(params.rss_limit, color='black', linestyle='--', linewidth=1.2, label='RSS limit')
+
+        if params.go_memory_limit > 0:
+            ax.axhline(
+                params.go_memory_limit,
+                color='tab:orange',
+                linestyle='--',
+                linewidth=1.2,
+                label='Go memory limit',
+            )
+
+        y_candidates = [params.rss_limit, df['rss'].max(), df['go_runtime_bytes'].max()]
+        if params.go_memory_limit > 0:
+            y_candidates.append(params.go_memory_limit)
+        ymax = max(y_candidates) * 1.1
+        ax.set_ylim(0, ymax)
+
+        ax.title.set_text(_report_title(report))
+        ax.legend(loc='upper left')
+
+    fig.tight_layout()
+    fig.savefig(path, transparent=False)
diff --git a/test/allocator/analyze/report.py b/test/allocator/analyze/report.py
index 476b4c3..9223cbc 100644
--- a/test/allocator/analyze/report.py
+++ b/test/allocator/analyze/report.py
@@ -27,20 +27,12 @@ def from_file(cls, path: os.PathLike, session: Session):
     def __parse_tracker_stats(path: os.PathLike) -> pd.DataFrame:
         df = pd.read_csv(path)
         df['timestamp'] = pd.to_datetime(df['timestamp'])
+        if 'go_runtime_bytes' not in df.columns:
+            df['go_runtime_bytes'] = 0
         df['utilization'] *= 100
         return df
 
     def __post_init__(self):
-        # Emulate OOM event for unconstrained process.
-        if self.session.params.unlimited:
-            last_ts, last_but_one_ts = self.df['timestamp'].iloc[-1], self.df['timestamp'].iloc[-2]
-            delta = last_ts - last_but_one_ts
-            self.df.loc[len(self.df)] = [
-                last_ts + delta,
-                self.session.params.rss_limit,
-                0, 0, 0,
-            ]
-
         # Compute elapsed time.
         self.df['elapsed_time'] = (self.df['timestamp'] - self.df['timestamp'].min()).apply(
             lambda x: x.seconds + x.microseconds / 1000000)
diff --git a/test/allocator/analyze/testing.py b/test/allocator/analyze/testing.py
index f6bee36..cfbbbfc 100644
--- a/test/allocator/analyze/testing.py
+++ b/test/allocator/analyze/testing.py
@@ -7,22 +7,39 @@ from typing import Iterable, Final
 
 
 GIGABYTE: Final = 1024 * 1024 * 1024
+MEBIBYTE: Final = 1024 * 1024
+DEFAULT_GO_MEMORY_LIMIT: Final = 800 * MEBIBYTE
+DEFAULT_MIN_GOGC: Final = 10
 
 
 @dataclasses.dataclass
 class Params:
     unlimited: bool
     rss_limit: int = GIGABYTE
+    go_memory_limit: int = DEFAULT_GO_MEMORY_LIMIT
+    min_gogc: int = DEFAULT_MIN_GOGC
     coefficient: float = 20.0
     load_duration: str = '60s'
 
     def __str__(self) -> str:
-        return f"unlimited_{self.unlimited}_rss_limit_{self.rss_limit}_coefficient_{self.coefficient_str}"
+        return (
+            f"unlimited_{self.unlimited}"
+            f"_rss_limit_{self.rss_limit}"
+            f"_go_memory_limit_{self.go_memory_limit}"
+            f"_min_gogc_{self.min_gogc}"
+            f"_coefficient_{self.coefficient_str}"
+        )
 
     @property
     def rss_limit_str(self):
         return f'{self.rss_limit}b'
 
+    @property
+    def go_memory_limit_str(self):
+        if self.go_memory_limit <= 0:
+            return "0"
+        return f'{self.go_memory_limit}b'
+
     @property
     def coefficient_str(self):
         if type(self.coefficient) == float and self.coefficient.is_integer():
@@ -44,12 +61,47 @@ def __init__(self, case: Params, root_dir: os.PathLike):
 def make_sessions(root_dir: os.PathLike) -> Iterable[Session]:
     duration = "60s"
     cases = (
-        Params(unlimited=True, load_duration=duration, rss_limit=GIGABYTE),
-        Params(unlimited=False, load_duration=duration, rss_limit=GIGABYTE, coefficient=0.5),
-        Params(unlimited=False, load_duration=duration, rss_limit=GIGABYTE, coefficient=1),
-        Params(unlimited=False, load_duration=duration, rss_limit=GIGABYTE, coefficient=5),
-        Params(unlimited=False, load_duration=duration, rss_limit=GIGABYTE, coefficient=10),
-        Params(unlimited=False, load_duration=duration, rss_limit=GIGABYTE, coefficient=50),
+        Params(unlimited=True, load_duration=duration, rss_limit=GIGABYTE, go_memory_limit=0),
+        Params(
+            unlimited=False,
+            load_duration=duration,
+            rss_limit=GIGABYTE,
+            go_memory_limit=0,
+            min_gogc=DEFAULT_MIN_GOGC,
+            coefficient=0.5,
+        ),
+        Params(
+            unlimited=False,
+            load_duration=duration,
+            rss_limit=GIGABYTE,
+            go_memory_limit=DEFAULT_GO_MEMORY_LIMIT,
+            min_gogc=DEFAULT_MIN_GOGC,
+            coefficient=0.5,
+        ),
+        Params(
+            unlimited=False,
+            load_duration=duration,
+            rss_limit=GIGABYTE,
+            go_memory_limit=DEFAULT_GO_MEMORY_LIMIT,
+            min_gogc=DEFAULT_MIN_GOGC,
+            coefficient=5,
+        ),
+        Params(
+            unlimited=False,
+            load_duration=duration,
+            rss_limit=GIGABYTE,
+            go_memory_limit=DEFAULT_GO_MEMORY_LIMIT,
+            min_gogc=DEFAULT_MIN_GOGC,
+            coefficient=10,
+        ),
+        Params(
+            unlimited=False,
+            load_duration=duration,
+            rss_limit=GIGABYTE,
+            go_memory_limit=DEFAULT_GO_MEMORY_LIMIT,
+            min_gogc=30,
+            coefficient=50,
+        ),
     )
 
     # FIXME: Remove after debug.
diff --git a/test/allocator/main.go b/test/allocator/main.go
index 336b73e..e816d68 100644
--- a/test/allocator/main.go
+++ b/test/allocator/main.go
@@ -4,6 +4,7 @@
  * License: https://github.com/newcloudtechnologies/memlimiter/blob/master/LICENSE
  */
 
+// Package main provides the allocator demo entrypoint.
 package main
 
 import (
diff --git a/test/allocator/tracker/report.go b/test/allocator/tracker/report.go
index a5d5b99..c499941 100644
--- a/test/allocator/tracker/report.go
+++ b/test/allocator/tracker/report.go
@@ -13,17 +13,21 @@ import (
 
 // Report is a memory consumption report (used only for tests).
 type Report struct {
-	Timestamp   string
-	RSS         uint64
-	Utilization float64
-	GOGC        int
-	Throttling  uint32
+	Timestamp string
+	RSS       uint64
+	// GoRuntimeBytes approximates runtime-managed memory:
+	// MemStats.Sys - MemStats.HeapReleased.
+	GoRuntimeBytes uint64
+	Utilization    float64
+	GOGC           int
+	Throttling     uint32
 }
 
 func (r *Report) headers() []string {
 	return []string{
 		"timestamp",
 		"rss",
+		"go_runtime_bytes",
 		"utilization",
 		"gogc",
 		"throttling",
@@ -34,6 +38,7 @@ func (r *Report) toCsv() []string {
 	return []string{
 		r.Timestamp,
 		strconv.FormatUint(r.RSS, 10),
+		strconv.FormatUint(r.GoRuntimeBytes, 10),
 		fmt.Sprint(r.Utilization),
 		strconv.Itoa(r.GOGC),
 		strconv.FormatUint(uint64(r.Throttling), 10),
diff --git a/test/allocator/tracker/tracker.go b/test/allocator/tracker/tracker.go
index 342109d..43f6a88 100644
--- a/test/allocator/tracker/tracker.go
+++ b/test/allocator/tracker/tracker.go
@@ -9,6 +9,7 @@ package tracker
 import (
 	"errors"
 	"fmt"
+	"runtime"
 	"time"
 
 	"github.com/go-logr/logr"
@@ -79,6 +80,13 @@ func (tr *Tracker) makeReport() (*Report, error) {
 
 	out.Timestamp = time.Now().Format(time.RFC3339Nano)
 
+	var memStats runtime.MemStats
+	runtime.ReadMemStats(&memStats)
+
+	if memStats.Sys > memStats.HeapReleased {
+		out.GoRuntimeBytes = memStats.Sys - memStats.HeapReleased
+	}
+
 	mlStats, err := tr.memLimiter.GetStats()
 	if err != nil {
 		return nil, fmt.Errorf("memlimiter stats: %w", err)
diff --git a/test/integration/main_test.go b/test/integration/main_test.go
index 35da5f2..c75de14 100644
--- a/test/integration/main_test.go
+++ b/test/integration/main_test.go
@@ -44,19 +44,19 @@ func TestComponent(t *testing.T) {
 		}
 	}()
 
-	// wait for a while to make server run asynchronously
+	// Wait for a while to make server run asynchronously.
 	time.Sleep(time.Second)
 
 	perfClient, err := makePerfClient(logger, endpoint)
 	require.NoError(t, err)
 
-	// perform load
+	// Perform load.
 	err = perfClient.Run()
 	require.NoError(t, err)
 
 	defer perfClient.Quit()
 
-	// collect reports
+	// Collect reports.
 	reports, err := allocatorServer.Tracker().GetReports()
 	require.NoError(t, err)
 	require.NotEmpty(t, reports)
@@ -114,7 +114,8 @@ func analyzeReports(t *testing.T, reports []*tracker.Report, rssLimit float64) {
 
 	sample := &stats.Sample{}
 
-	// take only the second half of observations as we expect memory consumption to be stable here due to MemLimiter work
+	// Take only the second half of observations as we expect memory consumption to be stable here
+	// due to MemLimiter work.
 	reports = reports[len(reports)/2:]
 
 	for _, r := range reports {
@@ -126,6 +127,6 @@
 	// because of possible existence of a SWAP partition.
 	actualRSS := sample.Mean()
 
-	// but since this is a soft limit, we allow small exceeding of it
+	// But since this is a soft limit, we allow small exceeding of it.
 	require.Less(t, actualRSS, 1.10*rssLimit)
 }