From 6c9bef004075417d7f5a6f4ecd373dce91075874 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 28 Jan 2026 14:45:43 +0000 Subject: [PATCH] build(deps): bump github.com/nats-io/nats-server/v2 Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.12.3 to 2.12.4. - [Release notes](https://github.com/nats-io/nats-server/releases) - [Commits](https://github.com/nats-io/nats-server/compare/v2.12.3...v2.12.4) --- updated-dependencies: - dependency-name: github.com/nats-io/nats-server/v2 dependency-version: 2.12.4 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- go.mod | 6 +- go.sum | 12 +- .../nats-io/nats-server/v2/conf/parse.go | 61 ++- .../nats-io/nats-server/v2/server/accounts.go | 1 + .../v2/server/certidp/ocsp_responder.go | 2 +- .../nats-io/nats-server/v2/server/client.go | 63 ++- .../nats-io/nats-server/v2/server/const.go | 2 +- .../nats-io/nats-server/v2/server/consumer.go | 76 ++-- .../nats-server/v2/server/filestore.go | 371 +++++++++++++----- .../nats-io/nats-server/v2/server/gsl/gsl.go | 126 ++---- .../nats-server/v2/server/jetstream.go | 13 +- .../nats-server/v2/server/jetstream_api.go | 283 ++++++------- .../v2/server/jetstream_cluster.go | 14 +- .../nats-io/nats-server/v2/server/memstore.go | 63 ++- .../nats-io/nats-server/v2/server/monitor.go | 84 ++-- .../nats-io/nats-server/v2/server/opts.go | 2 +- .../nats-io/nats-server/v2/server/raft.go | 29 +- .../nats-io/nats-server/v2/server/stream.go | 87 ++-- .../nats-server/v2/server/stree/stree.go | 60 +++ .../nats-io/nats-server/v2/server/sublist.go | 66 +--- .../nats-io/nats-server/v2/server/util.go | 2 +- vendor/modules.txt | 6 +- 22 files changed, 886 insertions(+), 543 deletions(-) diff --git a/go.mod b/go.mod index 3b0fdeaa28..5326a6b3df 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/mna/pigeon v1.3.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.12.3 + github.com/nats-io/nats-server/v2 v2.12.4 github.com/nats-io/nats.go v1.48.0 github.com/oklog/run v1.2.0 github.com/olekukonko/tablewriter v1.1.3 @@ -241,7 +241,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/gomodule/redigo v1.9.3 // indirect github.com/google/go-querystring v1.1.0 // indirect - github.com/google/go-tpm v0.9.7 // indirect + github.com/google/go-tpm v0.9.8 // indirect github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect github.com/google/renameio/v2 v2.0.1 // indirect github.com/gookit/goutil v0.7.1 // indirect @@ -261,7 +261,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/juliangruber/go-intersect v1.1.0 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/compress v1.18.2 // indirect + github.com/klauspost/compress v1.18.3 // indirect github.com/klauspost/cpuid/v2 v2.2.11 // indirect github.com/klauspost/crc32 v1.3.0 // indirect github.com/kovidgoyal/go-parallel v1.1.1 // indirect diff --git a/go.sum b/go.sum index 6f6eb39b56..8801a1a858 100644 --- a/go.sum +++ b/go.sum @@ -575,8 +575,8 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= github.com/google/go-tika v0.3.1 h1:l+jr10hDhZjcgxFRfcQChRLo1bPXQeLFluMyvDhXTTA= github.com/google/go-tika v0.3.1/go.mod h1:DJh5N8qxXIl85QkqmXknd+PeeRkUOTbvwyYf7ieDz6c= -github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA= -github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/google/go-tpm v0.9.8 h1:slArAR9Ft+1ybZu0lBwpSmpwhRXaa85hWtMinMyRAWo= +github.com/google/go-tpm v0.9.8/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -730,8 +730,8 @@ github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= -github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= +github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= @@ -916,8 +916,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= -github.com/nats-io/nats-server/v2 v2.12.3 h1:KRv+1n7lddMVgkJPQer+pt36TcO0ENxjilBmeWdjcHs= -github.com/nats-io/nats-server/v2 v2.12.3/go.mod h1:MQXjG9WjyXKz9koWzUc3jYUMKD8x3CLmTNy91IQQz3Y= +github.com/nats-io/nats-server/v2 v2.12.4 h1:ZnT10v2LU2Xcoiy8ek9X6Se4YG8EuMfIfvAEuFVx1Ts= +github.com/nats-io/nats-server/v2 v2.12.4/go.mod h1:5MCp/pqm5SEfsvVZ31ll1088ZTwEUdvRX1Hmh/mTTDg= github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U= github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc= diff --git a/vendor/github.com/nats-io/nats-server/v2/conf/parse.go b/vendor/github.com/nats-io/nats-server/v2/conf/parse.go index 3e52c7d228..b94c539f06 100644 --- a/vendor/github.com/nats-io/nats-server/v2/conf/parse.go +++ b/vendor/github.com/nats-io/nats-server/v2/conf/parse.go @@ -60,6 +60,9 @@ type parser struct { // pedantic reports error when configuration is not correct. pedantic bool + + // Tracks environment variable references, to avoid cycles + envVarReferences map[string]bool } // Parse will return a map of keys to any, although concrete types @@ -180,16 +183,37 @@ func (t *token) Position() int { return t.item.pos } -func parse(data, fp string, pedantic bool) (p *parser, err error) { - p = &parser{ - mapping: make(map[string]any), - lx: lex(data), - ctxs: make([]any, 0, 4), - keys: make([]string, 0, 4), - ikeys: make([]item, 0, 4), - fp: filepath.Dir(fp), - pedantic: pedantic, +func newParser(data, fp string, pedantic bool) *parser { + return &parser{ + mapping: make(map[string]any), + lx: lex(data), + ctxs: make([]any, 0, 4), + keys: make([]string, 0, 4), + ikeys: make([]item, 0, 4), + fp: filepath.Dir(fp), + pedantic: pedantic, + envVarReferences: make(map[string]bool), + } +} + +func parse(data, fp string, pedantic bool) (*parser, error) { + p := newParser(data, fp, pedantic) + if err := p.parse(fp); err != nil { + return nil, err } + return p, nil +} + +func parseEnv(data string, parent *parser) (*parser, error) { + p := newParser(data, "", false) + p.envVarReferences = parent.envVarReferences + if err := p.parse(""); err != nil { + return nil, err + } + return p, nil +} + +func (p *parser) parse(fp string) error { p.pushContext(p.mapping) var prevItem item @@ -199,16 +223,16 @@ func parse(data, fp string, pedantic bool) (p *parser, err error) { // Here we allow the final character to be a bracket '}' // in order to support JSON like configurations. if prevItem.typ == itemKey && prevItem.val != mapEndString { - return nil, fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos) + return fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos) } break } prevItem = it if err := p.processItem(it, fp); err != nil { - return nil, err + return err } } - return p, nil + return nil } func (p *parser) next() item { @@ -453,11 +477,18 @@ func (p *parser) lookupVariable(varReference string) (any, bool, error) { } // If we are here, we have exhausted our context maps and still not found anything. - // Parse from the environment. + // Detect reference cycles + if p.envVarReferences[varReference] { + return nil, false, fmt.Errorf("variable reference cycle for '%s'", varReference) + } + p.envVarReferences[varReference] = true + defer delete(p.envVarReferences, varReference) + + // Parse from the environment if vStr, ok := os.LookupEnv(varReference); ok { // Everything we get here will be a string value, so we need to process as a parser would. - if vmap, err := Parse(fmt.Sprintf("%s=%s", pkey, vStr)); err == nil { - v, ok := vmap[pkey] + if subp, err := parseEnv(fmt.Sprintf("%s=%s", pkey, vStr), p); err == nil { + v, ok := subp.mapping[pkey] return v, ok, nil } else { return nil, false, err diff --git a/vendor/github.com/nats-io/nats-server/v2/server/accounts.go b/vendor/github.com/nats-io/nats-server/v2/server/accounts.go index 548f6943ec..44f52d3a7d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/accounts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/accounts.go @@ -299,6 +299,7 @@ func (a *Account) shallowCopy(na *Account) { na.Nkey = a.Nkey na.Issuer = a.Issuer na.traceDest, na.traceDestSampling = a.traceDest, a.traceDestSampling + na.nrgAccount = a.nrgAccount if a.imports.streams != nil { na.imports.streams = make([]*streamImport, 0, len(a.imports.streams)) diff --git a/vendor/github.com/nats-io/nats-server/v2/server/certidp/ocsp_responder.go b/vendor/github.com/nats-io/nats-server/v2/server/certidp/ocsp_responder.go index ea2b614ed6..e560d6e6a5 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/certidp/ocsp_responder.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/certidp/ocsp_responder.go @@ -1,4 +1,4 @@ -// Copyright 2023-2024 The NATS Authors +// Copyright 2023-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/vendor/github.com/nats-io/nats-server/v2/server/client.go b/vendor/github.com/nats-io/nats-server/v2/server/client.go index 135df35827..0e4aa4f18c 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/client.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/client.go @@ -1,4 +1,4 @@ -// Copyright 2012-2025 The NATS Authors +// Copyright 2012-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -23,6 +23,7 @@ import ( "errors" "fmt" "io" + "math" "math/rand" "net" "net/http" @@ -35,8 +36,6 @@ import ( "sync/atomic" "time" - "slices" - "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/internal/fastrand" @@ -2732,9 +2731,12 @@ func (c *client) updateS2AutoCompressionLevel(co *CompressionOpts, compression * } // Will return the parts from the raw wire msg. +// We return the `hdr` as a slice that is capped to the length of the headers +// so that if the caller later tries to append to the returned header slice it +// does not affect the message content. func (c *client) msgParts(data []byte) (hdr []byte, msg []byte) { if c != nil && c.pa.hdr > 0 { - return data[:c.pa.hdr], data[c.pa.hdr:] + return data[:c.pa.hdr:c.pa.hdr], data[c.pa.hdr:] } return nil, data } @@ -3337,7 +3339,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool sub.shadow = nil if len(shadowSubs) > 0 { isSpokeLeaf = c.isSpokeLeafNode() - updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil + updateRoute = !isSpokeLeaf && (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF || c.kind == JETSTREAM) && c.srv != nil } sub.close() c.mu.Unlock() @@ -4565,6 +4567,19 @@ func getHeaderKeyIndex(key string, hdr []byte) int { } } +// setHeader will replace the value of the first existing key `key` +// with the given value `val`, or add this new key at the end of +// the headers. +// +// Note: If the key does not exist, or if it exists but the new value +// would make the resulting byte slice larger than the original one, +// a new byte slice is returned and the original is left untouched. +// This is to prevent situations where caller may have a `hdr` and +// `msg` that are the parts of an underlying buffer. Extending the +// `hdr` would otherwise overwrite the `msg` part. +// +// If the new value is smaller, then the original `hdr` byte slice +// is modified. func setHeader(key, val string, hdr []byte) []byte { start := getHeaderKeyIndex(key, hdr) if start >= 0 { @@ -4579,15 +4594,45 @@ func setHeader(key, val string, hdr []byte) []byte { return hdr // malformed headers } valEnd += valStart - suffix := slices.Clone(hdr[valEnd:]) - newHdr := append(hdr[:valStart], val...) - return append(newHdr, suffix...) + // Length of the existing value (before the `\r`) + oldValLen := valEnd - valStart + // This is how many extra bytes we need for the new value. + // If <= 0, it means that we need less and so will reuse the `hdr` buffer. + if extra := len(val) - oldValLen; extra > 0 { + // Check that we don't overflow an "int". + if rem := math.MaxInt - hdrLen; rem < extra { + // We don't grow, and return the existing header. + return hdr + } + // The new size is the old size plus the extra bytes. + newHdrSize := hdrLen + extra + newHdr := make([]byte, newHdrSize) + // Copy the parts from `hdr` and `val` into the new buffer. + n := copy(newHdr, hdr[:valStart]) + n += copy(newHdr[n:], val) + copy(newHdr[n:], hdr[valEnd:]) + return newHdr + } + // We can write in place since it fits in the existing `hdr` buffer. + n := copy(hdr[valStart:], val) + n += copy(hdr[valStart+n:], hdr[valEnd:]) + hdr = hdr[:valStart+n] + return hdr } if len(hdr) > 0 && bytes.HasSuffix(hdr, []byte("\r\n")) { hdr = hdr[:len(hdr)-2] val += "\r\n" } - return fmt.Appendf(hdr, "%s: %s\r\n", key, val) + // Create the new buffer based on length of existing one and + // length of the new ": \r\n". Protect against "int" overflow. + newSize := uint64(len(hdr)) + uint64(len(key)) + 1 + 1 + uint64(len(val)) + 2 + if newSize > uint64(math.MaxInt) { + // We don't grow, and return the existing header. + return hdr + } + newHdr := make([]byte, 0, int(newSize)) + newHdr = append(newHdr, hdr...) + return fmt.Appendf(newHdr, "%s: %s\r\n", key, val) } // For bytes.HasPrefix below. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index a4a72cc988..0428275fc5 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -66,7 +66,7 @@ func init() { const ( // VERSION is the current version for the server. - VERSION = "2.12.3" + VERSION = "2.12.4" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go index b1da902903..668aad917e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go @@ -1,4 +1,4 @@ -// Copyright 2019-2025 The NATS Authors +// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1030,11 +1030,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } if cName != _EMPTY_ { if eo, ok := mset.consumers[cName]; ok { - mset.mu.Unlock() if action == ActionCreate { ocfg := eo.config() copyConsumerMetadata(config, &ocfg) if !reflect.DeepEqual(config, &ocfg) { + mset.mu.Unlock() return nil, NewJSConsumerAlreadyExistsError() } } @@ -1042,9 +1042,11 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if cfg.Retention == WorkQueuePolicy { subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) if !mset.partitionUnique(cName, subjects) { + mset.mu.Unlock() return nil, NewJSConsumerWQConsumerNotUniqueError() } } + mset.mu.Unlock() err := eo.updateConfig(config) if err == nil { return eo, nil @@ -1542,7 +1544,6 @@ func (o *consumer) setLeader(isLeader bool) { if o.cfg.AckPolicy != AckNone { if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil { o.mu.Unlock() - o.deleteWithoutAdvisory() return } } @@ -1551,7 +1552,6 @@ func (o *consumer) setLeader(isLeader bool) { // Will error if wrong mode to provide feedback to users. if o.reqSub, err = o.subscribeInternal(o.nextMsgSubj, o.processNextMsgReq); err != nil { o.mu.Unlock() - o.deleteWithoutAdvisory() return } @@ -1561,7 +1561,6 @@ func (o *consumer) setLeader(isLeader bool) { fcsubj := fmt.Sprintf(jsFlowControl, stream, o.name) if o.fcSub, err = o.subscribeInternal(fcsubj, o.processFlowControl); err != nil { o.mu.Unlock() - o.deleteWithoutAdvisory() return } } @@ -2401,7 +2400,8 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { // Check for Subject Filters update. newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects) - if !subjectSliceEqual(newSubjects, o.subjf.subjects()) { + updatedFilters := !subjectSliceEqual(newSubjects, o.subjf.subjects()) + if updatedFilters { newSubjf := make(subjectFilters, 0, len(newSubjects)) for _, newFilter := range newSubjects { fs := &subjectFilter{ @@ -2440,15 +2440,17 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { // Allowed but considered no-op, [Description, SampleFrequency, MaxWaiting, HeadersOnly] o.cfg = *cfg - // Cleanup messages that lost interest. - if o.retention == InterestPolicy { - o.mu.Unlock() - o.cleanupNoInterestMessages(o.mset, false) - o.mu.Lock() - } + if updatedFilters { + // Cleanup messages that lost interest. + if o.retention == InterestPolicy { + o.mu.Unlock() + o.cleanupNoInterestMessages(o.mset, false) + o.mu.Lock() + } - // Re-calculate num pending on update. - o.streamNumPending() + // Re-calculate num pending on update. + o.streamNumPending() + } return nil } @@ -5115,9 +5117,14 @@ func (o *consumer) checkNumPending() (uint64, error) { var state StreamState o.mset.store.FastState(&state) npc := o.numPending() - if o.sseq > state.LastSeq && npc > 0 || npc > state.Msgs { - // Re-calculate. - return o.streamNumPending() + // Make sure we can't report more messages than there are. + // TODO(nat): It's not great that this means consumer info has side effects, + // since we can't know whether anyone will call it or not. The previous num + // pending calculation that this replaces had the same problem though. + if o.sseq > state.LastSeq { + o.npc = 0 + } else if npc > 0 { + o.npc = int64(min(npc, state.Msgs, state.LastSeq-o.sseq+1)) } } return o.numPending(), nil @@ -5365,6 +5372,15 @@ func (o *consumer) trackPending(sseq, dseq uint64) { o.pending = make(map[uint64]*Pending) } + now := time.Now() + if p, ok := o.pending[sseq]; ok { + // Update timestamp but keep original consumer delivery sequence. + // So do not update p.Sequence. + p.Timestamp = now.UnixNano() + } else { + o.pending[sseq] = &Pending{dseq, now.UnixNano()} + } + // We could have a backoff that set a timer higher than what we need for this message. // In that case, reset to lowest backoff required for a message redelivery. minDelay := o.ackWait(0) @@ -5377,18 +5393,10 @@ func (o *consumer) trackPending(sseq, dseq uint64) { } minDelay = o.ackWait(o.cfg.BackOff[bi]) } - minDeadline := time.Now().Add(minDelay) + minDeadline := now.Add(minDelay) if o.ptmr == nil || o.ptmrEnd.After(minDeadline) { o.resetPtmr(minDelay) } - - if p, ok := o.pending[sseq]; ok { - // Update timestamp but keep original consumer delivery sequence. - // So do not update p.Sequence. - p.Timestamp = time.Now().UnixNano() - } else { - o.pending[sseq] = &Pending{dseq, time.Now().UnixNano()} - } } // Credit back a failed delivery. @@ -6503,6 +6511,10 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { retryAsflr = seq } } else if seq <= dflr { + // Store the first entry above our ack floor, so we don't need to look it up again on retryAsflr=0. + if retryAsflr == 0 { + retryAsflr = seq + } // If we have pending, we will need to walk through to delivered in case we missed any of those acks as well. if _, ok := state.Pending[seq]; !ok { // The filters are already taken into account, @@ -6514,8 +6526,18 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { } } // If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it. + // However, our ack floor may be lower than the next message we can receive, so we correct it upward if needed. if retryAsflr == 0 { - retryAsflr = asflr + 1 + if filters != nil { + _, nseq, err = store.LoadNextMsgMulti(filters, asflr+1, &smv) + } else { + _, nseq, err = store.LoadNextMsg(filter, wc, asflr+1, &smv) + } + if err == nil { + retryAsflr = max(asflr+1, nseq) + } else if err == ErrStoreEOF { + retryAsflr = ss.LastSeq + 1 + } } o.mu.Lock() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index 1f34ff958c..e3f9e855fe 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -1,4 +1,4 @@ -// Copyright 2019-2025 The NATS Authors +// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -201,7 +201,7 @@ type fileStore struct { sips int dirty int closing bool - closed bool + closed atomic.Bool // Atomic to reduce contention on ConsumerStores. fip bool receivedAny bool firstMoved bool @@ -473,10 +473,18 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim } keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey) + _, err = os.Stat(keyFile) + // Either the file should exist (err=nil), or it shouldn't. Any other error is reported. + if err != nil && !os.IsNotExist(err) { + return nil, err + } // Make sure we do not have an encrypted store underneath of us but no main key. - if fs.prf == nil { - if _, err := os.Stat(keyFile); err == nil { - return nil, errNoMainKey + if fs.prf == nil && err == nil { + return nil, errNoMainKey + } else if fs.prf != nil && err == nil { + // If encryption is configured and the key file exists, recover our keys. + if err = fs.recoverAEK(); err != nil { + return nil, err } } @@ -1784,16 +1792,14 @@ func (fs *fileStore) recoverFullState() (rerr error) { } // Decrypt if needed. - if fs.prf != nil { - // We can be setup for encryption but if this is a snapshot restore we will be missing the keyfile - // since snapshots strip encryption. - if err := fs.recoverAEK(); err == nil { - ns := fs.aek.NonceSize() - buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil) - if err != nil { - fs.warn("Stream state error reading encryption key: %v", err) - return err - } + // We can be setup for encryption but if this is a snapshot restore we will be missing the keyfile + // since snapshots strip encryption. + if fs.prf != nil && fs.aek != nil { + ns := fs.aek.NonceSize() + buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil) + if err != nil { + fs.warn("Stream state error reading encryption key: %v", err) + return err } } @@ -2346,7 +2352,7 @@ func (fs *fileStore) recoverMsgs() error { if fs.ld != nil { var emptyBlks []*msgBlock for _, mb := range fs.blks { - if mb.msgs == 0 && mb.rbytes == 0 { + if mb.msgs == 0 && mb.rbytes == 0 && mb != fs.lmb { emptyBlks = append(emptyBlks, mb) } } @@ -2588,7 +2594,7 @@ func copyMsgBlocks(src []*msgBlock) []*msgBlock { func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { fs.mu.RLock() lastSeq := fs.state.LastSeq - closed := fs.closed + closed := fs.isClosed() fs.mu.RUnlock() if closed { @@ -2603,20 +2609,51 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 { fseq := atomic.LoadUint64(&mb.first.seq) lseq := atomic.LoadUint64(&mb.last.seq) - var smv StoreMsg + var ( + smv StoreMsg + cts int64 + cseq uint64 + off uint64 + ) ts := t.UnixNano() - // Because sort.Search expects range [0,off), we have to manually - // calculate the offset from the first sequence. - off := int(lseq - fseq + 1) - i := sort.Search(off, func(i int) bool { - sm, _, _ := mb.fetchMsgNoCopy(fseq+uint64(i), &smv) - return sm != nil && sm.ts >= ts - }) - if i < off { - return fseq + uint64(i) + // Using a binary search, but need to be aware of interior deletes in the block. + seq := lseq + 1 +loop: + for fseq <= lseq { + mid := fseq + (lseq-fseq)/2 + off = 0 + // Potentially skip over gaps. We keep the original middle but keep track of a + // potential delete range with an offset. + for { + sm, _, err := mb.fetchMsgNoCopy(mid+off, &smv) + if err != nil || sm == nil { + off++ + if mid+off <= lseq { + continue + } else { + // Continue search to the left. Purposely ignore the skipped deletes here. + lseq = mid - 1 + continue loop + } + } + cts = sm.ts + cseq = sm.seq + break + } + if cts >= ts { + seq = cseq + if mid == fseq { + break + } + // Continue search to the left. + lseq = mid - 1 + } else { + // Continue search to the right (potentially skipping over interior deletes). + fseq = mid + off + 1 + } } - return 0 + return seq } // Find the first matching message against a sublist. @@ -2632,13 +2669,15 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm * mb.mu.Unlock() }() - // Need messages loaded from here on out. - if mb.cacheNotLoaded() { + if mb.fssNotLoaded() { + // Make sure we have fss loaded. if err := mb.loadMsgsWithLock(); err != nil { return nil, false, err } didLoad = true } + // Mark fss activity. + mb.lsts = ats.AccessTime() // Make sure to start at mb.first.seq if fseq < mb.first.seq if seq := atomic.LoadUint64(&mb.first.seq); seq > start { @@ -2646,10 +2685,6 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm * } lseq := atomic.LoadUint64(&mb.last.seq) - if sm == nil { - sm = new(StoreMsg) - } - // If the FSS state has fewer entries than sequences in the linear scan, // then use intersection instead as likely going to be cheaper. This will // often be the case with high numbers of deletes, as well as a smaller @@ -2657,7 +2692,11 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm * if uint64(mb.fss.Size()) < lseq-start { // If there are no subject matches then this is effectively no-op. hseq := uint64(math.MaxUint64) - gsl.IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) { + var ierr error + stree.IntersectGSL(mb.fss, sl, func(subj []byte, ss *SimpleState) { + if ierr != nil { + return + } if ss.firstNeedsUpdate || ss.lastNeedsUpdate { // mb is already loaded into the cache so should be fast-ish. mb.recalculateForSubj(bytesToString(subj), ss) @@ -2669,6 +2708,16 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm * // than our first seq for this subject. return } + // Need messages loaded from here on out. + if mb.cacheNotLoaded() { + if ierr = mb.loadMsgsWithLock(); ierr != nil { + return + } + didLoad = true + } + if sm == nil { + sm = new(StoreMsg) + } if first == ss.First { // If the start floor is below where this subject starts then we can // short-circuit, avoiding needing to scan for the next message. @@ -2703,10 +2752,24 @@ func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm * mb.llseq = llseq } }) + if ierr != nil { + return nil, false, ierr + } if hseq < uint64(math.MaxUint64) && sm != nil { return sm, didLoad && start == lseq, nil } } else { + // Need messages loaded from here on out. + if mb.cacheNotLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + return nil, false, err + } + didLoad = true + } + if sm == nil { + sm = new(StoreMsg) + } + for seq := start; seq <= lseq; seq++ { if mb.dmap.Exists(seq) { // Optimisation to avoid calling cacheLookup which hits time.Now(). @@ -2773,6 +2836,10 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor isAll = true }) } + // If the only subject in this block isn't our filter, can simply short-circuit. + if !isAll { + return nil, didLoad, ErrStoreMsgNotFound + } } // Make sure to start at mb.first.seq if fseq < mb.first.seq fseq = max(fseq, atomic.LoadUint64(&mb.first.seq)) @@ -2909,7 +2976,7 @@ func (mb *msgBlock) prevMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *S if uint64(mb.fss.Size()) < start-lseq { // If there are no subject matches then this is effectively no-op. hseq := uint64(0) - gsl.IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) { + stree.IntersectGSL(mb.fss, sl, func(subj []byte, ss *SimpleState) { if ss.firstNeedsUpdate || ss.lastNeedsUpdate { // mb is already loaded into the cache so should be fast-ish. mb.recalculateForSubj(bytesToString(subj), ss) @@ -3188,6 +3255,30 @@ func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool, bi int) (int, e if start == uint32(math.MaxUint32) { return -1, ErrStoreEOF } + return fs.selectSkipFirstBlock(bi, start, stop) +} + +// This is used to see if we can selectively jump start blocks based on filter subjects and a starting block index. +// Will return -1 and ErrStoreEOF if no matches at all or no more from where we are. +func (fs *fileStore) checkSkipFirstBlockMulti(sl *gsl.SimpleSublist, bi int) (int, error) { + // Move through psim to gather start and stop bounds. + start, stop := uint32(math.MaxUint32), uint32(0) + stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) { + if psi.fblk < start { + start = psi.fblk + } + if psi.lblk > stop { + stop = psi.lblk + } + }) + // Nothing was found. + if start == uint32(math.MaxUint32) { + return -1, ErrStoreEOF + } + return fs.selectSkipFirstBlock(bi, start, stop) +} + +func (fs *fileStore) selectSkipFirstBlock(bi int, start, stop uint32) (int, error) { // Can not be nil so ok to inline dereference. mbi := fs.blks[bi].getIndex() // All matching msgs are behind us. @@ -4006,7 +4097,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer mb := fs.blks[seqStart] bi := mb.index - gsl.IntersectStree(fs.psim, sl, func(subj []byte, psi *psi) { + stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) { // If the select blk start is greater than entry's last blk skip. if bi > psi.lblk { return @@ -4105,7 +4196,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer var t uint64 var havePartial bool var updateLLTS bool - gsl.IntersectStree[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) { + stree.IntersectGSL[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) { subj := bytesToString(bsubj) if havePartial { // If we already found a partial then don't do anything else. @@ -4168,7 +4259,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer // If we are here it's better to calculate totals from psim and adjust downward by scanning less blocks. start := uint32(math.MaxUint32) - gsl.IntersectStree(fs.psim, sl, func(subj []byte, psi *psi) { + stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) { total += psi.total // Keep track of start index for this subject. if psi.fblk < start { @@ -4219,7 +4310,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPer } // Mark fss activity. mb.lsts = ats.AccessTime() - gsl.IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) { + stree.IntersectGSL(mb.fss, sl, func(bsubj []byte, ss *SimpleState) { adjust += ss.Msgs }) } @@ -4486,7 +4577,7 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error { // Stores a raw message with expected sequence number and timestamp. // Lock should be held. func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, ttl int64) (err error) { - if fs.closed { + if fs.isClosed() { return ErrStoreClosed } @@ -5159,7 +5250,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( fsLock() - if fs.closed { + if fs.isClosed() { fsUnlock() return false, ErrStoreClosed } @@ -5369,14 +5460,8 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // If we have a callback registered we need to release lock regardless since cb might need it to lookup msg, etc. fs.mu.Unlock() // Storage updates. - if cb != nil { - var subj string - if sm != nil { - subj = sm.subj - } - delta := int64(msz) - cb(-1, -delta, seq, subj) - } + delta := int64(msz) + cb(-1, -delta, seq, sm.subj) if !needFSLock { fs.mu.Lock() @@ -5611,10 +5696,7 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) { } func (fs *fileStore) isClosed() bool { - fs.mu.RLock() - closed := fs.closed - fs.mu.RUnlock() - return closed + return fs.closed.Load() } // Will spin up our flush loop. @@ -6995,11 +7077,10 @@ func (mb *msgBlock) ensureRawBytesLoaded() error { // Sync msg and index files as needed. This is called from a timer. func (fs *fileStore) syncBlocks() { - fs.mu.Lock() - if fs.closed { - fs.mu.Unlock() + if fs.isClosed() { return } + fs.mu.Lock() blks := append([]*msgBlock(nil), fs.blks...) lmb, firstMoved, firstSeq := fs.lmb, fs.firstMoved, fs.state.FirstSeq // Clear first moved. @@ -7089,11 +7170,10 @@ func (fs *fileStore) syncBlocks() { } } - fs.mu.Lock() - if fs.closed { - fs.mu.Unlock() + if fs.isClosed() { return } + fs.mu.Lock() fs.setSyncTimer() if markDirty { fs.dirty++ @@ -7939,17 +8019,14 @@ func (fs *fileStore) msgForSeq(seq uint64, sm *StoreMsg) (*StoreMsg, error) { // Will return message for the given sequence number. func (fs *fileStore) msgForSeqLocked(seq uint64, sm *StoreMsg, needFSLock bool) (*StoreMsg, error) { + if fs.isClosed() { + return nil, ErrStoreClosed + } // TODO(dlc) - Since Store, Remove, Skip all hold the write lock on fs this will // be stalled. Need another lock if want to happen in parallel. if needFSLock { fs.mu.RLock() } - if fs.closed { - if needFSLock { - fs.mu.RUnlock() - } - return nil, ErrStoreClosed - } // Indicates we want first msg. if seq == 0 { seq = fs.state.FirstSeq @@ -8128,10 +8205,14 @@ func (fs *fileStore) LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error) { // loadLast will load the last message for a subject. Subject should be non empty and not ">". func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err error) { + if fs.isClosed() { + return nil, ErrStoreClosed + } + fs.mu.RLock() defer fs.mu.RUnlock() - if fs.closed || fs.lmb == nil { + if fs.lmb == nil { return nil, ErrStoreClosed } @@ -8228,15 +8309,15 @@ func (fs *fileStore) LoadLastMsg(subject string, smv *StoreMsg) (sm *StoreMsg, e // LoadNextMsgMulti will find the next message matching any entry in the sublist. func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) { + if fs.isClosed() { + return nil, 0, ErrStoreClosed + } if sl == nil { return fs.LoadNextMsg(_EMPTY_, false, start, smp) } fs.mu.RLock() defer fs.mu.RUnlock() - if fs.closed { - return nil, 0, ErrStoreClosed - } if fs.state.Msgs == 0 || start > fs.state.LastSeq { return nil, fs.state.LastSeq, ErrStoreEOF } @@ -8244,6 +8325,31 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp * start = fs.state.FirstSeq } + // If start is less than or equal to beginning of our stream, meaning our first call, + // let's check the psim to see if we can skip ahead. + if start <= fs.state.FirstSeq { + var total uint64 + blkStart := uint32(math.MaxUint32) + stree.IntersectGSL(fs.psim, sl, func(subj []byte, psi *psi) { + total += psi.total + // Keep track of start index for this subject. + if psi.fblk < blkStart { + blkStart = psi.fblk + } + }) + // Nothing available. + if total == 0 { + return nil, fs.state.LastSeq, ErrStoreEOF + } + // We can skip ahead. + if mb := fs.bim[blkStart]; mb != nil { + fseq := atomic.LoadUint64(&mb.first.seq) + if fseq > start { + start = fseq + } + } + } + if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 { for i := bi; i < len(fs.blks); i++ { mb := fs.blks[i] @@ -8254,8 +8360,28 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp * return sm, sm.seq, nil } else if err != ErrStoreMsgNotFound { return nil, 0, err - } else if expireOk { - mb.tryForceExpireCache() + } else { + // Nothing found in this block. We missed, if first block (bi) check psim. + // Similar to above if start <= first seq. + // TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers. + // We should not do this at all if we are already on the last block. + if i == bi && i < len(fs.blks)-1 { + nbi, err := fs.checkSkipFirstBlockMulti(sl, bi) + // Nothing available. + if err == ErrStoreEOF { + return nil, fs.state.LastSeq, ErrStoreEOF + } + // See if we can jump ahead here. + // Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded. + // For v2 will track all blocks that have matches for psim. + if nbi > i { + i = nbi - 1 // For the iterator condition i++ + } + } + // Check if we can expire. + if expireOk { + mb.tryForceExpireCache() + } } } } @@ -8265,12 +8391,13 @@ func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp * } func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *StoreMsg) (*StoreMsg, uint64, error) { + if fs.isClosed() { + return nil, 0, ErrStoreClosed + } + fs.mu.RLock() defer fs.mu.RUnlock() - if fs.closed { - return nil, 0, ErrStoreClosed - } if fs.state.Msgs == 0 || start > fs.state.LastSeq { return nil, fs.state.LastSeq, ErrStoreEOF } @@ -8323,7 +8450,7 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store i = nbi - 1 // For the iterator condition i++ } } - // Check is we can expire. + // Check if we can expire. if expireOk { mb.tryForceExpireCache() } @@ -8336,12 +8463,13 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store // Will load the next non-deleted msg starting at the start sequence and walking backwards. func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) { + if fs.isClosed() { + return nil, ErrStoreClosed + } + fs.mu.RLock() defer fs.mu.RUnlock() - if fs.closed { - return nil, ErrStoreClosed - } if fs.state.Msgs == 0 || start < fs.state.FirstSeq { return nil, ErrStoreEOF } @@ -8389,6 +8517,10 @@ func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err // LoadPrevMsgMulti will find the previous message matching any entry in the sublist. func (fs *fileStore) LoadPrevMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) { + if fs.isClosed() { + return nil, 0, ErrStoreClosed + } + if sl == nil { sm, err = fs.LoadPrevMsg(start, smp) return @@ -8396,9 +8528,6 @@ func (fs *fileStore) LoadPrevMsgMulti(sl *gsl.SimpleSublist, start uint64, smp * fs.mu.RLock() defer fs.mu.RUnlock() - if fs.closed { - return nil, 0, ErrStoreClosed - } if fs.state.Msgs == 0 || start < fs.state.FirstSeq { return nil, fs.state.FirstSeq, ErrStoreEOF } @@ -8952,12 +9081,12 @@ func (fs *fileStore) Purge() (uint64, error) { } func (fs *fileStore) purge(fseq uint64) (uint64, error) { - fs.mu.Lock() - if fs.closed { - fs.mu.Unlock() + if fs.isClosed() { return 0, ErrStoreClosed } + fs.mu.Lock() + purged := fs.state.Msgs rbytes := int64(fs.state.Bytes) @@ -9065,6 +9194,11 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) { fs.mu.Unlock() return fs.purge(seq) } + // Short-circuit if the store was already compacted past this point. + if fs.state.FirstSeq > seq { + fs.mu.Unlock() + return purged, nil + } // We have to delete interior messages. smb := fs.selectMsgBlock(seq) if smb == nil { @@ -9112,7 +9246,12 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) { if err = smb.loadMsgsWithLock(); err != nil { goto SKIP } - defer smb.finishedWithCache() + defer func() { + // The lock is released once we get here, so need to re-acquire. + smb.mu.Lock() + smb.finishedWithCache() + smb.mu.Unlock() + }() } for mseq := atomic.LoadUint64(&smb.first.seq); mseq < seq; mseq++ { sm, err := smb.cacheLookupNoCopy(mseq, &smv) @@ -9290,12 +9429,12 @@ SKIP: // Will completely reset our store. func (fs *fileStore) reset() error { - fs.mu.Lock() - if fs.closed { - fs.mu.Unlock() + if fs.isClosed() { return ErrStoreClosed } + fs.mu.Lock() + var purged, bytes uint64 cb := fs.scb @@ -9381,6 +9520,10 @@ func (mb *msgBlock) tombsLocked() []msgId { // Truncate will truncate a stream store up to seq. Sequence needs to be valid. func (fs *fileStore) Truncate(seq uint64) error { + if fs.isClosed() { + return ErrStoreClosed + } + // Check for request to reset. if seq == 0 { return fs.reset() @@ -9388,11 +9531,6 @@ func (fs *fileStore) Truncate(seq uint64) error { fs.mu.Lock() - if fs.closed { - fs.mu.Unlock() - return ErrStoreClosed - } - // Any existing state file will no longer be applicable. We will force write a new one // at the end, after we release the lock. os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) @@ -9682,6 +9820,14 @@ func (fs *fileStore) purgeMsgBlock(mb *msgBlock) { mb.finishedWithCache() mb.mu.Unlock() fs.selectNextFirst() + + if cb := fs.scb; cb != nil { + // If we have a callback registered, we need to release lock regardless since consumers will recalculate pending. + fs.mu.Unlock() + // Storage updates. + cb(-int64(msgs), -int64(bytes), 0, _EMPTY_) + fs.mu.Lock() + } } // Called by purge to simply get rid of the cache and close our fds. @@ -9893,6 +10039,9 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { func (fs *fileStore) resetGlobalPerSubjectInfo() { // Clear any global subject state. fs.psim, fs.tsl = fs.psim.Empty(), 0 + if fs.noTrackSubjects() { + return + } for _, mb := range fs.blks { fs.populateGlobalPerSubjectInfo(mb) } @@ -10230,6 +10379,10 @@ func (fs *fileStore) forceWriteFullStateLocked() error { // 3. MBs - Index, Bytes, First and Last Sequence and Timestamps, and the deleted map (avl.seqset). // 4. Last block index and hash of record inclusive to this stream state. func (fs *fileStore) _writeFullState(force, needLock bool) error { + if fs.isClosed() { + return nil + } + fsLock := func() { if needLock { fs.mu.Lock() @@ -10243,7 +10396,7 @@ func (fs *fileStore) _writeFullState(force, needLock bool) error { start := time.Now() fsLock() - if fs.closed || fs.dirty == 0 { + if fs.dirty == 0 { fsUnlock() return nil } @@ -10494,8 +10647,12 @@ func (fs *fileStore) Stop() error { // Stop the current filestore. func (fs *fileStore) stop(delete, writeState bool) error { + if fs.isClosed() { + return ErrStoreClosed + } + fs.mu.Lock() - if fs.closed || fs.closing { + if fs.closing { fs.mu.Unlock() return ErrStoreClosed } @@ -10531,7 +10688,7 @@ func (fs *fileStore) stop(delete, writeState bool) error { // Mark as closed. Last message block needs to be cleared after // writeFullState has completed. - fs.closed = true + fs.closed.Store(true) fs.lmb = nil // We should update the upper usage layer on a stop. @@ -10750,11 +10907,12 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool, err // Create a snapshot of this stream and its consumer's state along with messages. func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumers bool) (*SnapshotResult, error) { - fs.mu.Lock() - if fs.closed { - fs.mu.Unlock() + if fs.isClosed() { return nil, ErrStoreClosed } + + fs.mu.Lock() + // Only allow one at a time. if fs.sips > 0 { fs.mu.Unlock() @@ -11074,7 +11232,9 @@ func (fs *fileStore) ConsumerStore(name string, created time.Time, cfg *Consumer go o.flushLoop(o.fch, o.qch) // Make sure to load in our state from disk if needed. - o.loadState() + if err = o.loadState(); err != nil { + return nil, err + } // Assign to filestore. fs.AddConsumer(o) @@ -11764,10 +11924,15 @@ func (o *consumerFileStore) stateWithCopyLocked(doCopy bool) (*ConsumerState, er } // Lock should be held. Called at startup. -func (o *consumerFileStore) loadState() { +func (o *consumerFileStore) loadState() error { if _, err := os.Stat(o.ifn); err == nil { // This will load our state in from disk. - o.stateWithCopyLocked(false) + _, err = o.stateWithCopyLocked(false) + return err + } else if os.IsNotExist(err) { + return nil + } else { + return err } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/gsl/gsl.go b/vendor/github.com/nats-io/nats-server/v2/server/gsl/gsl.go index 88274dd234..c16f6bac62 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/gsl/gsl.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/gsl/gsl.go @@ -17,9 +17,6 @@ import ( "errors" "strings" "sync" - "unsafe" - - "github.com/nats-io/nats-server/v2/server/stree" ) // Sublist is a routing mechanism to handle subject distribution and @@ -251,7 +248,9 @@ func matchLevelForAny[T comparable](l *level[T], toks []string, np *int) bool { if np != nil { *np += len(n.subs) } - return len(n.subs) > 0 + if len(n.subs) > 0 { + return true + } } if pwc != nil { if np != nil { @@ -370,6 +369,36 @@ func (s *GenericSublist[T]) Remove(subject string, value T) error { return s.remove(subject, value, true) } +// HasInterestStartingIn is a helper for subject tree intersection. +func (s *GenericSublist[T]) HasInterestStartingIn(subj string) bool { + s.RLock() + defer s.RUnlock() + var _tokens [64]string + tokens := tokenizeSubjectIntoSlice(_tokens[:0], subj) + return hasInterestStartingIn(s.root, tokens) +} + +func hasInterestStartingIn[T comparable](l *level[T], tokens []string) bool { + if l == nil { + return false + } + if len(tokens) == 0 { + return true + } + token := tokens[0] + if l.fwc != nil { + return true + } + found := false + if pwc := l.pwc; pwc != nil { + found = found || hasInterestStartingIn(pwc.next, tokens[1:]) + } + if n := l.nodes[token]; n != nil { + found = found || hasInterestStartingIn(n.next, tokens[1:]) + } + return found +} + // pruneNode is used to prune an empty node from the tree. func (l *level[T]) pruneNode(n *node[T], t string) { if n == nil { @@ -463,86 +492,15 @@ func visitLevel[T comparable](l *level[T], depth int) int { return maxDepth } -// IntersectStree will match all items in the given subject tree that -// have interest expressed in the given sublist. The callback will only be called -// once for each subject, regardless of overlapping subscriptions in the sublist. -func IntersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], sl *GenericSublist[T2], cb func(subj []byte, entry *T1)) { - var _subj [255]byte - intersectStree(st, sl.root, _subj[:0], cb) -} - -func intersectStree[T1 any, T2 comparable](st *stree.SubjectTree[T1], r *level[T2], subj []byte, cb func(subj []byte, entry *T1)) { - nsubj := subj - if len(nsubj) > 0 { - nsubj = append(subj, '.') - } - if r.fwc != nil { - // We've reached a full wildcard, do a FWC match on the stree at this point - // and don't keep iterating downward. - nsubj := append(nsubj, '>') - st.Match(nsubj, cb) - return - } - if r.pwc != nil { - // We've found a partial wildcard. We'll keep iterating downwards, but first - // check whether there's interest at this level (without triggering dupes) and - // match if so. - var done bool - nsubj := append(nsubj, '*') - if len(r.pwc.subs) > 0 { - st.Match(nsubj, cb) - done = true - } - if r.pwc.next.numNodes() > 0 { - intersectStree(st, r.pwc.next, nsubj, cb) - } - if done { - return - } - } - // Normal node with subject literals, keep iterating. - for t, n := range r.nodes { - if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 { - // A wildcard at the next level will already visit these descendents - // so skip so we don't callback the same subject more than once. - continue - } - nsubj := append(nsubj, t...) - if len(n.subs) > 0 { - if subjectHasWildcard(bytesToString(nsubj)) { - st.Match(nsubj, cb) - } else { - if e, ok := st.Find(nsubj); ok { - cb(nsubj, e) - } - } - } - if n.next.numNodes() > 0 { - intersectStree(st, n.next, nsubj, cb) - } - } -} - -// Determine if a subject has any wildcard tokens. -func subjectHasWildcard(subject string) bool { - // This one exits earlier then !subjectIsLiteral(subject) - for i, c := range subject { - if c == pwc || c == fwc { - if (i == 0 || subject[i-1] == btsep) && - (i+1 == len(subject) || subject[i+1] == btsep) { - return true - } +// use similar to append. meaning, the updated slice will be returned +func tokenizeSubjectIntoSlice(tts []string, subject string) []string { + start := 0 + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + tts = append(tts, subject[start:i]) + start = i + 1 } } - return false -} - -// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer. -// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same. -func bytesToString(b []byte) string { - if len(b) == 0 { - return _EMPTY_ - } - p := unsafe.SliceData(b) - return unsafe.String(p, len(b)) + tts = append(tts, subject[start:]) + return tts } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go index 78763f3ece..a74fc0d821 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2025 The NATS Authors +// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1142,6 +1142,12 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c } js.mu.Lock() + // Accounts get reset to nil on shutdown, since we re-acquire the locks here, we need to check again. + if js.accounts == nil { + js.mu.Unlock() + return NewJSNotEnabledError() + } + if jsa, ok := js.accounts[a.Name]; ok { a.mu.Lock() a.js = jsa @@ -1370,7 +1376,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c } obs, err := mset.addConsumerWithAssignment(&cfg.ConsumerConfig, _EMPTY_, nil, true, ActionCreateOrUpdate, false) if err != nil { - s.Warnf(" Error adding consumer %q: %v", cfg.Name, err) + s.Warnf(" Error adding consumer '%s > %s > %s': %v", a.Name, mset.name(), cfg.Name, err) continue } if isEphemeral { @@ -1379,9 +1385,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c if !cfg.Created.IsZero() { obs.setCreatedTime(cfg.Created) } - if err != nil { - s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err) - } } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go index 1f42bf0c20..e2c25d2169 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go @@ -1,4 +1,4 @@ -// Copyright 2020-2025 The NATS Authors +// Copyright 2020-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1296,11 +1296,6 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, } var resp = JSApiAccountInfoResponse{ApiResponse: ApiResponse{Type: JSApiAccountInfoResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1319,6 +1314,12 @@ func (s *Server) jsAccountInfoRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if !doErr { return @@ -1612,11 +1613,6 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, } var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1635,6 +1631,12 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -1729,11 +1731,6 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, } var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1752,6 +1749,12 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -1832,11 +1835,6 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, } var resp = JSApiStreamNamesResponse{ApiResponse: ApiResponse{Type: JSApiStreamNamesResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1855,6 +1853,12 @@ func (s *Server) jsStreamNamesRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -1967,11 +1971,6 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s ApiResponse: ApiResponse{Type: JSApiStreamListResponseType}, Streams: []*StreamInfo{}, } - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -1990,6 +1989,12 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -2090,11 +2095,6 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s if rt := getHeader(JSResponseType, hdr); len(rt) > 0 && string(rt) == jsCreateResponse { resp.ApiResponse.Type = JSApiStreamCreateResponseType } - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } var clusterWideConsCount int @@ -2184,6 +2184,12 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -2309,11 +2315,6 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ * name := tokenAt(subject, 6) var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderStepDownResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are not in clustered mode this is a failed request. if !s.JetStreamIsClustered() { @@ -2345,6 +2346,12 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ * return } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -2421,11 +2428,6 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ } var resp = JSApiConsumerLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiConsumerLeaderStepDownResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are not in clustered mode this is a failed request. if !s.JetStreamIsClustered() { @@ -2460,6 +2462,13 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _ } else if sa == nil { return } + + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + var ca *consumerAssignment if sa.consumers != nil { ca = sa.consumers[consumer] @@ -2547,11 +2556,6 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco name := tokenAt(subject, 6) var resp = JSApiStreamRemovePeerResponse{ApiResponse: ApiResponse{Type: JSApiStreamRemovePeerResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are not in clustered mode this is a failed request. if !s.JetStreamIsClustered() { @@ -2580,6 +2584,12 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco return } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -3072,11 +3082,6 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac accName := tokenAt(subject, 5) var resp = JSApiAccountPurgeResponse{ApiResponse: ApiResponse{Type: JSApiAccountPurgeResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } if !s.JetStreamIsClustered() { var streams []*stream @@ -3117,6 +3122,12 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac return } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if js.isMetaRecovering() { // While in recovery mode, the data structures are not fully initialized resp.Error = NewJSClusterNotAvailError() @@ -3339,11 +3350,6 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, } var resp = JSApiStreamDeleteResponse{ApiResponse: ApiResponse{Type: JSApiStreamDeleteResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -3362,6 +3368,12 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -3414,11 +3426,6 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su stream := tokenAt(subject, 6) var resp = JSApiMsgDeleteResponse{ApiResponse: ApiResponse{Type: JSApiMsgDeleteResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are in clustered mode we need to be the stream leader to proceed. if s.JetStreamIsClustered() { @@ -3467,6 +3474,12 @@ func (s *Server) jsMsgDeleteRequest(sub *subscription, c *client, _ *Account, su } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -3538,11 +3551,6 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje stream := tokenAt(subject, 6) var resp = JSApiMsgGetResponse{ApiResponse: ApiResponse{Type: JSApiMsgGetResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are in clustered mode we need to be the stream leader to proceed. if s.JetStreamIsClustered() { @@ -3591,6 +3599,12 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -3695,31 +3709,8 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account stream := streamNameFromSubject(subject) consumer := consumerNameFromSubject(subject) - var req JSApiConsumerUnpinRequest var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - if err := json.Unmarshal(msg, &req); err != nil { - resp.Error = NewJSInvalidJSONError(err) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - - if req.Group == _EMPTY_ { - resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified")) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - - if !validGroupName.MatchString(req.Group) { - resp.Error = NewJSConsumerInvalidGroupNameError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } if s.JetStreamIsClustered() { // Check to make sure the stream is assigned. js, cc := s.getJetStreamCluster() @@ -3771,6 +3762,31 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + var req JSApiConsumerUnpinRequest + if err := json.Unmarshal(msg, &req); err != nil { + resp.Error = NewJSInvalidJSONError(err) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + if req.Group == _EMPTY_ { + resp.Error = NewJSInvalidJSONError(errors.New("consumer group not specified")) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + if !validGroupName.MatchString(req.Group) { + resp.Error = NewJSConsumerInvalidGroupNameError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -3834,11 +3850,6 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, stream := streamNameFromSubject(subject) var resp = JSApiStreamPurgeResponse{ApiResponse: ApiResponse{Type: JSApiStreamPurgeResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // If we are in clustered mode we need to be the stream leader to proceed. if s.JetStreamIsClustered() { @@ -3890,6 +3901,12 @@ func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -4507,11 +4524,6 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun } var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } var req CreateConsumerRequest if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil { @@ -4548,6 +4560,20 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { + resp.Error = NewJSNotEnabledForAccountError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } + return + } + var streamName, consumerName, filteredSubject string var rt ccReqType @@ -4580,14 +4606,6 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun } } - if hasJS, doErr := acc.checkJetStream(); !hasJS { - if doErr { - resp.Error = NewJSNotEnabledForAccountError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - } - return - } - if streamName != req.Stream { resp.Error = NewJSStreamMismatchError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -4732,11 +4750,6 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account ApiResponse: ApiResponse{Type: JSApiConsumerNamesResponseType}, Consumers: []string{}, } - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -4755,6 +4768,12 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, _ *Account } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -4859,11 +4878,6 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, ApiResponse: ApiResponse{Type: JSApiConsumerListResponseType}, Consumers: []*ConsumerInfo{}, } - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -4883,7 +4897,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, } if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSClusterNotAvailError() + resp.Error = NewJSRequiredApiLevelError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } @@ -4973,11 +4987,6 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, consumerName := consumerNameFromSubject(subject) var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } if !isEmptyRequest(msg) { resp.Error = NewJSNotEmptyRequestError() @@ -5128,6 +5137,12 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if !acc.JetStreamEnabled() { resp.Error = NewJSNotEnabledForAccountError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) @@ -5175,11 +5190,6 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun } var resp = JSApiConsumerDeleteResponse{ApiResponse: ApiResponse{Type: JSApiConsumerDeleteResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { @@ -5198,6 +5208,12 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, _ *Accoun } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() @@ -5253,11 +5269,6 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account var req JSApiConsumerPauseRequest var resp = JSApiConsumerPauseResponse{ApiResponse: ApiResponse{Type: JSApiConsumerPauseResponseType}} - if errorOnRequiredApiLevel(hdr) { - resp.Error = NewJSRequiredApiLevelError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } if isJSONObjectOrArray(msg) { if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil { @@ -5285,6 +5296,12 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account } } + if errorOnRequiredApiLevel(hdr) { + resp.Error = NewJSRequiredApiLevelError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + if hasJS, doErr := acc.checkJetStream(); !hasJS { if doErr { resp.Error = NewJSNotEnabledForAccountError() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index a630a4cf02..89966b43a6 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -3142,20 +3142,20 @@ func (mset *stream) resetClusteredState(err error) bool { stype, tierName, replicas := mset.cfg.Storage, mset.tier, mset.cfg.Replicas mset.mu.RUnlock() - assert.Unreachable("Reset clustered state", map[string]any{ - "stream": name, - "account": acc.Name, - "err": err, - }) - // The stream might already be deleted and not assigned to us anymore. // In any case, don't revive the stream if it's already closed. - if mset.closed.Load() { + if mset.closed.Load() || (node != nil && node.IsDeleted()) { s.Warnf("Will not reset stream '%s > %s', stream is closed", acc, mset.name()) // Explicitly returning true here, we want the outside to break out of the monitoring loop as well. return true } + assert.Unreachable("Reset clustered state", map[string]any{ + "stream": name, + "account": acc.Name, + "err": err, + }) + // Stepdown regardless if we are the leader here. if node != nil { node.StepDown() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go index 62555486df..0ebde6a9f8 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go @@ -19,7 +19,6 @@ import ( "fmt" "math" "slices" - "sort" "sync" "time" @@ -445,7 +444,6 @@ func (ms *memStore) RegisterProcessJetStreamMsg(cb ProcessJetStreamMsgHandler) { // GetSeqFromTime looks for the first sequence number that has the message // with >= timestamp. -// FIXME(dlc) - inefficient. func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { ts := t.UnixNano() ms.mu.RLock() @@ -469,18 +467,57 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { last := lmsg.ts if ts == last { - return ms.state.LastSeq + return lmsg.seq } if ts > last { return ms.state.LastSeq + 1 } - index := sort.Search(len(ms.msgs), func(i int) bool { - if msg := ms.msgs[ms.state.FirstSeq+uint64(i)]; msg != nil { - return msg.ts >= ts + + var ( + cts int64 + cseq uint64 + off uint64 + ) + + // Using a binary search, but need to be aware of interior deletes. + fseq := ms.state.FirstSeq + lseq := ms.state.LastSeq + seq := lseq + 1 +loop: + for fseq <= lseq { + mid := fseq + (lseq-fseq)/2 + off = 0 + // Potentially skip over gaps. We keep the original middle but keep track of a + // potential delete range with an offset. + for { + msg := ms.msgs[mid+off] + if msg == nil { + off++ + if mid+off <= lseq { + continue + } else { + // Continue search to the left. Purposely ignore the skipped deletes here. + lseq = mid - 1 + continue loop + } + } + cts = msg.ts + cseq = msg.seq + break } - return false - }) - return uint64(index) + ms.state.FirstSeq + if cts >= ts { + seq = cseq + if mid == fseq { + break + } + // Continue search to the left. + lseq = mid - 1 + } else { + // Continue search to the right (potentially skipping over interior deletes). + fseq = mid + off + 1 + } + } + return seq } // FilteredState will return the SimpleState associated with the filtered subject and a proposed starting sequence. @@ -906,7 +943,7 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPerS var havePartial bool var totalSkipped uint64 // We will track start and end sequences as we go. - gsl.IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) { + stree.IntersectGSL[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) { if fss.firstNeedsUpdate || fss.lastNeedsUpdate { ms.recalculateForSubj(bytesToString(subj), fss) } @@ -1463,6 +1500,12 @@ func (ms *memStore) compact(seq uint64) (uint64, error) { var purged, bytes uint64 ms.mu.Lock() + // Short-circuit if the store was already compacted past this point. + if ms.state.FirstSeq > seq { + ms.mu.Unlock() + return purged, nil + } + cb := ms.scb if seq <= ms.state.LastSeq { fseq := ms.state.FirstSeq diff --git a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go index 10e3af057d..c1ad73e91e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go @@ -1,4 +1,4 @@ -// Copyright 2013-2025 The NATS Authors +// Copyright 2013-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1279,6 +1279,7 @@ type Varz struct { SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` // SlowConsumersStats are statistics about all detected Slow Consumer StaleConnectionStats *StaleConnectionStats `json:"stale_connection_stats,omitempty"` // StaleConnectionStats are statistics about all detected Stale Connections Proxies *ProxiesOptsVarz `json:"proxies,omitempty"` // Proxies hold information about network proxy devices + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate of this server } // JetStreamVarz contains basic runtime information about jetstream @@ -1291,34 +1292,36 @@ type JetStreamVarz struct { // ClusterOptsVarz contains monitoring cluster information type ClusterOptsVarz struct { - Name string `json:"name,omitempty"` // Name is the configured cluster name - Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections - Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections - AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication - URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs - TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete - TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections - TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed - PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size - WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete - WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors + Name string `json:"name,omitempty"` // Name is the configured cluster name + Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections + Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication + URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size + WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate } // GatewayOptsVarz contains monitoring gateway information type GatewayOptsVarz struct { - Name string `json:"name,omitempty"` // Name is the configured cluster name - Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections - Port int `json:"port,omitempty"` // Port is the post gateway connections listens on - AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication - TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete - TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections - TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed - Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients - ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make - Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes - RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected - WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete - WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors + Name string `json:"name,omitempty"` // Name is the configured cluster name + Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections + Port int `json:"port,omitempty"` // Port is the post gateway connections listens on + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients + ConnectRetries int `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make + Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes + RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected + WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete + WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificaet } // RemoteGatewayOptsVarz contains monitoring remote gateway information @@ -1340,6 +1343,7 @@ type LeafNodeOptsVarz struct { TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete WriteTimeout string `json:"write_timeout,omitempty"` // WriteTimeout is the closure policy for write deadline errors + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate } // DenyRules Contains lists of subjects not allowed to be imported/exported @@ -1370,6 +1374,7 @@ type MQTTOptsVarz struct { AckWait time.Duration `json:"ack_wait,omitempty"` // AckWait is how long the internal JetStream state store will allow acks to complete MaxAckPending uint16 `json:"max_ack_pending,omitempty"` // MaxAckPending is how many outstanding acks the internal JetStream state store will allow TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate } // WebsocketOptsVarz contains monitoring websocket information @@ -1388,6 +1393,7 @@ type WebsocketOptsVarz struct { AllowedOrigins []string `json:"allowed_origins,omitempty"` // AllowedOrigins list of configured trusted origins Compression bool `json:"compression,omitempty"` // Compression indicates if compression is supported TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done + TLSCertNotAfter time.Time `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate } // OCSPResponseCacheVarz contains OCSP response cache information @@ -1454,6 +1460,22 @@ func myUptime(d time.Duration) string { return fmt.Sprintf("%ds", tsecs) } +func tlsCertNotAfter(config *tls.Config) time.Time { + if config == nil || len(config.Certificates) == 0 { + return time.Time{} + } + cert := config.Certificates[0] + leaf := cert.Leaf + if leaf == nil { + var err error + leaf, err = x509.ParseCertificate(cert.Certificate[0]) + if err != nil { + return time.Time{} + } + } + return leaf.NotAfter +} + // HandleRoot will show basic info and links to others handlers. func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { // This feels dumb to me, but is required: https://code.google.com/p/go/issues/detail?id=4799 @@ -1779,6 +1801,13 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) { v.TLSOCSPPeerVerify = s.ocspPeerVerify && v.TLSRequired && s.opts.tlsConfigOpts != nil && s.opts.tlsConfigOpts.OCSPPeerConfig != nil && s.opts.tlsConfigOpts.OCSPPeerConfig.Verify + v.TLSCertNotAfter = tlsCertNotAfter(opts.TLSConfig) + v.Cluster.TLSCertNotAfter = tlsCertNotAfter(opts.Cluster.TLSConfig) + v.Gateway.TLSCertNotAfter = tlsCertNotAfter(opts.Gateway.TLSConfig) + v.LeafNode.TLSCertNotAfter = tlsCertNotAfter(opts.LeafNode.TLSConfig) + v.MQTT.TLSCertNotAfter = tlsCertNotAfter(opts.MQTT.TLSConfig) + v.Websocket.TLSCertNotAfter = tlsCertNotAfter(opts.Websocket.TLSConfig) + if opts.Proxies != nil { if v.Proxies == nil { v.Proxies = &ProxiesOptsVarz{} @@ -3982,6 +4011,11 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { return health } +// Healthz returns the health status of the server. +func (s *Server) Healthz(opts *HealthzOptions) *HealthStatus { + return s.healthz(opts) +} + type ExpvarzStatus struct { Memstats json.RawMessage `json:"memstats"` Cmdline json.RawMessage `json:"cmdline"` diff --git a/vendor/github.com/nats-io/nats-server/v2/server/opts.go b/vendor/github.com/nats-io/nats-server/v2/server/opts.go index b77663de1f..f989fd530c 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/opts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/opts.go @@ -2326,7 +2326,7 @@ func parseJetStreamForAccount(v any, acc *Account, errors *[]error) error { case "cluster_traffic": vv, ok := mv.(string) if !ok { - return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'account' string value for %q, got %v", mk, mv)} + return &configErr{tk, fmt.Sprintf("Expected either 'system' or 'owner' string value for %q, got %v", mk, mv)} } switch vv { case "system", _EMPTY_: diff --git a/vendor/github.com/nats-io/nats-server/v2/server/raft.go b/vendor/github.com/nats-io/nats-server/v2/server/raft.go index 8d845143de..5e24209373 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/raft.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/raft.go @@ -1,4 +1,4 @@ -// Copyright 2020-2025 The NATS Authors +// Copyright 2020-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -83,6 +83,7 @@ type RaftNode interface { Stop() WaitForStop() Delete() + IsDeleted() bool RecreateInternalSubs() error IsSystemAccount() bool GetTrafficAccountName() string @@ -231,6 +232,7 @@ type raft struct { initializing bool // The node is new, and "empty log" checks can be temporarily relaxed. scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data. membChanging bool // There is a membership change proposal in progress + deleted bool // If the node was deleted. } type proposedEntry struct { @@ -1735,11 +1737,6 @@ func (n *raft) StepDown(preferred ...string) error { } } } - - // Clear our vote state. - n.vote = noVote - n.writeTermVote() - n.Unlock() if len(preferred) > 0 && maybeLeader == noLeader { @@ -1920,6 +1917,7 @@ func (n *raft) Delete() { n.Lock() defer n.Unlock() + n.deleted = true if wal := n.wal; wal != nil { wal.Delete(false) } @@ -1927,6 +1925,12 @@ func (n *raft) Delete() { n.debug("Deleted") } +func (n *raft) IsDeleted() bool { + n.RLock() + defer n.RUnlock() + return n.deleted +} + func (n *raft) shutdown() { // First call to Stop or Delete should close the quit chan // to notify the runAs goroutines to stop what they're doing. @@ -3401,9 +3405,9 @@ func (n *raft) runAsCandidate() { n.requestVote() // We vote for ourselves. - votes := map[string]struct{}{ - n.ID(): {}, - } + n.votes.push(&voteResponse{term: n.term, peer: n.ID(), granted: true}) + + votes := map[string]struct{}{} emptyVotes := map[string]struct{}{} for n.State() == Candidate { @@ -3968,10 +3972,6 @@ CONTINUE: // Here we can become a leader but need to wait for resume of the apply queue. n.lxfer = true } - } else if n.vote != noVote { - // Since we are here we are not the chosen one but we should clear any vote preference. - n.vote = noVote - n.writeTermVote() } } case EntryAddPeer: @@ -4211,6 +4211,9 @@ func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) error { if !shouldStore { ae.returnToPool() } + if n.csz == 1 { + n.tryCommit(n.pindex) + } return nil } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index 906f111491..5ac0483e2f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2025 The NATS Authors +// Copyright 2019-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -425,6 +425,7 @@ type stream struct { active bool // Indicates that there are active internal subscriptions (for the subject filters) // and/or mirror/sources consumers are scheduled to be established or already started. closed atomic.Bool // Set to true when stop() is called on the stream. + cisrun atomic.Bool // Indicates one checkInterestState is already running. // Mirror mirror *sourceInfo @@ -837,9 +838,9 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt // Add created timestamp used for the store, must match that of the stream assignment if it exists. if sa != nil { - js.mu.RLock() + // The following assignment does not require mutex + // protection: sa.Created is immutable. mset.created = sa.Created - js.mu.RUnlock() } // Start our signaling routine to process consumers. @@ -1821,7 +1822,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo // check for duplicates var iNames = make(map[string]struct{}) for _, src := range cfg.Sources { - if !isValidName(src.Name) { + if src == nil || !isValidName(src.Name) { return StreamConfig{}, NewJSSourceInvalidStreamNameError() } if _, ok := iNames[src.composeIName()]; !ok { @@ -3162,7 +3163,6 @@ func (mset *stream) setupMirrorConsumer() error { } mirror := mset.mirror - mirrorWg := &mirror.wg // We want to throttle here in terms of how fast we request new consumers, // or if the previous is still in progress. @@ -3321,7 +3321,16 @@ func (mset *stream) setupMirrorConsumer() error { // Wait for previous processMirrorMsgs go routine to be completely done. // If none is running, this will not block. - mirrorWg.Wait() + mset.mu.Lock() + if mset.mirror == nil { + // Mirror config has been removed. + mset.mu.Unlock() + return + } else { + wg := &mset.mirror.wg + mset.mu.Unlock() + wg.Wait() + } select { case ccr := <-respCh: @@ -6045,13 +6054,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, return nil } - // If here we will attempt to store the message. - // Assume this will succeed. - olmsgId := mset.lmsgId - mset.lmsgId = msgId - mset.lseq++ - tierName := mset.tier - // Republish state if needed. var tsubj string var tlseq uint64 @@ -6075,7 +6077,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // If clustered this was already checked and we do not want to check here and possibly introduce skew. // Don't error and log if we're tracing when clustered. if !isClustered { - if exceeded, err := jsa.wouldExceedLimits(stype, tierName, mset.cfg.Replicas, subject, hdr, msg); exceeded { + if exceeded, err := jsa.wouldExceedLimits(stype, mset.tier, mset.cfg.Replicas, subject, hdr, msg); exceeded { if err == nil { err = NewJSAccountResourcesExceededError() } @@ -6134,11 +6136,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err) return err } - // If we did not succeed put those values back and increment clfs in case we are clustered. - var state StreamState - mset.store.FastState(&state) - mset.lseq = state.LastSeq - mset.lmsgId = olmsgId + // If we did not succeed increment clfs in case we are clustered. bumpCLFS() switch err { @@ -6159,6 +6157,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } // If here we succeeded in storing the message. + mset.lmsgId = msgId + mset.lseq = seq // If we have a msgId make sure to save. // This will replace our estimate from the cluster layer if we are clustered. @@ -7298,11 +7298,30 @@ func (mset *stream) checkInterestState() { return } + // Ensure only one of these runs at the same time. + if !mset.cisrun.CompareAndSwap(false, true) { + return + } + defer mset.cisrun.Store(false) + var ss StreamState mset.store.FastState(&ss) + asflr := uint64(math.MaxUint64) for _, o := range mset.getConsumers() { o.checkStateForInterestStream(&ss) + o.mu.RLock() + chkflr := o.chkflr + o.mu.RUnlock() + asflr = min(asflr, chkflr) + } + + mset.cfgMu.RLock() + rp := mset.cfg.Retention + mset.cfgMu.RUnlock() + // Remove as many messages from the "head" of the stream if there's no interest anymore. + if rp == InterestPolicy && asflr != math.MaxUint64 { + mset.store.Compact(asflr) } } @@ -7389,20 +7408,18 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) { o.sigSubs = nil } - if o.isLeader() { - if mset.csl == nil { - mset.csl = gsl.NewSublist[*consumer]() - } - // If no filters are preset, add fwcs to sublist for that consumer. - if newFilters == nil { - mset.csl.Insert(fwcs, o) - o.sigSubs = append(o.sigSubs, fwcs) - // If there are filters, add their subjects to sublist. - } else { - for _, filter := range newFilters { - mset.csl.Insert(filter, o) - o.sigSubs = append(o.sigSubs, filter) - } + if mset.csl == nil { + mset.csl = gsl.NewSublist[*consumer]() + } + // If no filters are present, add fwcs to sublist for that consumer. + if newFilters == nil { + mset.csl.Insert(fwcs, o) + o.sigSubs = append(o.sigSubs, fwcs) + } else { + // If there are filters, add their subjects to sublist. + for _, filter := range newFilters { + mset.csl.Insert(filter, o) + o.sigSubs = append(o.sigSubs, filter) } } o.mu.Unlock() @@ -7479,14 +7496,18 @@ func (mset *stream) partitionUnique(name string, partitions []string) bool { if n == name { continue } + o.mu.RLock() if o.subjf == nil { + o.mu.RUnlock() return false } for _, filter := range o.subjf { if SubjectsCollide(partition, filter.subject) { + o.mu.RUnlock() return false } } + o.mu.RUnlock() } } return true diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go b/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go index 28dc72f08b..d0cd08ac95 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stree/stree.go @@ -16,6 +16,9 @@ package stree import ( "bytes" "slices" + "unsafe" + + "github.com/nats-io/nats-server/v2/server/gsl" ) // SubjectTree is an adaptive radix trie (ART) for storing subject information on literal subjects. @@ -448,3 +451,60 @@ func LazyIntersect[TL, TR any](tl *SubjectTree[TL], tr *SubjectTree[TR], cb func }) } } + +// IntersectGSL will match all items in the given subject tree that +// have interest expressed in the given sublist. The callback will only be called +// once for each subject, regardless of overlapping subscriptions in the sublist. +func IntersectGSL[T any, SL comparable](t *SubjectTree[T], sl *gsl.GenericSublist[SL], cb func(subject []byte, val *T)) { + if t == nil || t.root == nil || sl == nil { + return + } + var _pre [256]byte + _intersectGSL(t.root, _pre[:0], sl, cb) +} + +func _intersectGSL[T any, SL comparable](n node, pre []byte, sl *gsl.GenericSublist[SL], cb func(subject []byte, val *T)) { + if n.isLeaf() { + ln := n.(*leaf[T]) + subj := append(pre, ln.suffix...) + if sl.HasInterest(bytesToString(subj)) { + cb(subj, &ln.value) + } + return + } + bn := n.base() + pre = append(pre, bn.prefix...) + for _, cn := range n.children() { + if cn == nil { + continue + } + subj := append(pre, cn.path()...) + if !hasInterestForTokens(sl, subj, len(pre)) { + continue + } + _intersectGSL(cn, pre, sl, cb) + } +} + +// The subject tree can return partial tokens so we need to check starting interest +// only from whole tokens when we encounter a tsep. +func hasInterestForTokens[SL comparable](sl *gsl.GenericSublist[SL], subj []byte, since int) bool { + for i := since; i < len(subj); i++ { + if subj[i] == tsep { + if !sl.HasInterestStartingIn(bytesToString(subj[:i])) { + return false + } + } + } + return true +} + +// Note this will avoid a copy of the data used for the string, but it will also reference the existing slice's data pointer. +// So this should be used sparingly when we know the encompassing byte slice's lifetime is the same. +func bytesToString(b []byte) string { + if len(b) == 0 { + return "" + } + p := unsafe.SliceData(b) + return unsafe.String(p, len(b)) +} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/sublist.go b/vendor/github.com/nats-io/nats-server/v2/server/sublist.go index 6d4d145f3e..150301e32f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/sublist.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/sublist.go @@ -21,8 +21,6 @@ import ( "sync" "sync/atomic" "unicode/utf8" - - "github.com/nats-io/nats-server/v2/server/stree" ) // Sublist is a routing mechanism to handle subject distribution and @@ -818,7 +816,9 @@ func matchLevelForAny(l *level, toks []string, np, nq *int) bool { *nq += len(qsub) } } - return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 + if len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 { + return true + } } if pwc != nil { if np != nil && nq != nil { @@ -1726,63 +1726,3 @@ func getAllNodes(l *level, results *SublistResult) { getAllNodes(n.next, results) } } - -// IntersectStree will match all items in the given subject tree that -// have interest expressed in the given sublist. The callback will only be called -// once for each subject, regardless of overlapping subscriptions in the sublist. -func IntersectStree[T any](st *stree.SubjectTree[T], sl *Sublist, cb func(subj []byte, entry *T)) { - var _subj [255]byte - intersectStree(st, sl.root, _subj[:0], cb) -} - -func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb func(subj []byte, entry *T)) { - nsubj := subj - if len(nsubj) > 0 { - nsubj = append(subj, '.') - } - if r.fwc != nil { - // We've reached a full wildcard, do a FWC match on the stree at this point - // and don't keep iterating downward. - nsubj := append(nsubj, '>') - st.Match(nsubj, cb) - return - } - if r.pwc != nil { - // We've found a partial wildcard. We'll keep iterating downwards, but first - // check whether there's interest at this level (without triggering dupes) and - // match if so. - var done bool - nsubj := append(nsubj, '*') - if len(r.pwc.psubs)+len(r.pwc.qsubs) > 0 { - st.Match(nsubj, cb) - done = true - } - if r.pwc.next.numNodes() > 0 { - intersectStree(st, r.pwc.next, nsubj, cb) - } - if done { - return - } - } - // Normal node with subject literals, keep iterating. - for t, n := range r.nodes { - if r.pwc != nil && r.pwc.next.numNodes() > 0 && n.next.numNodes() > 0 { - // A wildcard at the next level will already visit these descendents - // so skip so we don't callback the same subject more than once. - continue - } - nsubj := append(nsubj, t...) - if len(n.psubs)+len(n.qsubs) > 0 { - if subjectHasWildcard(bytesToString(nsubj)) { - st.Match(nsubj, cb) - } else { - if e, ok := st.Find(nsubj); ok { - cb(nsubj, e) - } - } - } - if n.next.numNodes() > 0 { - intersectStree(st, n.next, nsubj, cb) - } - } -} diff --git a/vendor/github.com/nats-io/nats-server/v2/server/util.go b/vendor/github.com/nats-io/nats-server/v2/server/util.go index 4e08e3f0cd..f8ce7ab498 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/util.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/util.go @@ -1,4 +1,4 @@ -// Copyright 2012-2024 The NATS Authors +// Copyright 2012-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/vendor/modules.txt b/vendor/modules.txt index f9e313f48d..b773289fa9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -746,7 +746,7 @@ github.com/google/go-querystring/query # github.com/google/go-tika v0.3.1 ## explicit; go 1.11 github.com/google/go-tika/tika -# github.com/google/go-tpm v0.9.7 +# github.com/google/go-tpm v0.9.8 ## explicit; go 1.22 github.com/google/go-tpm/legacy/tpm2 github.com/google/go-tpm/tpmutil @@ -873,7 +873,7 @@ github.com/justinas/alice # github.com/kevinburke/ssh_config v1.2.0 ## explicit github.com/kevinburke/ssh_config -# github.com/klauspost/compress v1.18.2 +# github.com/klauspost/compress v1.18.3 ## explicit; go 1.23 github.com/klauspost/compress github.com/klauspost/compress/flate @@ -1155,7 +1155,7 @@ github.com/munnerz/goautoneg # github.com/nats-io/jwt/v2 v2.8.0 ## explicit; go 1.23.0 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.12.3 +# github.com/nats-io/nats-server/v2 v2.12.4 ## explicit; go 1.24.0 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/fastrand