Merged
4 changes: 2 additions & 2 deletions CLAUDE.md
@@ -53,12 +53,12 @@ Each `Process` runs an asyncio event loop with one or more pika `AsyncioConnecti
2. `Process.on_message` builds a `ProcessingContext` (Pydantic model) and schedules `invoke_consumer`
3. `invoke_consumer` decodes the body via `Codec`, then calls `consumer.execute(ctx)`
4. `_Consumer.execute` runs pre-validation (message type, retry limits), then delegates to `_run_consumer`
-5. `Consumer._run_consumer` acquires a lock and calls `prepare()` → `process()` → `finish()`; `TransactionConsumer._run_consumer` calls them without a lock, passing `ctx` as an argument
+5. `Consumer._run_consumer` acquires a lock and calls `prepare()` → `process()` → `finish()`; `FunctionalConsumer._run_consumer` calls them without a lock, passing `ctx` as an argument
6. `Process.on_processed` handles the result: ack, nack, requeue, or republish based on the `Result` enum
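
Step 6 can be pictured as a small dispatch on the `Result` enum. The sketch below is purely illustrative and not the library's code; the member names and the broker actions shown are assumptions:

```python
import enum


class Result(enum.Enum):
    # Member names are assumptions for illustration, not rejected's API
    ACK = 'ack'
    NACK = 'nack'
    REQUEUE = 'requeue'
    REPUBLISH = 'republish'


def on_processed(result: Result) -> str:
    """Map a consumer's Result to the broker action taken in step 6."""
    actions = {
        Result.ACK: 'basic_ack',
        Result.NACK: 'basic_nack(requeue=False)',
        Result.REQUEUE: 'basic_nack(requeue=True)',
        Result.REPUBLISH: 'republish to the error exchange, then basic_ack',
    }
    return actions[result]


print(on_processed(Result.REQUEUE))  # basic_nack(requeue=True)
```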

### Key Module Responsibilities

-- **consumer.py**: `_Consumer` base, `Consumer` (locked, self.body-style), `TransactionConsumer` (concurrent, ctx-style)
+- **consumer.py**: `_Consumer` base, `Consumer` (locked, self.body-style), `FunctionalConsumer` (concurrent, ctx-style)
- **codecs.py**: `Codec` class handles encode/decode dispatch by content_type/content_encoding, plus async Avro schema loading
- **connection.py**: Wraps pika `AsyncioConnection`, manages channel lifecycle, QoS, consumer tags
- **process.py**: `Process(multiprocessing.Process)` — the per-consumer child process with asyncio event loop
19 changes: 10 additions & 9 deletions README.md
@@ -10,15 +10,14 @@ each run in an isolated process. It has the ability to collect statistical
data from the consumer processes and report on it.

[![Version](https://img.shields.io/pypi/v/rejected.svg?)](https://pypi.python.org/pypi/rejected)
[![Python](https://img.shields.io/pypi/pyversions/rejected.svg)](https://pypi.python.org/pypi/rejected)
[![License](https://img.shields.io/pypi/l/rejected.svg?)](https://github.com/gmr/rejected/blob/main/LICENSE)

## Features

- Async consumers built on `asyncio`
- Automatic exception handling including connection management and consumer restarting
- Smart consumer classes that automatically decode and deserialize message bodies based on message headers
-- Concurrent message processing with `TransactionConsumer`
+- Concurrent message processing with `FunctionalConsumer`
- Metrics via statsd and/or Prometheus
- Built-in profiling of consumer code
- Avro schema support with file and HTTP schema registries
@@ -42,34 +41,36 @@ pip install rejected[sentry] # Sentry error reporting

## Documentation

-Full documentation is available at [https://rejected.readthedocs.io](https://rejected.readthedocs.io).
+Full documentation is available at [https://gmr.github.io/rejected](https://gmr.github.io/rejected).

## Example Consumer

```python
-from rejected import consumer
import logging

+import rejected

LOGGER = logging.getLogger(__name__)


-class Test(consumer.Consumer):
+class Test(rejected.Consumer):

async def process(self) -> None:
LOGGER.debug('In Test.process: %s', self.body)
```

-For concurrent message processing, use `TransactionConsumer`:
+For concurrent message processing, use `FunctionalConsumer`:

```python
-from rejected import consumer, models
import logging

+import rejected

LOGGER = logging.getLogger(__name__)


-class Test(consumer.TransactionConsumer):
+class Test(rejected.FunctionalConsumer):

-    async def process(self, ctx: models.ProcessingContext) -> None:
+    async def process(self, ctx: rejected.ProcessingContext) -> None:
LOGGER.debug('Processing: %s', ctx.message.body)
```
4 changes: 2 additions & 2 deletions docs/api.md
@@ -49,9 +49,9 @@ The primary base classes for building message consumers.
- timestamp
- user_id

-### TransactionConsumer
+### FunctionalConsumer

-::: rejected.consumer.TransactionConsumer
+::: rejected.consumer.FunctionalConsumer
options:
members:
- process
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -122,7 +122,7 @@ Each consumer entry is a named object with the following attributes:
| `qty` | Number of consumer processes to run | `1` |
| `queue` | RabbitMQ queue name to consume from (defaults to consumer name) | |
| `ack` | Explicitly acknowledge messages (`no_ack = !ack`) | `true` |
-| `qos_prefetch` | QoS prefetch count (set > 1 for concurrent processing with `TransactionConsumer`) | `1` |
+| `qos_prefetch` | QoS prefetch count (set > 1 for concurrent processing with `FunctionalConsumer`) | `1` |
| `max_errors` | Errors within 60s before restarting the consumer | `5` |
| `error_exchange` | Exchange to republish messages to on `ProcessingException` | |
| `error_max_retry` | Max `ProcessingException` retries before dropping | |
41 changes: 23 additions & 18 deletions docs/consumer_howto.md
@@ -6,15 +6,16 @@ The following example illustrates a very simple consumer that logs each
message body as it's received.

```python
-from rejected import consumer
import logging

+import rejected

__version__ = '1.0.0'

LOGGER = logging.getLogger(__name__)


-class ExampleConsumer(consumer.Consumer):
+class ExampleConsumer(rejected.Consumer):

async def process(self):
LOGGER.info(self.body)
@@ -51,22 +52,23 @@ error counter. When too many errors occur, rejected will automatically restart
the consumer after a brief quiet period.

```python
-from rejected import consumer, exceptions
import logging

+import rejected

__version__ = '1.0.0'

LOGGER = logging.getLogger(__name__)


-class ExampleConsumer(consumer.Consumer):
+class ExampleConsumer(rejected.Consumer):

def _connect_to_database(self):
return False

async def process(self):
if not self._connect_to_database():
-            raise exceptions.ConsumerException('Database error')
+            raise rejected.ConsumerException('Database error')
LOGGER.info(self.body)
```

@@ -128,7 +130,7 @@ If the type does not match:
- Otherwise, a `MessageException` is raised.

```python
-class StrictConsumer(consumer.Consumer):
+class StrictConsumer(rejected.Consumer):
MESSAGE_TYPE = 'user.created'
DROP_INVALID_MESSAGES = True
DROP_EXCHANGE = 'dead-letter'
@@ -143,7 +145,7 @@ Consumers can publish messages using `publish_message`. Note that it is
an `async` method:

```python
-class ExampleConsumer(consumer.Consumer):
+class ExampleConsumer(rejected.Consumer):

async def process(self):
LOGGER.info(self.body)
@@ -160,29 +162,30 @@ The `properties` parameter is a dict of AMQP properties (e.g., `content_type`,
serialized and compressed based on the `content_type` and `content_encoding`
properties.

-## TransactionConsumer
+## FunctionalConsumer

-`TransactionConsumer` is designed for concurrent message processing. Unlike
+`FunctionalConsumer` is designed for concurrent message processing. Unlike
`Consumer`, it does not hold a lock -- multiple messages may be processed
in parallel. Instead of accessing message properties via `self.body` etc.,
the processing context is passed explicitly:

```python
-from rejected import consumer, models
import logging

+import rejected

LOGGER = logging.getLogger(__name__)


-class MyConcurrentConsumer(consumer.TransactionConsumer):
+class MyConcurrentConsumer(rejected.FunctionalConsumer):

-    async def prepare(self, ctx: models.ProcessingContext):
+    async def prepare(self, ctx: rejected.ProcessingContext):
LOGGER.debug('Preparing to process %s', ctx.message.message_id)

-    async def process(self, ctx: models.ProcessingContext):
+    async def process(self, ctx: rejected.ProcessingContext):
LOGGER.info('Processing: %s', ctx.message.body)

-    async def finish(self, ctx: models.ProcessingContext):
+    async def finish(self, ctx: rejected.ProcessingContext):
LOGGER.debug('Finished processing %s', ctx.message.message_id)
```

@@ -196,7 +199,7 @@ The `ProcessingContext` provides:
- `ctx.result` -- the `Result` enum indicating message disposition
- `ctx.received_at` -- monotonic timestamp when the message was received

-Use `TransactionConsumer` with `qos_prefetch > 1` to process multiple
+Use `FunctionalConsumer` with `qos_prefetch > 1` to process multiple
messages concurrently.
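
The effect of a prefetch greater than one can be simulated with plain asyncio: because no lock is held, several messages are in flight at once. This is an illustrative model of the concurrency, not the library's internals:

```python
import asyncio


async def process(ctx: dict, in_flight: list, peak: list) -> None:
    # Stand-in for FunctionalConsumer.process(ctx): with no shared lock,
    # multiple messages can be awaiting I/O at the same time
    in_flight.append(ctx['message_id'])
    peak[0] = max(peak[0], len(in_flight))
    await asyncio.sleep(0.01)  # simulated I/O
    in_flight.remove(ctx['message_id'])


async def main(prefetch: int) -> int:
    # With qos_prefetch=prefetch, the broker delivers this many
    # unacknowledged messages at once; they are processed concurrently
    peak = [0]
    in_flight: list = []
    messages = [{'message_id': i} for i in range(prefetch)]
    await asyncio.gather(*(process(m, in_flight, peak) for m in messages))
    return peak[0]


print(asyncio.run(main(5)))  # 5
```

With `prefetch=1` the peak concurrency is 1, which is why the default `qos_prefetch` of `1` gives serial processing even for this consumer class.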

## Custom Metrics
@@ -205,7 +208,7 @@ Consumers can emit custom metrics that are forwarded to statsd and/or
Prometheus:

```python
-class MetricsConsumer(consumer.Consumer):
+class MetricsConsumer(rejected.Consumer):

async def process(self):
# Increment a counter
@@ -229,10 +232,12 @@ after processing messages. By default it collects every 10,000 messages.
Configure the frequency via the `gc_collection_frequency` setting:

```python
-from rejected import consumer, mixins
+from rejected.mixins import GarbageCollectorMixin

+import rejected


-class MyConsumer(mixins.GarbageCollectorMixin, consumer.Consumer):
+class MyConsumer(GarbageCollectorMixin, rejected.Consumer):

async def process(self):
...
7 changes: 4 additions & 3 deletions docs/index.md
@@ -14,7 +14,7 @@ data from the consumer processes and report on it.
- Async consumers built on `asyncio`
- Automatic exception handling including connection management and consumer restarting
- Smart consumer classes that automatically decode and deserialize message bodies based on message headers
-- Concurrent message processing with `TransactionConsumer`
+- Concurrent message processing with `FunctionalConsumer`
- Metrics via statsd and/or Prometheus
- Built-in profiling of consumer code
- Avro schema support with file and HTTP schema registries
@@ -39,13 +39,14 @@ pip install rejected[sentry] # Sentry error reporting
## Quick Start

```python
-from rejected import consumer
import logging

+import rejected

LOGGER = logging.getLogger(__name__)


-class ExampleConsumer(consumer.Consumer):
+class ExampleConsumer(rejected.Consumer):

async def process(self):
LOGGER.info(self.body)