feat: DLQ support, error classification, and retry tracking#7
Merged
Conversation
…c recovery Add robust error handling to the consumer with three error categories: - PermanentError: routes immediately to DLQ (e.g. malformed payloads) - TransientError: retries with exponential backoff - Unclassified: retries up to MaxRetries, then routes to DLQ Key changes: - New ConsumerConfig struct and NewConsumerWithOptions constructor - DLQ infrastructure (dead-letter exchange + queue per consumer) - Retry tracking via x-retry-count AMQP header - Panic recovery in handler (panics become PermanentError) - Fix ACK/NACK multiple=true bug (was true, now false to avoid acknowledging unprocessed messages with prefetch) - Fully backward compatible: NewConsumer/NewConsumerWithConfig unchanged 12 new tests, all passing with -race. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
PermanentError,TransientError) withIsPermanent()/IsTransient()helpers for consumer error routing{exchange}.dlx) + DLQ queue ({queue}.dlq) auto-declared on consumer setupConsumerConfigstruct andNewConsumerWithOptions()constructor for full control over retry/DLQ/prefetch behaviourx-retry-countAMQP header with exponential backoff (capped at 30s)PermanentErrorand route to DLQAck(true)/Nack(true, ...)multiple=true bug that could ACK/NACK unprocessed messages when prefetch > 1Error routing logic
PermanentErrorTransientError/ unclassified (retries < max)x-retry-count, exponential backoff, ACK originalPermanentError→ DLQBackward compatibility
NewConsumer()andNewConsumerWithConfig()unchanged — DLQ disabled by defaultNack(false, true)(requeue) on error, same as before but with the multiple=false fixTest plan
-race🤖 Generated with Claude Code