diff --git a/CLAUDE.md b/CLAUDE.md index 0a7c32f..c8f5d98 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -53,12 +53,12 @@ Each `Process` runs an asyncio event loop with one or more pika `AsyncioConnecti 2. `Process.on_message` builds a `ProcessingContext` (Pydantic model) and schedules `invoke_consumer` 3. `invoke_consumer` decodes the body via `Codec`, then calls `consumer.execute(ctx)` 4. `_Consumer.execute` runs pre-validation (message type, retry limits), then delegates to `_run_consumer` -5. `Consumer._run_consumer` acquires a lock and calls `prepare()` → `process()` → `finish()`; `TransactionConsumer._run_consumer` calls them without a lock, passing `ctx` as an argument +5. `Consumer._run_consumer` acquires a lock and calls `prepare()` → `process()` → `finish()`; `FunctionalConsumer._run_consumer` calls them without a lock, passing `ctx` as an argument 6. `Process.on_processed` handles the result: ack, nack, requeue, or republish based on the `Result` enum ### Key Module Responsibilities -- **consumer.py**: `_Consumer` base, `Consumer` (locked, self.body-style), `TransactionConsumer` (concurrent, ctx-style) +- **consumer.py**: `_Consumer` base, `Consumer` (locked, self.body-style), `FunctionalConsumer` (concurrent, ctx-style) - **codecs.py**: `Codec` class handles encode/decode dispatch by content_type/content_encoding, plus async Avro schema loading - **connection.py**: Wraps pika `AsyncioConnection`, manages channel lifecycle, QoS, consumer tags - **process.py**: `Process(multiprocessing.Process)` — the per-consumer child process with asyncio event loop diff --git a/README.md b/README.md index aae4675..ab4728e 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,6 @@ each run in an isolated process. It has the ability to collect statistical data from the consumer processes and report on it. [![Version](https://img.shields.io/pypi/v/rejected.svg?)](https://pypi.python.org/pypi/rejected) -[![Python](https://img.shields.io/pypi/pyversions/rejected.svg)](https://pypi.python.org/pypi/rejected) [![License](https://img.shields.io/pypi/l/rejected.svg?)](https://github.com/gmr/rejected/blob/main/LICENSE) ## Features @@ -18,7 +17,7 @@ data from the consumer processes and report on it. - Async consumers built on `asyncio` - Automatic exception handling including connection management and consumer restarting - Smart consumer classes that automatically decode and deserialize message bodies based on message headers -- Concurrent message processing with `TransactionConsumer` +- Concurrent message processing with `FunctionalConsumer` - Metrics via statsd and/or Prometheus - Built-in profiling of consumer code - Avro schema support with file and HTTP schema registries @@ -42,34 +41,36 @@ pip install rejected[sentry] # Sentry error reporting ## Documentation -Full documentation is available at [https://rejected.readthedocs.io](https://rejected.readthedocs.io). +Full documentation is available at [https://gmr.github.io/rejected](https://gmr.github.io/rejected). ## Example Consumer ```python -from rejected import consumer import logging +import rejected + LOGGER = logging.getLogger(__name__) -class Test(consumer.Consumer): +class Test(rejected.Consumer): async def process(self) -> None: LOGGER.debug('In Test.process: %s', self.body) ``` -For concurrent message processing, use `TransactionConsumer`: +For concurrent message processing, use `FunctionalConsumer`: ```python -from rejected import consumer, models import logging +import rejected + LOGGER = logging.getLogger(__name__) -class Test(consumer.TransactionConsumer): +class Test(rejected.FunctionalConsumer): - async def process(self, ctx: models.ProcessingContext) -> None: + async def process(self, ctx: rejected.ProcessingContext) -> None: LOGGER.debug('Processing: %s', ctx.message.body) ``` diff --git a/docs/api.md b/docs/api.md index 6c3e828..53062b8 100644 --- a/docs/api.md +++ b/docs/api.md @@ -49,9 +49,9 @@ The primary base classes for building message consumers. - timestamp - user_id -### TransactionConsumer +### FunctionalConsumer -::: rejected.consumer.TransactionConsumer +::: rejected.consumer.FunctionalConsumer options: members: - process diff --git a/docs/configuration.md b/docs/configuration.md index 1655584..0dde437 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -122,7 +122,7 @@ Each consumer entry is a named object with the following attributes: | `qty` | Number of consumer processes to run | `1` | | `queue` | RabbitMQ queue name to consume from (defaults to consumer name) | | | `ack` | Explicitly acknowledge messages (`no_ack = !ack`) | `true` | -| `qos_prefetch` | QoS prefetch count (set > 1 for concurrent processing with `TransactionConsumer`) | `1` | +| `qos_prefetch` | QoS prefetch count (set > 1 for concurrent processing with `FunctionalConsumer`) | `1` | | `max_errors` | Errors within 60s before restarting the consumer | `5` | | `error_exchange` | Exchange to republish messages to on `ProcessingException` | | | `error_max_retry` | Max `ProcessingException` retries before dropping | | diff --git a/docs/consumer_howto.md b/docs/consumer_howto.md index 1256c0f..a817d9f 100644 --- a/docs/consumer_howto.md +++ b/docs/consumer_howto.md @@ -6,15 +6,16 @@ The following example illustrates a very simple consumer that logs each message body as it's received. ```python -from rejected import consumer import logging +import rejected + __version__ = '1.0.0' LOGGER = logging.getLogger(__name__) -class ExampleConsumer(consumer.Consumer): +class ExampleConsumer(rejected.Consumer): async def process(self): LOGGER.info(self.body) @@ -51,22 +52,23 @@ error counter. When too many errors occur, rejected will automatically restart the consumer after a brief quiet period. ```python -from rejected import consumer, exceptions import logging +import rejected + __version__ = '1.0.0' LOGGER = logging.getLogger(__name__) -class ExampleConsumer(consumer.Consumer): +class ExampleConsumer(rejected.Consumer): def _connect_to_database(self): return False async def process(self): if not self._connect_to_database(): - raise exceptions.ConsumerException('Database error') + raise rejected.ConsumerException('Database error') LOGGER.info(self.body) ``` @@ -128,7 +130,7 @@ If the type does not match: - Otherwise, a `MessageException` is raised. ```python -class StrictConsumer(consumer.Consumer): +class StrictConsumer(rejected.Consumer): MESSAGE_TYPE = 'user.created' DROP_INVALID_MESSAGES = True DROP_EXCHANGE = 'dead-letter' @@ -143,7 +145,7 @@ Consumers can publish messages using `publish_message`. Note that it is an `async` method: ```python -class ExampleConsumer(consumer.Consumer): +class ExampleConsumer(rejected.Consumer): async def process(self): LOGGER.info(self.body) @@ -160,29 +162,30 @@ The `properties` parameter is a dict of AMQP properties (e.g., `content_type`, serialized and compressed based on the `content_type` and `content_encoding` properties. -## TransactionConsumer +## FunctionalConsumer -`TransactionConsumer` is designed for concurrent message processing. Unlike +`FunctionalConsumer` is designed for concurrent message processing. Unlike `Consumer`, it does not hold a lock -- multiple messages may be processed in parallel. Instead of accessing message properties via `self.body` etc., the processing context is passed explicitly: ```python -from rejected import consumer, models import logging +import rejected + LOGGER = logging.getLogger(__name__) -class MyConcurrentConsumer(consumer.TransactionConsumer): +class MyConcurrentConsumer(rejected.FunctionalConsumer): - async def prepare(self, ctx: models.ProcessingContext): + async def prepare(self, ctx: rejected.ProcessingContext): LOGGER.debug('Preparing to process %s', ctx.message.message_id) - async def process(self, ctx: models.ProcessingContext): + async def process(self, ctx: rejected.ProcessingContext): LOGGER.info('Processing: %s', ctx.message.body) - async def finish(self, ctx: models.ProcessingContext): + async def finish(self, ctx: rejected.ProcessingContext): LOGGER.debug('Finished processing %s', ctx.message.message_id) ``` @@ -196,7 +199,7 @@ The `ProcessingContext` provides: - `ctx.result` -- the `Result` enum indicating message disposition - `ctx.received_at` -- monotonic timestamp when the message was received -Use `TransactionConsumer` with `qos_prefetch > 1` to process multiple +Use `FunctionalConsumer` with `qos_prefetch > 1` to process multiple messages concurrently. ## Custom Metrics @@ -205,7 +208,7 @@ Consumers can emit custom metrics that are forwarded to statsd and/or Prometheus: ```python -class MetricsConsumer(consumer.Consumer): +class MetricsConsumer(rejected.Consumer): async def process(self): # Increment a counter @@ -229,10 +232,12 @@ after processing messages. By default it collects every 10,000 messages. Configure the frequency via the `gc_collection_frequency` setting: ```python -from rejected import consumer, mixins +from rejected.mixins import GarbageCollectorMixin + +import rejected -class MyConsumer(mixins.GarbageCollectorMixin, consumer.Consumer): +class MyConsumer(GarbageCollectorMixin, rejected.Consumer): async def process(self): ... diff --git a/docs/index.md b/docs/index.md index 0f574f5..cb73920 100644 --- a/docs/index.md +++ b/docs/index.md @@ -14,7 +14,7 @@ data from the consumer processes and report on it. - Async consumers built on `asyncio` - Automatic exception handling including connection management and consumer restarting - Smart consumer classes that automatically decode and deserialize message bodies based on message headers -- Concurrent message processing with `TransactionConsumer` +- Concurrent message processing with `FunctionalConsumer` - Metrics via statsd and/or Prometheus - Built-in profiling of consumer code - Avro schema support with file and HTTP schema registries @@ -39,13 +39,14 @@ pip install rejected[sentry] # Sentry error reporting ## Quick Start ```python -from rejected import consumer import logging +import rejected + LOGGER = logging.getLogger(__name__) -class ExampleConsumer(consumer.Consumer): +class ExampleConsumer(rejected.Consumer): async def process(self): LOGGER.info(self.body) diff --git a/docs/migrating.md b/docs/migrating.md index aa513e9..df901fa 100644 --- a/docs/migrating.md +++ b/docs/migrating.md @@ -47,7 +47,9 @@ class MyConsumer(consumer.Consumer): yield self.do_something() # 4.0 -class MyConsumer(consumer.Consumer): +import rejected + +class MyConsumer(rejected.Consumer): async def process(self): self.logger.info(self.body) await self.do_something() @@ -86,29 +88,31 @@ class MyConsumer(consumer.SmartConsumer): self.publish_message(...) # 4.0 -class MyConsumer(consumer.Consumer): +import rejected + +class MyConsumer(rejected.Consumer): async def process(self): # self.body is auto-deserialized (same behavior) data = self.body await self.publish_message(...) ``` -### TransactionConsumer (new) +### FunctionalConsumer (new) -4.0 introduces `TransactionConsumer` for concurrent message processing. -Unlike `Consumer` (which holds a lock), `TransactionConsumer` passes a +4.0 introduces `FunctionalConsumer` for concurrent message processing. +Unlike `Consumer` (which holds a lock), `FunctionalConsumer` passes a `ProcessingContext` to each method and allows multiple messages to be processed in parallel: ```python -from rejected import consumer, models +import rejected -class MyConcurrentConsumer(consumer.TransactionConsumer): - async def process(self, ctx: models.ProcessingContext): +class MyConcurrentConsumer(rejected.FunctionalConsumer): + async def process(self, ctx: rejected.ProcessingContext): self.logger.info('Body: %s', ctx.message.body) ``` -Use `TransactionConsumer` with `qos_prefetch > 1` in the configuration. +Use `FunctionalConsumer` with `qos_prefetch > 1` in the configuration. ### ACK_PROCESSING_EXCEPTIONS behavior @@ -138,17 +142,38 @@ instead of rejected. ### Import changes +In 4.0, the primary consumer classes, exceptions, and models are all +available directly from the `rejected` package: + ```python # 3.x -from rejected.consumer import ConsumerException, MessageException, ProcessingException +from rejected.consumer import Consumer, ConsumerException, MessageException from rejected.data import Measurement -# 4.0 (preferred) -from rejected.exceptions import ConsumerException, MessageException, ProcessingException -from rejected.measurement import Measurement +# 4.0 (preferred — use top-level imports) +import rejected + +class MyConsumer(rejected.Consumer): ... +class MyConcurrent(rejected.FunctionalConsumer): ... + +# Exceptions +raise rejected.ConsumerException('...') +raise rejected.MessageException('...') +raise rejected.ProcessingException('...') -# 4.0 (backward-compatible re-exports still work) -from rejected.consumer import ConsumerException, MessageException, ProcessingException +# Models +ctx: rejected.ProcessingContext +msg: rejected.Message +result: rejected.Result +``` + +Sub-module imports still work for backward compatibility or if you prefer +explicit paths: + +```python +from rejected.exceptions import ConsumerException +from rejected.measurement import Measurement +from rejected.models import Message, ProcessingContext ``` ## Data Classes @@ -156,8 +181,8 @@ from rejected.consumer import ConsumerException, MessageException, ProcessingExc ### Message The `rejected.data.Message` and `rejected.data.Properties` classes have been -replaced by `rejected.models.Message`, a Pydantic model. If you accessed -these directly in tests or custom code: +replaced by `rejected.Message` (a Pydantic model). If you accessed these +directly in tests or custom code: ```python # 3.x @@ -166,8 +191,9 @@ msg = Message(channel, method, header, properties, body) msg.properties.content_type # 4.0 -from rejected.models import Message -msg = Message( +import rejected + +msg = rejected.Message( delivery_tag=1, exchange='exchange', routing_key='key', @@ -182,7 +208,8 @@ Key differences: - Properties are top-level fields on `Message`, not nested under `msg.properties` -- The AMQP `type` property is accessed as `msg.message_type` (not `msg.type`) +- The AMQP `type` property is accessed as `msg.type` (matching the AMQP + property name) - `Message` is a Pydantic `BaseModel` with validation - `body` starts as raw bytes; the `Codec` class decodes it asynchronously before the consumer sees it @@ -201,6 +228,8 @@ The API is unchanged. ```python # 3.x +from rejected import testing + class MyTest(testing.AsyncTestCase): def get_consumer(self): return MyConsumer @@ -210,6 +239,8 @@ class MyTest(testing.AsyncTestCase): yield self.process_message({'key': 'value'}) # 4.0 +from rejected import testing + class MyTest(testing.AsyncTestCase): def get_consumer(self): return MyConsumer @@ -285,8 +316,8 @@ connections: | 3.x API | 4.0 Replacement | |---------|----------------| -| `consumer.SmartConsumer` | `consumer.Consumer` (auto-deserializes) | -| `consumer.PublishingConsumer` | `consumer.Consumer` (always could publish) | +| `consumer.SmartConsumer` | `rejected.Consumer` (auto-deserializes) | +| `consumer.PublishingConsumer` | `rejected.Consumer` (always could publish) | | `self.io_loop` | `asyncio.get_event_loop()` | | `self.yield_to_ioloop()` | `await asyncio.sleep(0)` | | `self.reply(...)` | Use `await self.publish_message(...)` with `reply_to` | @@ -296,8 +327,8 @@ connections: | `self.statsd_incr(...)` | `self.stats_incr(...)` | | `self.statsd_track_duration(...)` | `self.stats_track_duration(...)` | | `rejected.data.Data` | Removed | -| `rejected.data.Message` | `rejected.models.Message` | -| `rejected.data.Properties` | Fields are on `rejected.models.Message` | +| `rejected.data.Message` | `rejected.Message` | +| `rejected.data.Properties` | Fields are on `rejected.Message` | | `pickle` content types | Removed for security (RCE risk) | ## Prometheus Metrics (new) @@ -321,16 +352,17 @@ to Prometheus. ## Quick Migration Checklist 1. Ensure Python >= 3.11 -2. Replace `@gen.coroutine` / `yield` with `async def` / `await` -3. Make `process()`, `prepare()`, `finish()` async -4. Add `await` to `publish_message()` calls -5. Replace `SmartConsumer` / `PublishingConsumer` with `Consumer` -6. Replace `tornado` imports with `asyncio` equivalents -7. Update tests to use `async def` test methods (no `@gen_test`) -8. Replace `from rejected.data import ...` with `from rejected.measurement import ...` - or `from rejected.models import ...` -9. Rename `publisher_confirmation` to `confirm` in connection config -10. Remove `Daemon` section from config (if present) -11. Remove `stats.influxdb` from config (use Prometheus instead) -12. Remove any `pickle` content type usage -13. Replace deprecated `statsd_*` method calls with `stats_*` equivalents +2. Update imports to use `import rejected` and `rejected.Consumer`, etc. +3. Replace `@gen.coroutine` / `yield` with `async def` / `await` +4. Make `process()`, `prepare()`, `finish()` async +5. Add `await` to `publish_message()` calls +6. Replace `SmartConsumer` / `PublishingConsumer` with `rejected.Consumer` +7. Replace `tornado` imports with `asyncio` equivalents +8. Replace `from rejected.data import ...` with `rejected.Message`, + `rejected.measurement.Measurement`, etc. +9. Update tests to use `async def` test methods (no `@gen_test`) +10. Rename `publisher_confirmation` to `confirm` in connection config +11. Remove `Daemon` section from config (if present) +12. Remove `stats.influxdb` from config (use Prometheus instead) +13. Remove any `pickle` content type usage +14. Replace deprecated `statsd_*` method calls with `stats_*` equivalents diff --git a/pyproject.toml b/pyproject.toml index 99eb12b..0ec7c78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ rejected = "rejected.controller:main" [project.urls] Homepage = "https://github.com/gmr/rejected" -Documentation = "https://rejected.readthedocs.io" +Documentation = "https://gmr.github.io/rejected" "Bug Tracker" = "https://github.com/gmr/rejected/issues" "Source Code" = "https://github.com/gmr/rejected" diff --git a/rejected/__init__.py b/rejected/__init__.py index 1288e96..4b048c0 100644 --- a/rejected/__init__.py +++ b/rejected/__init__.py @@ -13,11 +13,12 @@ from rejected.consumer import ( # noqa: E402 Consumer, ConsumerException, + FunctionalConsumer, MessageException, ProcessingException, - TransactionConsumer, + RejectedException, ) -from rejected.models import Message, Result # noqa: E402 +from rejected.models import Message, ProcessingContext, Result # noqa: E402 __author__ = 'Gavin M. Roy ' __since__ = '2009-09-10' @@ -30,11 +31,13 @@ 'AVRO_DATUM_MIME_TYPE', 'Consumer', 'ConsumerException', + 'FunctionalConsumer', 'Message', 'MessageException', + 'ProcessingContext', 'ProcessingException', + 'RejectedException', 'Result', - 'TransactionConsumer', '__author__', '__since__', '__version__', diff --git a/rejected/consumer.py b/rejected/consumer.py index ecba9c2..dd70d63 100644 --- a/rejected/consumer.py +++ b/rejected/consumer.py @@ -1,6 +1,6 @@ """ The :py:class:`Consumer` provides the backward-compatible consumer class -for rejected 3.x style consumers. :py:class:`TransactionConsumer` provides +for rejected 3.x style consumers. :py:class:`FunctionalConsumer` provides a new concurrent consumer that receives a :class:`~rejected.models.ProcessingContext`. @@ -46,7 +46,7 @@ class _Consumer: """Base consumer class implementing the core contract. Not intended to be used directly — extend :class:`Consumer` or - :class:`TransactionConsumer`. + :class:`FunctionalConsumer`. """ @@ -155,19 +155,19 @@ def _pre_execute( msg.correlation_id or msg.message_id or str(uuid.uuid4()) ) - if msg.message_type: - self.set_sentry_context('type', msg.message_type) + if msg.type: + self.set_sentry_context('type', msg.type) # Validate message type if self.MESSAGE_TYPE: expected = self.MESSAGE_TYPE if isinstance(expected, (tuple, list, set)): - supported = msg.message_type in expected + supported = msg.type in expected else: - supported = msg.message_type == expected + supported = msg.type == expected if not supported: self.logger.warning( - 'Received unsupported message type: %s', msg.message_type + 'Received unsupported message type: %s', msg.type ) if self._drop_invalid: if self._drop_exchange: @@ -619,7 +619,7 @@ def message_id(self) -> str | None: def message_type(self) -> str | None: if not self._context: return None - return self._context.message.message_type + return self._context.message.type @property def priority(self) -> int | None: @@ -723,7 +723,7 @@ def stats_track_duration( ) -class TransactionConsumer(_Consumer): +class FunctionalConsumer(_Consumer): """Concurrent consumer that receives a ProcessingContext. No lock — multiple messages may be processed concurrently. diff --git a/rejected/controller.py b/rejected/controller.py index 677a652..633930e 100644 --- a/rejected/controller.py +++ b/rejected/controller.py @@ -70,6 +70,7 @@ def run(self) -> None: consumer=self.args.consumer, profile=self.args.profile, quantity=self.args.quantity, + max_messages=self.args.max_messages, ) try: self._mcp.run() @@ -162,6 +163,15 @@ def _build_parser() -> argparse.ArgumentParser: metavar='N', help='Override the consumer quantity (use with -o)', ) + parser.add_argument( + '-n', + '--max-messages', + type=int, + default=None, + dest='max_messages', + metavar='N', + help='Process N messages per consumer then shut down', + ) parser.add_argument( '--version', action='version', version=f'%(prog)s {__version__}' ) @@ -182,6 +192,8 @@ def main() -> None: parser.error(f'Unknown consumer: {args.consumer}') if args.quantity is not None and args.consumer is None: parser.error('--qty requires --only') + if args.max_messages is not None and args.max_messages <= 0: + parser.error('--max-messages must be a positive integer') try: if cfg.logging: diff --git a/rejected/mcp.py b/rejected/mcp.py index b7e4ecd..6ade507 100644 --- a/rejected/mcp.py +++ b/rejected/mcp.py @@ -59,6 +59,7 @@ def __init__( consumer: str | None = None, profile: str | None = None, quantity: int | None = None, + max_messages: int | None = None, ) -> None: """Initialize the Master Control Program @@ -100,6 +101,7 @@ def __init__( LOGGER.debug('Stats logging enabled: %s', self.log_stats_enabled) # Setup the poller related threads + self.max_messages: int | None = max_messages self.poll_interval: float = config.poll_interval LOGGER.debug('Set process poll interval to %.2f', self.poll_interval) @@ -198,6 +200,11 @@ def check_process_counts(self) -> None: processes needed. """ + if self.max_messages: + LOGGER.debug( + 'Skipping process respawn (max_messages=%i)', self.max_messages + ) + return LOGGER.debug('Checking minimum consumer process levels') for name in self.consumers: processes_needed = self.process_spawn_qty(name) @@ -428,6 +435,7 @@ def new_process(self, consumer_name: str) -> tuple[str, process.Process]: 'daemon': False, 'stats_queue': self.stats_queue, 'logging_config': self.config.logging, + 'max_messages': self.max_messages, } return process_name, process.Process(name=process_name, kwargs=kwargs) diff --git a/rejected/models.py b/rejected/models.py index 6e46905..133e3fa 100644 --- a/rejected/models.py +++ b/rejected/models.py @@ -142,7 +142,7 @@ class Message(pydantic.BaseModel): str, bool | dict[str, typing.Any] | float | int | str | bytes ] message_id: str | None - message_type: str | None + type: str | None priority: int | None redelivered: bool reply_to: str | None diff --git a/rejected/process.py b/rejected/process.py index 8f4414a..4773ee8 100644 --- a/rejected/process.py +++ b/rejected/process.py @@ -291,10 +291,7 @@ async def invoke_consumer(self, ctx: models.ProcessingContext) -> None: msg = ctx.message try: msg.body = await self.codec.decode( - msg.body, - msg.content_type, - msg.content_encoding, - msg.message_type, + msg.body, msg.content_type, msg.content_encoding, msg.type ) except codecs.DecodeError as error: LOGGER.error('Failed to decode message body: %s', error) @@ -451,7 +448,7 @@ def on_message( dict(properties.headers) if properties.headers else {} ), message_id=properties.message_id, - message_type=properties.type, + type=properties.type, priority=properties.priority, redelivered=redelivered, reply_to=properties.reply_to, @@ -538,6 +535,14 @@ def on_processed(self, ctx: models.ProcessingContext) -> None: if self.statsd: self._submit_statsd(ctx.measurement) + # Shut down after reaching the message limit + if self.max_messages and self._processed_count >= self.max_messages: + LOGGER.info( + 'Reached max messages (%i), shutting down', self.max_messages + ) + self.shutdown_connections() + return + # Transition state based on remaining in-flight messages if not self._in_flight: if self.is_waiting_to_shutdown: @@ -989,6 +994,10 @@ def queue_name(self) -> str: def stats_queue(self) -> 'multiprocessing.Queue[dict[str, typing.Any]]': return self._kwargs['stats_queue'] # type: ignore[no-any-return] + @property + def max_messages(self) -> int | None: + return self._kwargs.get('max_messages') # type: ignore[no-any-return] + @property def too_many_errors(self) -> bool: """Return a bool if too many errors have occurred. diff --git a/rejected/testing.py b/rejected/testing.py index 1605e05..1c6fd33 100644 --- a/rejected/testing.py +++ b/rejected/testing.py @@ -68,7 +68,7 @@ async def test_consumer_raises_message_exception(self): class AsyncTestCase(unittest.IsolatedAsyncioTestCase): """:class:`unittest.IsolatedAsyncioTestCase` subclass for testing :class:`~rejected.consumer.Consumer` and - :class:`~rejected.consumer.TransactionConsumer` classes. + :class:`~rejected.consumer.FunctionalConsumer` classes. """ @@ -162,7 +162,7 @@ def create_context( expiration=properties.get('expiration'), headers=properties.get('headers', {}), message_id=properties.get('message_id', str(uuid.uuid4())), - message_type=properties.get('type'), + type=properties.get('type'), priority=properties.get('priority'), redelivered=False, reply_to=properties.get('reply_to'), diff --git a/tests/test_consumer.py b/tests/test_consumer.py index b534bb0..be9ee51 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -24,7 +24,7 @@ def _make_message(**kwargs: typing.Any) -> models.Message: 'expiration': '32768', 'headers': {'foo': 'bar'}, 'message_id': 'mid123', - 'message_type': 'test', + 'type': 'test', 'priority': 5, 'redelivered': False, 'reply_to': 'rtrk', @@ -123,7 +123,7 @@ async def process(self): pass obj = TypedConsumer(config_module.Settings({}), None) - ctx = _make_ctx(_make_message(message_type='wrong')) + ctx = _make_ctx(_make_message(type='wrong')) await obj.execute(ctx) self.assertEqual(ctx.result, models.Result.MESSAGE_DROP) @@ -135,7 +135,7 @@ async def process(self): pass obj = TypedConsumer(config_module.Settings({}), None) - ctx = _make_ctx(_make_message(message_type='wrong')) + ctx = _make_ctx(_make_message(type='wrong')) await obj.execute(ctx) self.assertEqual(ctx.result, models.Result.MESSAGE_EXCEPTION) @@ -206,7 +206,7 @@ async def process(self): self.assertEqual(captured['expiration'], msg.expiration) self.assertEqual(captured['headers'], msg.headers) self.assertEqual(captured['message_id'], msg.message_id) - self.assertEqual(captured['message_type'], msg.message_type) + self.assertEqual(captured['message_type'], msg.type) self.assertEqual(captured['name'], 'PropConsumer') self.assertEqual(captured['priority'], msg.priority) self.assertEqual(captured['redelivered'], msg.redelivered) diff --git a/tests/test_controller.py b/tests/test_controller.py index 4f626f7..f21fd83 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -17,6 +17,7 @@ def _make_args(**kwargs): 'profile': None, 'prepend_path': None, 'quantity': None, + 'max_messages': None, } defaults.update(kwargs) return argparse.Namespace(**defaults) diff --git a/tests/test_process.py b/tests/test_process.py index 952992d..bdd4e52 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -90,7 +90,7 @@ def _make_message( expiration='32768', headers={'foo': 'bar'}, message_id='mid123', - message_type='test', + type='test', priority=5, redelivered=redelivered, reply_to='rtrk',