diff --git a/cmd/neofs-node/cache.go b/cmd/neofs-node/cache.go index e1d7271c45..58bfc1a423 100644 --- a/cmd/neofs-node/cache.go +++ b/cmd/neofs-node/cache.go @@ -446,14 +446,30 @@ type ttlMaxObjectSizeCache struct { lastUpdated time.Time lastSize uint64 src putsvc.MaxSizeSource + onChange func(uint64) } -func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource { +func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource, onChange func(uint64)) *ttlMaxObjectSizeCache { return &ttlMaxObjectSizeCache{ - src: src, + src: src, + onChange: onChange, } } +func (c *ttlMaxObjectSizeCache) updateLastSize(sz uint64) { + if c.lastSize != sz { + c.onChange(sz) + } + c.lastSize = sz + c.lastUpdated = time.Now() +} + +func (c *ttlMaxObjectSizeCache) handleNewMaxObjectPayloadSize(sz uint64) { + c.mtx.Lock() + c.updateLastSize(sz) + c.mtx.Unlock() +} + func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 { const ttl = time.Second * 30 @@ -469,9 +485,9 @@ func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 { c.mtx.Lock() size = c.lastSize if !c.lastUpdated.After(prevUpdated) { - size = c.src.MaxObjectSize() - c.lastSize = size - c.lastUpdated = time.Now() + newSize := c.src.MaxObjectSize() + c.updateLastSize(newSize) + size = newSize } c.mtx.Unlock() diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index e6de08ec90..1a6c3eac7b 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -424,6 +424,8 @@ type cfgGRPC struct { maxChunkSize uint64 maxAddrAmount uint64 + + maxRecvMsgSize atomic.Value // int } type cfgMorph struct { diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go index c40cdf7938..e50982f861 100644 --- a/cmd/neofs-node/grpc.go +++ b/cmd/neofs-node/grpc.go @@ -28,38 +28,27 @@ func initGRPC(c *cfg) { maxObjSize, err := c.nCli.MaxObjectSize() fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err) - maxRecvSize := maxObjSize - // don't forget about meta fields - if maxRecvSize < uint64(math.MaxUint64-object.MaxHeaderLen) { // just in case, always true in practice - maxRecvSize += object.MaxHeaderLen - } else { - maxRecvSize = math.MaxUint64 - } - - var maxRecvMsgSizeOpt grpc.ServerOption - if maxRecvSize > maxMsgSize { // do not decrease default value - if maxRecvSize > math.MaxInt { - // ^2GB for 32-bit systems which is currently enough in practice. If at some - // point this is not enough, we'll need to expand the option - fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d", - maxRecvSize, math.MaxInt)) - } - maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize)) - c.log.Debug("limit max recv gRPC message size to fit max stored objects", - zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize)) + maxRecvSize, overflowed := calculateMaxReplicationRequestSize(maxObjSize) + if maxRecvSize < 0 { + // ^2GB for 32-bit systems which is currently enough in practice. If at some + // point this is not enough, we'll need to expand the option + fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d", + overflowed, math.MaxInt)) } + c.cfgGRPC.maxRecvMsgSize.Store(maxRecvSize) + // TODO(@cthulhu-rider): the setting can be server-global only now, support + // per-RPC limits + maxRecvMsgSizeOpt := grpc.MaxRecvMsgSizeFunc(func() int { + return c.cfgGRPC.maxRecvMsgSize.Load().(int) // initialized above, so safe + }) + c.log.Info("limit max recv gRPC message size to fit max stored objects", + zap.Uint64("max object size", maxObjSize), zap.Int("max recv msg", maxRecvSize)) var successCount int grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) { serverOpts := []grpc.ServerOption{ grpc.MaxSendMsgSize(maxMsgSize), - } - if maxRecvMsgSizeOpt != nil { - // TODO(@cthulhu-rider): the setting can be server-global only now, support - // per-RPC limits - // TODO(@cthulhu-rider): max object size setting may change in general, - // but server configuration is static now - serverOpts = append(serverOpts, maxRecvMsgSizeOpt) + maxRecvMsgSizeOpt, } tlsCfg := sc.TLS() @@ -153,3 +142,42 @@ func stopGRPC(name string, s *grpc.Server, l *zap.Logger) { l.Info("gRPC server stopped successfully") } + +// calculates approximation for max size of the ObjectService.Replicate request +// with given object payload limit. Second value is returned when calculation +// result overflows int type. In this case, first return is negative. +func calculateMaxReplicationRequestSize(maxObjPayloadSize uint64) (int, uint64) { + res := maxObjPayloadSize + // don't forget about meta fields: object header + other ObjectService.Replicate + // request fields. For the latter, less is needed now, but it is still better to + // take with a reserve for potential protocol extensions. Anyway, 1 KB is + // nothing IRL. + const maxMetadataSize = object.MaxHeaderLen + 1<<10 + if res < uint64(math.MaxUint64-maxMetadataSize) { // just in case, always true in practice + res += maxMetadataSize + } else { + res = math.MaxUint64 + } + if res > math.MaxInt { + return -1, res + } + if res < maxMsgSize { // do not decrease default value + return maxMsgSize, 0 + } + return int(res), 0 +} + +func (c *cfg) handleNewMaxObjectPayloadSize(maxObjPayloadSize uint64) { + maxRecvSize, overflowed := calculateMaxReplicationRequestSize(maxObjPayloadSize) + if maxRecvSize < 0 { + // unlike a startup, we don't want to stop a running service. Moreover, this is + // just a limit: even if it has become incredibly large, most data is expected + // to be of smaller degrees + c.log.Info("max gRPC recv msg size re-calculated for new max object payload size overflows int type, fallback to max int", + zap.Uint64("calculated limit", overflowed)) + maxRecvSize = math.MaxInt + } + c.cfgGRPC.maxRecvMsgSize.Store(maxRecvSize) + c.log.Info("updated max gRPC recv msg size limit after max object payload size has been changed", + zap.Uint64("new object limit", maxObjPayloadSize), zap.Int("new gRPC limit", maxRecvSize)) +} diff --git a/cmd/neofs-node/grpc_test.go b/cmd/neofs-node/grpc_test.go new file mode 100644 index 0000000000..1715e80e60 --- /dev/null +++ b/cmd/neofs-node/grpc_test.go @@ -0,0 +1,50 @@ +package main + +import ( + "math" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func Test_calculateMaxReplicationRequestSize(t *testing.T) { + limit, _ := calculateMaxReplicationRequestSize(64 << 20) + require.EqualValues(t, 67126272, limit) + t.Run("int overflow", func(t *testing.T) { + limit, overflow := calculateMaxReplicationRequestSize(math.MaxInt - 17<<10 + 1) + require.Negative(t, limit) + require.EqualValues(t, uint64(math.MaxInt)+1, overflow) + }) + t.Run("uint64 overflow", func(t *testing.T) { + limit, overflow := calculateMaxReplicationRequestSize(math.MaxUint64 - 17<<10 + 1) + require.Negative(t, limit) + require.EqualValues(t, uint64(math.MaxUint64), overflow) + }) + t.Run("smaller than gRPC default", func(t *testing.T) { + limit, _ := calculateMaxReplicationRequestSize(0) + require.EqualValues(t, 4<<20, limit) + limit, _ = calculateMaxReplicationRequestSize(4<<20 - 17<<10 - 1) + require.EqualValues(t, 4<<20, limit) + }) +} + +func Test_cfg_handleNewMaxObjectPayloadSize(t *testing.T) { + var c cfg + c.log = zap.NewNop() + c.cfgGRPC.maxRecvMsgSize.Store(0) // any + + c.handleNewMaxObjectPayloadSize(100 << 20) + require.EqualValues(t, 100<<20+17<<10, c.cfgGRPC.maxRecvMsgSize.Load()) + c.handleNewMaxObjectPayloadSize(64 << 20) + require.EqualValues(t, 64<<20+17<<10, c.cfgGRPC.maxRecvMsgSize.Load()) + // int overflow + c.handleNewMaxObjectPayloadSize(math.MaxInt - 17<<10 + 1) + require.EqualValues(t, math.MaxInt, c.cfgGRPC.maxRecvMsgSize.Load()) + // uint64 overflow + c.handleNewMaxObjectPayloadSize(math.MaxUint64 - 17<<10 + 1) + require.EqualValues(t, math.MaxInt, c.cfgGRPC.maxRecvMsgSize.Load()) + // smaller than gRPC default + c.handleNewMaxObjectPayloadSize(4<<20 - 17<<10 - 1) + require.EqualValues(t, 4<<20, c.cfgGRPC.maxRecvMsgSize.Load()) +} diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index 575b88e76f..2c1629f3cd 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -2,9 +2,11 @@ package main import ( "bytes" + "context" "errors" "fmt" "sync/atomic" + "time" netmapGRPC "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" @@ -445,3 +447,26 @@ func (n *netInfo) Dump(ver version.Version) (*netmapSDK.NetworkInfo, error) { return &ni, nil } + +func listenMaxObjectPayloadSizeChanges(ctx context.Context, cli *nmClient.Client, lg *zap.Logger, f func(uint64)) { + // config rarely changes, but when it does - we do not want to wait long. + // Notification events would help https://github.com/nspcc-dev/neofs-contract/issues/427 + const pollInterval = time.Minute + t := time.NewTimer(pollInterval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + lg.Debug("stop max object payload size net config poller by context", zap.Error(ctx.Err())) + return + case <-t.C: + lg.Info("rereading max object payload size net config by timer", zap.Duration("interval", pollInterval)) + if sz, err := cli.MaxObjectSize(); err == nil { + f(sz) + } else { + lg.Error("failed to read max object payload size net config", zap.Error(err)) + } + t.Reset(pollInterval) + } + } +} diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index fd77cae480..7850ef2109 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -256,10 +256,13 @@ func initObjectService(c *cfg) { searchsvcV2.WithKeyStorage(keyStorage), ) + cachedMaxObjPayloadSizeSrc := newCachedMaxObjectSizeSource(c, c.handleNewMaxObjectPayloadSize) + go listenMaxObjectPayloadSizeChanges(c.ctx, c.nCli, c.log, cachedMaxObjPayloadSizeSrc.handleNewMaxObjectPayloadSize) + sPut := putsvc.NewService(&transport{clients: putConstructor}, putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(putConstructor), - putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)), + putsvc.WithMaxSizeSource(cachedMaxObjPayloadSizeSrc), putsvc.WithObjectStorage(storageEngine{engine: ls}), putsvc.WithContainerSource(c.cfgObject.cnrSource), putsvc.WithNetworkMapSource(c.netMapSource), diff --git a/go.mod b/go.mod index cac4449df0..2301833872 100644 --- a/go.mod +++ b/go.mod @@ -105,3 +105,5 @@ retract ( v1.22.1 // Contains retraction only. v1.22.0 // Published accidentally. ) + +replace google.golang.org/grpc => github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657 diff --git a/go.sum b/go.sum index efc571159c..52e0c3515b 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb/go.mod h github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657 h1:khwEiSUz2Pi0dPqIPittz466RG/gFyzvTHrZHDiTcWg= +github.com/cthulhu-rider/grpc-go v0.0.0-20240808123512-00d000d30657/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -273,8 +275,6 @@ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3j google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= -google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk= -google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=