Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
ErrServiceUnavailable = &Error{Code: 502, Message: "service unavailable"}
ErrCRCMismatch = errors.New("nntp: yEnc CRC mismatch")
ErrProtocolDesync = errors.New("nntp: protocol desync: expected status line, got binary data")
ErrQuotaExceeded = errors.New("nntp: download quota exceeded")
)

// toError maps an NNTP status code to a sentinel error, or returns nil for success codes.
Expand Down
11 changes: 11 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type providerStats struct {
Missing atomic.Int64 // 430/423 responses
Errors atomic.Int64 // network errors, bad status codes
Ping PingResult // result of initial DATE ping

// Quota tracking. quotaBytes is set once at group init (0 = unlimited).
quotaBytes int64
quotaUsed atomic.Int64 // bytes consumed in the current quota period
quotaExceeded atomic.Bool // cached flag: set when quotaUsed >= quotaBytes; cleared on period reset
}

// ProviderStats is a public snapshot of one provider's metrics.
Expand All @@ -31,6 +36,12 @@ type ProviderStats struct {
ActiveConnections int // currently running connections
MaxConnections int // configured connection slots
Ping PingResult

// Quota fields. QuotaBytes is 0 when no quota is configured.
QuotaBytes int64 // configured limit per period (0 = unlimited)
QuotaUsed int64 // bytes consumed in the current period
QuotaResetAt time.Time // when the quota period resets; zero if no period
QuotaExceeded bool // true when QuotaUsed >= QuotaBytes > 0
}

// ClientStats is an aggregate snapshot of all provider metrics.
Expand Down
117 changes: 98 additions & 19 deletions nntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,13 @@ func (c *NNTPConnection) readerLoop() {
}

if c.stats != nil {
c.stats.BytesConsumed.Add(int64(decoder.BytesConsumed))
n := int64(decoder.BytesConsumed)
c.stats.BytesConsumed.Add(n)
if c.stats.quotaBytes > 0 {
if c.stats.quotaUsed.Add(n) >= c.stats.quotaBytes {
c.stats.quotaExceeded.Store(true)
}
}
if resp.Err != nil {
c.stats.Errors.Add(1)
} else if decoder.StatusCode == 430 || decoder.StatusCode == 423 {
Expand Down Expand Up @@ -1163,6 +1169,15 @@ type Provider struct {

// UserAgent identifies this client to the NNTP server. Empty string disables it.
UserAgent string

// QuotaBytes is the maximum number of bytes that may be downloaded from this
// provider per QuotaPeriod. 0 means unlimited.
QuotaBytes int64

// QuotaPeriod is the rolling window after which the quota counter resets.
// 0 means the quota never resets (lifetime cap).
// Typical value: 30 * 24 * time.Hour (≈ monthly)
QuotaPeriod time.Duration
}

type providerGroup struct {
Expand All @@ -1178,6 +1193,37 @@ type providerGroup struct {
stats providerStats
cancel context.CancelFunc // cancels this group's slot goroutines
p Provider // original config; used for auto-reconnect

// Quota period configuration. quotaBytes/quotaUsed/quotaExceeded live in
// stats so that NNTPConnection can update them via its *providerStats pointer.
quotaPeriod time.Duration // 0 = no auto-reset
quotaResetAt atomic.Int64 // Unix nanoseconds of next reset; 0 = never
}

// isQuotaExceeded reports whether this provider has consumed its download quota
// for the current period.
//
// Fast path (quota not exceeded): single atomic.Bool load (~1 ns).
// Slow path (flag set, period elapsed): resets counters and returns false.
// The time.Now() call is deferred until the cached flag is actually set.
func (g *providerGroup) isQuotaExceeded() bool {
if g.stats.quotaBytes <= 0 {
return false // unlimited
}
if !g.stats.quotaExceeded.Load() {
return false // fast path: quota not yet hit
}
// Flag is set. If a reset period is configured, check whether it has elapsed.
if g.quotaPeriod > 0 {
resetAt := g.quotaResetAt.Load()
if resetAt > 0 && time.Now().UnixNano() >= resetAt {
g.stats.quotaUsed.Store(0)
g.stats.quotaExceeded.Store(false)
g.quotaResetAt.Store(time.Now().Add(g.quotaPeriod).UnixNano())
return false
}
}
return true
}

type Client struct {
Expand Down Expand Up @@ -1314,17 +1360,22 @@ func (c *Client) startProviderGroup(p Provider, index int) *providerGroup {
gctx, gcancel := context.WithCancel(c.ctx)

g := &providerGroup{
name: name,
host: p.Host,
maxConns: p.Connections,
ctx: gctx,
reqCh: make(chan *Request, p.Connections),
prioCh: make(chan *Request, p.Connections),
hotReqCh: make(chan *Request),
hotPrioCh: make(chan *Request),
gate: gate,
cancel: gcancel,
p: p,
name: name,
host: p.Host,
maxConns: p.Connections,
ctx: gctx,
reqCh: make(chan *Request, p.Connections),
prioCh: make(chan *Request, p.Connections),
hotReqCh: make(chan *Request),
hotPrioCh: make(chan *Request),
gate: gate,
cancel: gcancel,
p: p,
quotaPeriod: p.QuotaPeriod,
}
g.stats.quotaBytes = p.QuotaBytes
if p.QuotaBytes > 0 && p.QuotaPeriod > 0 {
g.quotaResetAt.Store(time.Now().Add(p.QuotaPeriod).UnixNano())
}

// Ping with a short timeout so we don't block forever.
Expand Down Expand Up @@ -1562,26 +1613,36 @@ func (c *Client) doSendWithRetry(ctx context.Context, payload []byte, bodyWriter
var start int
switch c.dispatch {
case DispatchFIFO:
// Priority order: first provider with available capacity,
// falling back to provider 0 if all are saturated.
// Priority order: first provider with available capacity and within quota,
// falling back to provider 0 if all are saturated or exceeded.
for i, g := range mains {
if g.gate.available.Load() > 0 {
if g.gate.available.Load() > 0 && !g.isQuotaExceeded() {
start = i
break
}
}
default: // DispatchRoundRobin
// Dynamic weighted round-robin: each provider's weight equals
// its available capacity (allowed - held).
// its available capacity (allowed - held). Quota-exceeded providers
// get weight 0 so they are never selected during normal dispatch.
cumWeights := make([]int, n)
totalW := 0
for i, g := range mains {
avail := max(1, int(g.gate.available.Load()))
avail := 0
if !g.isQuotaExceeded() {
avail = max(1, int(g.gate.available.Load()))
}
totalW += avail
cumWeights[i] = totalW
}
slot := int(c.nextIdx.Add(1) % uint64(totalW))
start = sort.SearchInts(cumWeights, slot+1)
if totalW == 0 {
// All providers are quota-exceeded; start at 0 and let the main
// loop below return ErrQuotaExceeded for each.
start = 0
} else {
slot := int(c.nextIdx.Add(1) % uint64(totalW))
start = sort.SearchInts(cumWeights, slot+1)
}
}

for attempt := range n {
Expand All @@ -1590,6 +1651,10 @@ func (c *Client) doSendWithRetry(ctx context.Context, payload []byte, bodyWriter
if hostSkipped(g.host, &skipHosts, skipCount) {
continue
}
if g.isQuotaExceeded() {
lastErr = fmt.Errorf("%s: %w", g.name, ErrQuotaExceeded)
continue
}
resp, ok, cancelled := tryGroup(g)
if cancelled {
err := ctx.Err()
Expand Down Expand Up @@ -1639,6 +1704,10 @@ func (c *Client) doSendWithRetry(ctx context.Context, payload []byte, bodyWriter
if hostSkipped(g.host, &skipHosts, skipCount) {
continue
}
if g.isQuotaExceeded() {
lastErr = fmt.Errorf("%s: %w", g.name, ErrQuotaExceeded)
continue
}
resp, ok, cancelled := tryGroup(g)
if cancelled {
err := ctx.Err()
Expand Down Expand Up @@ -1695,6 +1764,7 @@ func (c *Client) Stats() ClientStats {
consumed := g.stats.BytesConsumed.Load()
totalBytes += consumed
maxSlots, running := g.gate.snapshot()
quotaUsed := g.stats.quotaUsed.Load()
ps := ProviderStats{
Name: g.name,
BytesConsumed: consumed,
Expand All @@ -1703,6 +1773,15 @@ func (c *Client) Stats() ClientStats {
ActiveConnections: running,
MaxConnections: maxSlots,
Ping: g.stats.Ping,
QuotaBytes: g.stats.quotaBytes,
QuotaUsed: quotaUsed,
QuotaExceeded: g.stats.quotaBytes > 0 && quotaUsed >= g.stats.quotaBytes,
}
if g.stats.quotaBytes > 0 && g.quotaPeriod > 0 {
resetAt := g.quotaResetAt.Load()
if resetAt > 0 {
ps.QuotaResetAt = time.Unix(0, resetAt)
}
}
if secs > 0 {
ps.AvgSpeed = float64(consumed) / secs
Expand Down
Loading
Loading