From 7333431343a44343159cf15a3141343b5e65b77d Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Sat, 4 Apr 2026 14:46:22 -0400 Subject: [PATCH] Add missing config sections to migration guide and fix testing codec The migration guide was missing several configuration changes between 3.x and 4.0: connection ssl/frame_max/socket_timeout fields, SSL protocol default change, influxdb_measurement removal, debug_only logging handler removal, new --max-messages CLI flag, and SIGHUP reload support. AsyncTestCase.process_message was not running the codec decode step before calling consumer.execute, so consumers under test received raw JSON strings instead of decoded dicts. Now matches the process.py message flow by running codec.decode and setting raw_body. Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/migrating.md | 72 ++++++++++++++++++++++++++++++++++++++++++--- rejected/testing.py | 23 +++++++++++++-- 2 files changed, 89 insertions(+), 6 deletions(-) diff --git a/docs/migrating.md b/docs/migrating.md index df901fa..9874934 100644 --- a/docs/migrating.md +++ b/docs/migrating.md @@ -284,6 +284,41 @@ Application: uri: file:///etc/avro/{0}.avsc ``` +### Connection changes + +The `ssl` field is now a top-level boolean on connection entries, separate +from `ssl_options`: + +```yaml +# 3.x (ssl was implied by the presence of ssl_options) +Connections: + rabbitmq: + host: rabbitmq.example.com + ssl_options: + ca_certs: /etc/ssl/certs/ca-bundle.crt + +# 4.0 (explicit ssl toggle + ssl_options) +Connections: + rabbitmq: + host: rabbitmq.example.com + ssl: true + ssl_options: + ca_certs: /etc/ssl/certs/ca-bundle.crt +``` + +New optional connection fields have been added: + +| Field | Description | Default | +|-------|-------------|---------| +| `frame_max` | Maximum AMQP frame size in bytes | `131072` | +| `socket_timeout` | Socket timeout in seconds | `10` | + +The default SSL protocol has changed from `PROTOCOL_TLS` to +`PROTOCOL_TLS_CLIENT`, which enables certificate verification by default. +If you were relying on the old default without explicit certificate +validation, you may need to provide `ca_certs` or `ca_path` in your +`ssl_options`. + ### Consumer connections The structured connection `publisher_confirmation` field has been renamed to @@ -303,15 +338,41 @@ connections: confirm: true ``` +### Consumer configuration changes + +The `influxdb_measurement` consumer field has been removed along with +InfluxDB support. Use Prometheus metrics instead. + +### Logging changes + +The `debug_only` handler option has been removed. In 3.x, this was used to +suppress console handlers when running as a daemon. Since 4.0 always runs +in the foreground, this option is no longer needed. Remove `debug_only` from +any handler definitions in your logging configuration. + ### Removed configuration | Removed | Notes | |---------|-------| | `Daemon` section | Rejected no longer daemonizes; use systemd/supervisord | | `stats.influxdb` | InfluxDB support removed; use Prometheus | +| `influxdb_measurement` | Consumer-level InfluxDB measurement name removed | | `dynamic_qos` | QoS is now static via `qos_prefetch` | +| `debug_only` (logging handler) | No longer needed without daemon mode | | `-f` / `--foreground` CLI flag | Rejected always runs in the foreground | +### New CLI options + +| Flag | Description | +|------|-------------| +| `-n` / `--max-messages N` | Process N messages per consumer then shut down | + +### SIGHUP reload + +4.0 supports configuration reloading via `SIGHUP`. When the process receives +`SIGHUP`, it reloads the configuration file and restarts consumer processes +with the updated settings — no full process restart required. + ## Removed APIs | 3.x API | 4.0 Replacement | @@ -362,7 +423,10 @@ to Prometheus. `rejected.measurement.Measurement`, etc. 9. Update tests to use `async def` test methods (no `@gen_test`) 10. Rename `publisher_confirmation` to `confirm` in connection config -11. Remove `Daemon` section from config (if present) -12. Remove `stats.influxdb` from config (use Prometheus instead) -13. Remove any `pickle` content type usage -14. Replace deprecated `statsd_*` method calls with `stats_*` equivalents +11. Add `ssl: true` to connections that use `ssl_options` +12. Remove `Daemon` section from config (if present) +13. Remove `stats.influxdb` from config (use Prometheus instead) +14. Remove `influxdb_measurement` from consumer configs +15. Remove `debug_only` from logging handler configs +16. Remove any `pickle` content type usage +17. Replace deprecated `statsd_*` method calls with `stats_*` equivalents diff --git a/rejected/testing.py b/rejected/testing.py index 1c6fd33..7f51861 100644 --- a/rejected/testing.py +++ b/rejected/testing.py @@ -125,8 +125,11 @@ def create_context( """Create a :class:`~rejected.models.ProcessingContext` for testing. - If ``message_body`` is a dict and ``content_type`` is - ``application/json``, the body is JSON-serialized. + The body is stored as raw bytes on the message, matching what + RabbitMQ delivers. Non-bytes/str bodies are serialized via the + :class:`~rejected.codecs.Codec`. Use :meth:`process_message` + to also run the codec decode step before the consumer sees the + message (matching production behavior). """ properties = properties or {} @@ -143,6 +146,8 @@ def create_context( and properties.get('content_type') == 'application/json' ): message_body = json.dumps(message_body) + if isinstance(message_body, str): + message_body = message_body.encode('utf-8') mock_conn = mock.Mock(spec=connection.Connection) mock_conn.is_running = True @@ -222,6 +227,20 @@ async def process_message( ) self._last_ctx = ctx + # Decode body through the codec, matching process.py behavior + if self.process.codec and ctx.message.body is not None: + msg = ctx.message + ctx.raw_body = msg.body if isinstance(msg.body, bytes) else b'' + try: + msg.body = await self.process.codec.decode( + msg.body, msg.content_type, msg.content_encoding, msg.type + ) + except codecs.DecodeError as err: + ctx.result = models.Result.MESSAGE_EXCEPTION + raise exceptions.MessageException( + 'Failed to decode message body' + ) from err + # Patch _log_exception to capture exc_info for re-raising original_log = self.consumer._log_exception