Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 68 additions & 4 deletions docs/migrating.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,41 @@ Application:
uri: file:///etc/avro/{0}.avsc
```

### Connection changes

The `ssl` field is now a top-level boolean on connection entries, separate
from `ssl_options`:

```yaml
# 3.x (ssl was implied by the presence of ssl_options)
Connections:
rabbitmq:
host: rabbitmq.example.com
ssl_options:
ca_certs: /etc/ssl/certs/ca-bundle.crt

# 4.0 (explicit ssl toggle + ssl_options)
Connections:
rabbitmq:
host: rabbitmq.example.com
ssl: true
ssl_options:
ca_certs: /etc/ssl/certs/ca-bundle.crt
```

New optional connection fields have been added:

| Field | Description | Default |
|-------|-------------|---------|
| `frame_max` | Maximum AMQP frame size in bytes | `131072` |
| `socket_timeout` | Socket timeout in seconds | `10` |

The default SSL protocol has changed from `PROTOCOL_TLS` to
`PROTOCOL_TLS_CLIENT`, which enables certificate verification by default.
If you were relying on the old default without explicit certificate
validation, you may need to provide `ca_certs` or `ca_path` in your
`ssl_options`.

### Consumer connections

The structured connection `publisher_confirmation` field has been renamed to
Expand All @@ -303,15 +338,41 @@ connections:
confirm: true
```

### Consumer configuration changes

The `influxdb_measurement` consumer field has been removed along with
InfluxDB support. Use Prometheus metrics instead.

### Logging changes

The `debug_only` handler option has been removed. In 3.x, this was used to
suppress console handlers when running as a daemon. Since 4.0 always runs
in the foreground, this option is no longer needed. Remove `debug_only` from
any handler definitions in your logging configuration.

### Removed configuration

| Removed | Notes |
|---------|-------|
| `Daemon` section | Rejected no longer daemonizes; use systemd/supervisord |
| `stats.influxdb` | InfluxDB support removed; use Prometheus |
| `influxdb_measurement` | Consumer-level InfluxDB measurement name removed |
| `dynamic_qos` | QoS is now static via `qos_prefetch` |
| `debug_only` (logging handler) | No longer needed without daemon mode |
| `-f` / `--foreground` CLI flag | Rejected always runs in the foreground |

### New CLI options

| Flag | Description |
|------|-------------|
| `-n` / `--max-messages N` | Process N messages per consumer then shut down |

### SIGHUP reload

4.0 supports configuration reloading via `SIGHUP`. When the process receives
`SIGHUP`, it reloads the configuration file and restarts consumer processes
with the updated settings — no full process restart required.

## Removed APIs

| 3.x API | 4.0 Replacement |
Expand Down Expand Up @@ -362,7 +423,10 @@ to Prometheus.
`rejected.measurement.Measurement`, etc.
9. Update tests to use `async def` test methods (no `@gen_test`)
10. Rename `publisher_confirmation` to `confirm` in connection config
11. Remove `Daemon` section from config (if present)
12. Remove `stats.influxdb` from config (use Prometheus instead)
13. Remove any `pickle` content type usage
14. Replace deprecated `statsd_*` method calls with `stats_*` equivalents
11. Add `ssl: true` to connections that use `ssl_options`
12. Remove `Daemon` section from config (if present)
13. Remove `stats.influxdb` from config (use Prometheus instead)
14. Remove `influxdb_measurement` from consumer configs
15. Remove `debug_only` from logging handler configs
16. Remove any `pickle` content type usage
17. Replace deprecated `statsd_*` method calls with `stats_*` equivalents
23 changes: 21 additions & 2 deletions rejected/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ def create_context(
"""Create a :class:`~rejected.models.ProcessingContext` for
testing.

If ``message_body`` is a dict and ``content_type`` is
``application/json``, the body is JSON-serialized.
The body is stored as raw bytes on the message, matching what
RabbitMQ delivers. Non-bytes/str bodies are serialized via the
:class:`~rejected.codecs.Codec`. Use :meth:`process_message`
to also run the codec decode step before the consumer sees the
message (matching production behavior).

"""
properties = properties or {}
Expand All @@ -143,6 +146,8 @@ def create_context(
and properties.get('content_type') == 'application/json'
):
message_body = json.dumps(message_body)
if isinstance(message_body, str):
message_body = message_body.encode('utf-8')

mock_conn = mock.Mock(spec=connection.Connection)
mock_conn.is_running = True
Expand Down Expand Up @@ -222,6 +227,20 @@ async def process_message(
)
self._last_ctx = ctx

# Decode body through the codec, matching process.py behavior
if self.process.codec and ctx.message.body is not None:
msg = ctx.message
ctx.raw_body = msg.body if isinstance(msg.body, bytes) else b''
try:
msg.body = await self.process.codec.decode(
msg.body, msg.content_type, msg.content_encoding, msg.type
)
except codecs.DecodeError as err:
ctx.result = models.Result.MESSAGE_EXCEPTION
raise exceptions.MessageException(
'Failed to decode message body'
) from err

# Patch _log_exception to capture exc_info for re-raising
original_log = self.consumer._log_exception

Expand Down
Loading