Merged
13 changes: 13 additions & 0 deletions AGENTS.md
@@ -47,6 +47,7 @@ Resilience and safety:
- `commons/assert`: production-safe assertions with telemetry integration and domain predicates
- `commons/safe`: panic-free math/regex/slice operations with error returns
- `commons/security`: sensitive field detection and handling
+- `commons/security/ssrf`: canonical SSRF validation - IP blocking (CIDR blocklist + stdlib predicates), hostname blocking (metadata endpoints, dangerous suffixes), URL validation, DNS-pinned resolution with TOCTOU elimination
- `commons/errgroup`: goroutine coordination with panic recovery
- `commons/certificate`: thread-safe TLS certificate manager with hot reload, PEM file loading, PKCS#8/PKCS#1/EC key support, and `tls.Config` integration

@@ -137,6 +138,17 @@ Build and shell:
- TLS integration: `TLSCertificate() tls.Certificate` (returns populated `tls.Certificate` struct); `GetCertificateFunc() func(*tls.ClientHelloInfo) (*tls.Certificate, error)` — suitable for assignment to `tls.Config.GetCertificate` for transparent hot-reload.
- Package-level helper: `LoadFromFiles(certPath, keyPath string) (*x509.Certificate, crypto.Signer, error)` — validates without touching any manager state, useful for pre-flight checking before calling `Rotate`.

+### SSRF validation (`commons/security/ssrf`)
+
+- `IsBlockedIP(net.IP)` and `IsBlockedAddr(netip.Addr)` for IP-level SSRF blocking. `IsBlockedAddr` is the core; `IsBlockedIP` delegates after conversion.
+- `IsBlockedHostname(hostname)` for hostname-level blocking: localhost, cloud metadata endpoints, `.local`/`.internal`/`.cluster.local` suffixes.
+- `BlockedPrefixes() []netip.Prefix` returns a copy of the canonical CIDR blocklist (8 ranges: this-network, CGNAT, IETF assignments, TEST-NET-1/2/3, benchmarking, reserved).
+- `ValidateURL(ctx, rawURL, opts...)` validates scheme, hostname, and parsed IP without DNS.
+- `ResolveAndValidate(ctx, rawURL, opts...) (*ResolveResult, error)` performs DNS-pinned validation. `ResolveResult` has `PinnedURL`, `Authority` (host:port for HTTP Host), `SNIHostname` (for TLS ServerName).
+- Functional options: `WithHTTPSOnly()`, `WithAllowPrivateNetwork()`, `WithLookupFunc(fn)`, `WithAllowHostname(hostname)`.
+- Sentinel errors: `ErrBlocked`, `ErrInvalidURL`, `ErrDNSFailed`.
+- Both `commons/webhook` and `commons/net/http` delegate to this package; it is the single source of truth for SSRF blocking across all Lerian services.

### Assertions (`commons/assert`)

- `New(ctx, logger, component, operation) *Asserter`; assertions return errors instead of panicking.
@@ -238,6 +250,7 @@ Build and shell:
- **Pointers:** `String()`, `Bool()`, `Time()`, `Int()`, `Int64()`, `Float64()`.
- **Cron:** `Parse(expr) (Schedule, error)`; `Schedule.Next(t) (time.Time, error)`.
- **Security:** `IsSensitiveField(name)`, `DefaultSensitiveFields()`, `DefaultSensitiveFieldsMap()`.
+- **SSRF:** `IsBlockedIP()`, `IsBlockedAddr()`, `IsBlockedHostname()`, `BlockedPrefixes()`, `ValidateURL()`, `ResolveAndValidate()`. Single source of truth for all SSRF protection. Both `webhook` and `net/http` delegate here.
- **Transaction:** `BuildIntentPlan()` + `ValidateBalanceEligibility()` + `ApplyPosting()` with typed `IntentPlan`, `Posting`, `LedgerTarget`. `ResolveOperation(pending, isSource, status) (Operation, error)`.
- **Constants:** `SanitizeMetricLabel(value) string` for OTEL label safety.
- **Certificate:** `NewManager(certPath, keyPath) (*Manager, error)`; `Rotate(cert, key)`, `TLSCertificate()`, `GetCertificateFunc()`; package-level `LoadFromFiles(certPath, keyPath)` for pre-flight validation.
43 changes: 35 additions & 8 deletions commons/certificate/certificate.go
@@ -151,9 +151,10 @@ func (m *Manager) Rotate(cert *x509.Certificate, key crypto.Signer, intermediate
return nil
}

-// GetCertificate returns the current certificate, or nil if none is loaded.
-// The returned pointer shares state with the manager; callers must treat it
-// as read-only. Use [LoadFromFiles] + [Manager.Rotate] to replace it.
+// GetCertificate returns a deep copy of the current certificate, or nil if
+// none is loaded. The returned *x509.Certificate is an independent clone that
+// callers may freely modify without affecting the manager's internal state.
+// Use [LoadFromFiles] + [Manager.Rotate] to replace the managed certificate.
func (m *Manager) GetCertificate() *x509.Certificate {
if m == nil {
return nil
@@ -162,7 +163,7 @@ func (m *Manager) GetCertificate() *x509.Certificate {
m.mu.RLock()
defer m.mu.RUnlock()

-	return m.cert
+	return cloneCert(m.cert)
}

// GetSigner returns the current private key as a crypto.Signer, or nil if none is loaded.
@@ -212,7 +213,10 @@ func (m *Manager) ExpiresAt() time.Time {
}

// DaysUntilExpiry returns the number of days until the certificate expires.
-// Returns -1 if no certificate is loaded.
+// It returns -1 when no certificate is loaded (nil receiver or no certificate
+// configured via [NewManager]). Otherwise it returns the number of days until
+// expiry, which may be negative for already-expired certificates (e.g. -3
+// means the certificate expired 3 days ago).
func (m *Manager) DaysUntilExpiry() int {
if m == nil {
return -1
@@ -228,8 +232,9 @@ func (m *Manager) DaysUntilExpiry() int {

// TLSCertificate returns a [tls.Certificate] built from the currently loaded
// certificate chain and private key. Returns an empty [tls.Certificate] if no
-// certificate is loaded. The Leaf field shares state with the manager; callers
-// should treat it as read-only.
+// certificate is loaded. Both the Certificate [][]byte chain and the Leaf are
+// deep copies, so callers never receive references aliasing internal state.
+// Safe to call on a nil receiver (returns an empty [tls.Certificate]).
func (m *Manager) TLSCertificate() tls.Certificate {
if m == nil {
return tls.Certificate{}
@@ -252,7 +257,7 @@ func (m *Manager) TLSCertificate() tls.Certificate {
return tls.Certificate{
Certificate: chainCopy,
PrivateKey: m.signer,
-		Leaf:        m.cert,
+		Leaf:        cloneCert(m.cert),
}
}

@@ -388,3 +393,25 @@ func publicKeysMatch(certPublicKey, signerPublicKey any) bool {

return bytes.Equal(certDER, signerDER)
}

+// cloneCert returns a deep copy of cert by re-parsing its DER encoding.
+// Returns nil when cert is nil.
+func cloneCert(cert *x509.Certificate) *x509.Certificate {
+	if cert == nil {
+		return nil
+	}
+
+	raw := make([]byte, len(cert.Raw))
+	copy(raw, cert.Raw)
+
+	// ParseCertificate is the canonical way to deep-copy an x509.Certificate.
+	// Errors here are unexpected (the DER was already parsed once), but we
+	// return nil rather than panicking to stay consistent with the package's
+	// nil-safe contract.
+	clone, err := x509.ParseCertificate(raw)
+	if err != nil {
+		return nil
+	}
+
+	return clone
+}
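
To see why re-parsing the DER gives callers an independent value, the following sketch exercises a copy of the `cloneCert` helper from this diff against a throwaway self-signed certificate. The `newSelfSigned` helper and its subject name are hypothetical and exist only for the demonstration.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// cloneCert mirrors the helper added in this diff: copy the DER bytes, then
// re-parse them so that no slice or pointer is shared with the original.
func cloneCert(cert *x509.Certificate) *x509.Certificate {
	if cert == nil {
		return nil
	}

	raw := make([]byte, len(cert.Raw))
	copy(raw, cert.Raw)

	clone, err := x509.ParseCertificate(raw)
	if err != nil {
		return nil
	}

	return clone
}

// newSelfSigned builds a short-lived self-signed certificate (demo only).
func newSelfSigned() (*x509.Certificate, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, err
	}

	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: "example.test"},
		NotBefore:    time.Now().Add(-time.Hour),
		NotAfter:     time.Now().Add(24 * time.Hour),
		DNSNames:     []string{"example.test"},
	}

	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, err
	}

	return x509.ParseCertificate(der)
}

func main() {
	orig, err := newSelfSigned()
	if err != nil {
		fmt.Println("generate:", err)
		return
	}

	clone := cloneCert(orig)
	clone.DNSNames[0] = "tampered.test" // mutate the clone only

	fmt.Println("original SAN:", orig.DNSNames[0])
	fmt.Println("clone SAN:   ", clone.DNSNames[0])
}
```

The trade-off is one extra parse per call, which is the price of handing out a value that callers can mutate without touching the manager's state.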
17 changes: 4 additions & 13 deletions commons/certificate/certificate_test.go
@@ -860,20 +860,11 @@ func TestNilManager_SubTests(t *testing.T) {
assert.Equal(t, -1, m.DaysUntilExpiry())
})

-	t.Run("TLSCertificate returns empty", func(t *testing.T) {
+	t.Run("TLSCertificate returns empty on nil receiver", func(t *testing.T) {
t.Parallel()
-		// TLSCertificate checks m == nil after acquiring the lock which it
-		// can't do on a nil receiver - production code checks m == nil inside
-		// the method body after the RLock, but since m is nil the lock call
-		// itself will panic. The method guards with `if m == nil` but the
-		// guard is AFTER the RLock. Let's verify by calling it safely.
-		// Actually the method body is: mu.RLock then `if m == nil` - a nil
-		// pointer dereference would occur. We document that TLSCertificate
-		// is NOT nil-receiver-safe (unlike the other methods) by asserting
-		// the returned value from a non-nil empty manager instead.
-		empty, emptyErr := NewManager("", "")
-		require.NoError(t, emptyErr)
-		tlsCert := empty.TLSCertificate()
+		// TLSCertificate is nil-receiver-safe: the nil guard precedes the
+		// RLock call, consistent with every other Manager method.
+		tlsCert := m.TLSCertificate()
assert.Equal(t, tls.Certificate{}, tlsCert)
})
}
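
The property this test now asserts (the nil guard runs before the lock is touched) can be shown in isolation. The `store` type below is a hypothetical stand-in for `Manager`, kept deliberately small.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical mutex-guarded type standing in for Manager.
type store struct {
	mu    sync.RWMutex
	value string
}

// Value is nil-receiver-safe because the guard comes before s.mu is
// dereferenced. Reversing the two steps would panic on a nil *store.
func (s *store) Value() string {
	if s == nil {
		return ""
	}

	s.mu.RLock()
	defer s.mu.RUnlock()

	return s.value
}

func main() {
	var missing *store
	fmt.Printf("%q\n", missing.Value()) // prints ""

	present := &store{value: "loaded"}
	fmt.Printf("%q\n", present.Value()) // prints "loaded"
}
```

Calling a method on a nil pointer is legal in Go; only dereferencing a field of the nil receiver panics, which is why the order of the guard and the lock matters.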
8 changes: 6 additions & 2 deletions commons/dlq/consumer.go
@@ -106,11 +106,15 @@ func WithBatchSize(n int) ConsumerOption {
}
}

-// WithSources sets the DLQ source queue names to consume.
+// WithSources sets the DLQ source queue names to consume. The input slice is
+// cloned so that subsequent mutations by the caller do not race with the
+// consumer's read loop.
func WithSources(sources ...string) ConsumerOption {
return func(c *Consumer) {
if len(sources) > 0 {
-			c.cfg.Sources = sources
+			cloned := make([]string, len(sources))
+			copy(cloned, sources)
+			c.cfg.Sources = cloned
}
}
}
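
The aliasing that this change guards against is easy to reproduce: when a caller spreads an existing slice into a variadic parameter, the callee receives the same backing array. The sketch below contrasts the old and new option bodies using hypothetical `config` and `option` types in place of `Consumer` and `ConsumerOption`.

```go
package main

import "fmt"

// config and option are simplified stand-ins for the consumer types.
type config struct {
	Sources []string
}

type option func(*config)

// withSourcesAliased stores the caller's slice directly (the old behavior).
func withSourcesAliased(sources ...string) option {
	return func(c *config) {
		if len(sources) > 0 {
			c.Sources = sources
		}
	}
}

// withSourcesCloned copies the slice before storing it (the new behavior).
func withSourcesCloned(sources ...string) option {
	return func(c *config) {
		if len(sources) > 0 {
			cloned := make([]string, len(sources))
			copy(cloned, sources)
			c.Sources = cloned
		}
	}
}

func main() {
	queues := []string{"orders", "payments"}

	var aliased, cloned config
	withSourcesAliased(queues...)(&aliased)
	withSourcesCloned(queues...)(&cloned)

	queues[0] = "mutated" // caller reuses its slice after configuring

	fmt.Println(aliased.Sources[0], cloned.Sources[0]) // prints "mutated orders"
}
```

In this sketch, as in the diff, the copy is made when the option is applied rather than when the option is constructed, so a mutation between those two moments would still be picked up.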
112 changes: 73 additions & 39 deletions commons/dlq/handler.go
@@ -148,52 +148,18 @@ func (h *Handler) Enqueue(ctx context.Context, msg *FailedMessage) error {
return ErrNilHandler
}

-	if msg == nil {
-		return errors.New("dlq: enqueue: nil message")
-	}
-
-	if msg.Source == "" {
-		return errors.New("dlq: enqueue: source must not be empty")
-	}
-
-	if err := validateKeySegment("source", msg.Source); err != nil {
+	if err := validateEnqueueMessage(msg); err != nil {
return err
}

ctx, span := h.tracer.Start(ctx, "dlq.enqueue")
defer span.End()

-	// Only stamp CreatedAt and MaxRetries on initial enqueue (zero-valued).
-	// Re-enqueue paths (consumer retry-failed, not-yet-ready, prune) pass
-	// messages that already carry the original values; overwriting them would
-	// permanently lose the original failure timestamp and retry budget.
-	initialEnqueue := msg.CreatedAt.IsZero()
-	if initialEnqueue {
-		msg.CreatedAt = time.Now().UTC()
-	}
-
-	if msg.MaxRetries == 0 {
-		msg.MaxRetries = h.maxRetries
-	}
-
-	ctxTenant := tmcore.GetTenantIDContext(ctx)
+	h.stampInitialEnqueue(msg)

-	effectiveTenant := msg.TenantID
-	if effectiveTenant == "" {
-		effectiveTenant = ctxTenant
-		msg.TenantID = effectiveTenant
-	}
-
-	if effectiveTenant != "" && ctxTenant != "" && effectiveTenant != ctxTenant {
-		return fmt.Errorf("dlq: enqueue: tenant mismatch between message (%s) and context (%s)", effectiveTenant, ctxTenant)
-	}
-
-	// Recalculate NextRetryAt only on initial enqueue. On re-enqueue the
-	// consumer has already incremented RetryCount and the caller is
-	// responsible for timing; we preserve their NextRetryAt or let the
-	// backoff be recalculated by the consumer path that sets RetryCount.
-	if initialEnqueue && msg.RetryCount < msg.MaxRetries {
-		msg.NextRetryAt = msg.CreatedAt.Add(backoffDuration(msg.RetryCount))
+	effectiveTenant, err := h.resolveAndValidateTenant(ctx, msg)
+	if err != nil {
+		return err
}

data, err := json.Marshal(msg)
@@ -223,6 +189,74 @@ func (h *Handler) Enqueue(ctx context.Context, msg *FailedMessage) error {
return nil
}

+// validateEnqueueMessage performs pre-flight validation on the message before
+// any state mutation or tracing begins.
+func validateEnqueueMessage(msg *FailedMessage) error {
+	if msg == nil {
+		return errors.New("dlq: enqueue: nil message")
+	}
+
+	if msg.Source == "" {
+		return errors.New("dlq: enqueue: source must not be empty")
+	}
+
+	return validateKeySegment("source", msg.Source)
+}
+
+// stampInitialEnqueue sets CreatedAt, MaxRetries, and NextRetryAt on messages
+// that are being enqueued for the first time (CreatedAt is zero).
+// Re-enqueue paths (consumer retry-failed, not-yet-ready, prune) pass messages
+// that already carry the original values; overwriting them would permanently
+// lose the original failure timestamp and retry budget.
+func (h *Handler) stampInitialEnqueue(msg *FailedMessage) {
+	initialEnqueue := msg.CreatedAt.IsZero()
+	if initialEnqueue {
+		msg.CreatedAt = time.Now().UTC()
+	}
+
+	if msg.MaxRetries <= 0 {
+		msg.MaxRetries = h.maxRetries
+	}
+
+	// Recalculate NextRetryAt only on initial enqueue. On re-enqueue the
+	// consumer has already incremented RetryCount and the caller is
+	// responsible for timing; we preserve their NextRetryAt or let the
+	// backoff be recalculated by the consumer path that sets RetryCount.
+	if initialEnqueue && msg.RetryCount < msg.MaxRetries {
+		msg.NextRetryAt = msg.CreatedAt.Add(backoffDuration(msg.RetryCount))
+	}
+}

+// resolveAndValidateTenant determines the effective tenant ID for the message by
+// reconciling the message's TenantID with the tenant from context. It validates
+// that they match when both are present, and validates the tenant as a safe Redis
+// key segment. Returns the effective tenant ID.
+func (h *Handler) resolveAndValidateTenant(ctx context.Context, msg *FailedMessage) (string, error) {
+	ctxTenant := tmcore.GetTenantIDContext(ctx)
+
+	effectiveTenant := msg.TenantID
+	if effectiveTenant == "" {
+		effectiveTenant = ctxTenant
+		msg.TenantID = effectiveTenant
+	}
+
+	if effectiveTenant != "" && ctxTenant != "" && effectiveTenant != ctxTenant {
+		return "", fmt.Errorf("dlq: enqueue: tenant mismatch between message (%s) and context (%s)", effectiveTenant, ctxTenant)
+	}
+
+	// Validate the effective tenant before using it to construct a Redis key.
+	// This prevents invalid tenant IDs from silently falling back to the global
+	// (non-tenant) key inside tenantScopedKeyForTenant, which would mix
+	// tenant-scoped messages into the global queue.
+	if effectiveTenant != "" {
+		if err := validateKeySegment("tenantID", effectiveTenant); err != nil {
+			return "", fmt.Errorf("dlq: enqueue: %w", err)
+		}
+	}
+
+	return effectiveTenant, nil
+}

// logEnqueueFallback logs message metadata when Redis is unreachable. The
// payload is redacted to prevent PII leakage into log aggregators.
func (h *Handler) logEnqueueFallback(ctx context.Context, key string, msg *FailedMessage, err error) {
76 changes: 49 additions & 27 deletions commons/net/http/idempotency/doc.go
@@ -6,45 +6,67 @@
// requests may execute more than once. Callers that require strict at-most-once
// guarantees must pair this middleware with application-level safeguards.
//
-// The middleware uses the X-Idempotency request header combined with the tenant ID
-// (from tenant-manager context) to form a composite Redis key. When a tenant ID is
-// present, keys are scoped per-tenant to prevent cross-tenant collisions. When no
-// tenant is in context (e.g., non-tenant-scoped routes), keys are still valid but
-// are shared across the global namespace. Duplicate requests receive the original
-// response with the X-Idempotency-Replayed header set to "true".
+// # Key composition
//
-// # Quick start
-//
-//	conn, err := redis.New(ctx, cfg)
-//	if err != nil {
-//		log.Fatal(err)
-//	}
-//	idem := idempotency.New(conn)
-//	app.Post("/orders", idem.Check(), createOrderHandler)
-//
-// # Behavior
+// The middleware uses the X-Idempotency request header ([constants.IdempotencyKey])
+// combined with the tenant ID (from tenant-manager context via
+// [tmcore.GetTenantIDContext]) to form a composite Redis key:
//
-// - GET/OPTIONS requests pass through (idempotency is irrelevant for reads).
-// - Absent header: request proceeds normally (idempotency is opt-in).
-// - Header exceeds MaxKeyLength (default 256): request rejected with 400.
-// - Duplicate key: cached response replayed with X-Idempotency-Replayed: true.
-// - Redis failure: request proceeds (fail-open for availability).
-// - Handler success: response cached; handler failure: key deleted (client can retry).
+//	<prefix><tenantID>:<idempotencyKey>
//
-// # Redis key namespace convention
+// When a tenant ID is present, keys are scoped per-tenant to prevent
+// cross-tenant collisions. When no tenant is in context (e.g., non-tenant-scoped
+// routes), the key becomes <prefix>:<idempotencyKey> in the global namespace.
//
-// Keys follow the pattern: <prefix><tenantID>:<idempotencyKey>
-// with a companion response key at <prefix><tenantID>:<idempotencyKey>:response.
+// A companion response key at <prefix><tenantID>:<idempotencyKey>:response stores
+// the cached response body and headers for replay.
//
-// The default prefix is "idempotency:" and can be overridden via WithKeyPrefix.
+// The default prefix is "idempotency:" and can be overridden via [WithKeyPrefix].
// This namespacing convention is consistent with other lib-commons packages that
// use Redis (e.g., rate limiting uses "ratelimit:<tenantID>:..."). Per-tenant
// isolation is enforced by embedding the tenant ID into the key rather than
// using separate Redis databases or key-space notifications, which keeps the
// implementation topology-agnostic (standalone, sentinel, and cluster all behave
// identically with this approach).
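
The key layout documented in this package comment can be expressed in a few lines. The helpers `lockKey` and `responseKey` below are hypothetical names chosen for illustration; only the pattern and the default prefix come from the comment itself.

```go
package main

import "fmt"

// defaultPrefix matches the default named in the package comment.
const defaultPrefix = "idempotency:"

// lockKey builds <prefix><tenantID>:<idempotencyKey>. With an empty tenant the
// result is <prefix>:<idempotencyKey>, which is the global namespace.
func lockKey(prefix, tenantID, idempotencyKey string) string {
	return prefix + tenantID + ":" + idempotencyKey
}

// responseKey builds the companion key that holds the cached response.
func responseKey(prefix, tenantID, idempotencyKey string) string {
	return lockKey(prefix, tenantID, idempotencyKey) + ":response"
}

func main() {
	fmt.Println(lockKey(defaultPrefix, "tenant-a", "order-42"))     // idempotency:tenant-a:order-42
	fmt.Println(lockKey(defaultPrefix, "", "order-42"))             // idempotency::order-42
	fmt.Println(responseKey(defaultPrefix, "tenant-a", "order-42")) // idempotency:tenant-a:order-42:response
}
```

Two tenants sending the same header value therefore land on different keys, while requests without a tenant share one namespace.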
//
+// # Quick start
+//
+//	conn, err := redis.New(ctx, cfg)
+//	if err != nil {
+//		return err
+//	}
+//	idem := idempotency.New(conn)
+//	app.Post("/orders", idem.Check(), createOrderHandler)
+//
+// # Behavior branches
+//
+// The [Middleware.Check] handler evaluates requests through the following
+// branches in order:
+//
+// - GET, HEAD, and OPTIONS requests pass through unconditionally; idempotency
+// is not enforced for safe/idempotent HTTP methods.
+// - Absent X-Idempotency header: request proceeds normally (idempotency is
+// opt-in per request).
+// - Header exceeds [WithMaxKeyLength] (default 256): request is passed to the
+// configured [WithRejectedHandler]. When no custom handler is set, a 400 JSON
+// response with code "VALIDATION_ERROR" is returned.
+// - Redis unavailable (GetClient, SetNX, or Get failures): request proceeds
+// without idempotency enforcement (fail-open), logged at WARN level.
+// - Duplicate key with cached response: the original response is replayed
+// faithfully: status code, headers (including Location, ETag, Set-Cookie),
+// content type, and body, with [constants.IdempotencyReplayed] set to "true".
+// - Duplicate key still in "processing" state (in-flight): 409 Conflict with
+// code "IDEMPOTENCY_CONFLICT" is returned.
+// - Duplicate key in "complete" state but no cached body (e.g., body exceeded
+// [WithMaxBodyCache]): 200 OK with code "IDEMPOTENT" and detail "request
+// already processed" is returned.
+// - Handler success: response (status, headers, body) is cached via a Redis
+// pipeline and the key is marked "complete".
+// - Handler failure: both the lock key and response key are deleted so the
+// client can retry with the same idempotency key.
+//
// # Nil safety
//
-// A nil *Middleware returns a pass-through handler from Check().
+// [New] returns nil when conn is nil. A nil [*Middleware] returns a pass-through
+// handler from [Middleware.Check].
package idempotency
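
The ordered branches listed in the new package comment can be modeled as a pure decision function, which makes the precedence easy to test. This is a sketch under assumptions: the `probe` fields, the `outcome` labels, and the notion that winning the `SetNX` lock means "first request" are hypothetical simplifications, not the middleware's actual code.

```go
package main

import "fmt"

type outcome string

const (
	passThrough      outcome = "pass-through"
	rejectKey        outcome = "reject-400"
	runHandler       outcome = "run-handler"
	replayCached     outcome = "replay-cached"
	conflictInFlight outcome = "conflict-409"
	alreadyProcessed outcome = "already-processed-200"
)

// probe captures what the middleware would know about one request.
type probe struct {
	Method    string
	Key       string // X-Idempotency header value
	MaxKeyLen int
	RedisDown bool // any Redis call failed
	LockWon   bool // assumed: SetNX created the key, so this is the first request
	HasCached bool // a cached response exists for the key
	Complete  bool // key state is "complete" rather than "processing"
}

// decide walks the branches in the documented order.
func decide(p probe) outcome {
	switch p.Method {
	case "GET", "HEAD", "OPTIONS":
		return passThrough
	}

	if p.Key == "" {
		return passThrough // idempotency is opt-in per request
	}

	if len(p.Key) > p.MaxKeyLen {
		return rejectKey
	}

	if p.RedisDown {
		return passThrough // fail-open
	}

	if p.LockWon {
		return runHandler
	}

	if p.HasCached {
		return replayCached
	}

	if !p.Complete {
		return conflictInFlight
	}

	return alreadyProcessed
}

func main() {
	fmt.Println(decide(probe{Method: "POST", Key: "k1", MaxKeyLen: 256, LockWon: true}))
	fmt.Println(decide(probe{Method: "POST", Key: "k1", MaxKeyLen: 256, HasCached: true, Complete: true}))
	fmt.Println(decide(probe{Method: "POST", Key: "k1", MaxKeyLen: 256}))
}
```

Keeping the fail-open check ahead of the duplicate checks reflects the documented trade-off: availability wins over enforcement when Redis cannot be reached.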