Annotation-driven RabbitMQ integration for the GuicedEE / Vert.x stack. Declare connections, exchanges, queues, consumers, and publishers with annotations β everything is discovered at startup via ClassGraph, wired through Guice, and managed by the Vert.x RabbitMQ client.
Built on Vert.x RabbitMQ Client Β· RabbitMQ Java Client Β· Google Guice Β· JPMS module com.guicedee.rabbit Β· Java 25+
<dependency>
<groupId>com.guicedee</groupId>
<artifactId>rabbitmq</artifactId>
</dependency>Gradle (Kotlin DSL)
implementation("com.guicedee:rabbitmq:2.0.0-SNAPSHOT")- Annotation-driven setup β
@RabbitConnectionOptions,@QueueExchange,@QueueDefinition, and@QueueOptionsdeclare the entire topology - Auto-discovery β
RabbitMQPreStartupscans the classpath via ClassGraph to find all annotated connection, exchange, consumer, and publisher declarations - Guice-managed consumers & publishers β consumer classes are bound as singletons;
QueuePublisherinstances are injectable by@Named("queue-name") - Exchange management β exchanges are declared automatically with configurable type (Direct, Fanout, Topic, Headers), durability, auto-delete, and optional dead-letter exchange
- Queue options β priority, prefetch count, TTL, durability, auto-ack, single-consumer mode, exclusive, transacted, auto-bind, consumer count, dedicated channel/connection
- Publisher confirms β optional per-connection
confirmPublisheswithwaitForConfirms()after each publish - Environment variable overrides β every annotation attribute can be overridden via system properties or environment variables at runtime, scoped by name (see Environment Variable Overrides)
- SPI callback β implement
OnQueueExchangeDeclaredto perform custom work after an exchange is declared
Step 1 β Define a connection and exchange. Place @RabbitConnectionOptions on a class or package-info.java. If placed inside a @Verticle-annotated package, the connection is managed by that verticle; otherwise it falls into the default verticle. Add @QueueExchange on a class or package-info.java in the same (or sub-) package:
@Verticle
@RabbitConnectionOptions(
value = "my-connection",
host = "localhost",
user = "guest",
password = "guest"
)
@QueueExchange(
value = "my-exchange",
exchangeType = QueueExchange.ExchangeType.Direct,
durable = true
)
package com.example.messaging;
import com.guicedee.vertx.spi.Verticle;Or without @Verticle (uses the default verticle):
@RabbitConnectionOptions(
value = "my-connection",
host = "localhost",
user = "guest",
password = "guest"
)
@QueueExchange(
value = "my-exchange",
exchangeType = QueueExchange.ExchangeType.Direct,
durable = true
)
public class MessagingConfig {
}Step 2 β Create a consumer:
@QueueDefinition("order-events")
public class OrderConsumer implements QueueConsumer {
@Override
public void consume(RabbitMQMessage message) {
System.out.println("Received: " + message.body());
}
}Step 3 β Inject a publisher:
public class OrderService {
@Inject
@Named("order-events")
private QueuePublisher orderPublisher;
public void placeOrder(String orderJson) {
orderPublisher.publish(orderJson);
}
}Step 4 β Bootstrap GuicedEE:
IGuiceContext.instance().inject();That's it. RabbitMQPreStartup discovers the annotations, RabbitMQModule creates the Guice bindings, and RabbitPostStartup declares exchanges/queues and starts consumers automatically.
Startup
IGuiceContext.instance()
ββ RabbitMQPreStartup (IGuicePreStartup β annotation scanning)
ββ Discovers @RabbitConnectionOptions classes
ββ Discovers @QueueExchange classes per package
ββ Discovers @QueueDefinition consumer classes
ββ Discovers @QueueDefinition/@QueuePublisher fields
ββ Registers metadata for binding
ββ RabbitMQModule (IGuiceModule β Guice bindings)
ββ Creates RabbitMQClient per connection (via Vert.x)
ββ Binds RabbitMQClient as @Named("connection-name")
ββ Binds QueueConsumer implementations as singletons
ββ Binds QueuePublisher instances as @Named("queue-name")
ββ Binds RabbitMQClient as @Named("connection-name")
ββ RabbitPostStartup (IGuicePostStartup β runtime initialization)
ββ Starts RabbitMQ client connections
ββ Declares exchanges (with optional DLX)
ββ Declares queues with options (priority, TTL, etc.)
ββ Binds queues to exchanges with routing keys
ββ Enables publisher confirms if configured
ββ Starts consumer listeners
Publish:
QueuePublisher.publish(body)
β AMQP.BasicProperties (priority, etc.)
β RabbitMQClient.basicPublish(exchange, routingKey, buffer)
β [optional] client.waitForConfirms()
Consume:
RabbitMQ broker delivers message
β RabbitMQConsumer handler
β CallScoper transaction boundary
β QueueConsumer.consume(message) β Guice-managed instance
Placed on a class or package-info.java to declare a RabbitMQ connection. Optionally within a @Verticle package for dedicated verticle management; otherwise uses the default verticle:
| Attribute | Default | Purpose |
|---|---|---|
value |
"default" |
Connection name (used for @Named binding) |
uri |
"" |
Connection URI (takes precedence over host/port) |
host |
"" |
Broker hostname |
port |
0 |
Broker port (0 = default 5672) |
user |
"guest" |
Username |
password |
"guest" |
Password |
virtualHost |
"" |
Virtual host |
addresses |
{} |
Clustered broker addresses |
connectionTimeout |
0 |
Connection timeout (ms) |
requestedHeartbeat |
0 |
Heartbeat interval (s) |
handshakeTimeout |
0 |
Handshake timeout (ms) |
requestedChannelMax |
0 |
Max channels |
networkRecoveryInterval |
0 |
Recovery interval (ms) |
automaticRecoveryEnabled |
true |
Enable automatic recovery |
automaticRecoveryOnInitialConnection |
false |
Recover on initial connect failure |
reconnectAttempts |
0 |
Vert.x reconnect attempts |
reconnectInterval |
0 |
Vert.x reconnect interval (ms) |
useNio |
false |
Use NIO connections |
confirmPublishes |
false |
Enable publisher confirms |
Placed on a class or package-info.java to declare an AMQP exchange:
| Attribute | Default | Purpose |
|---|---|---|
value |
"default" |
Exchange name |
exchangeType |
Direct |
Exchange type: Direct, Fanout, Topic, Headers |
durable |
false |
Survive broker restarts |
autoDelete |
false |
Delete when last consumer disconnects |
createDeadLetter |
false |
Create a companion .DLX exchange and queue |
Placed on a QueueConsumer class or a QueuePublisher field:
| Attribute | Default | Purpose |
|---|---|---|
value |
β | Queue name (required) |
options |
@QueueOptions |
Queue-level options |
exchange |
"default" |
Exchange to bind to (or "default" for the package exchange) |
Nested within @QueueDefinition to configure queue behavior:
| Attribute | Default | Purpose |
|---|---|---|
priority |
0 |
Max priority level (0 disables) |
fetchCount |
0 |
Consumer prefetch count |
durable |
false |
Queue survives broker restarts |
delete |
false |
Auto-delete when last consumer disconnects |
autoAck |
false |
Auto-acknowledge messages |
consumerExclusive |
false |
Exclusive to the declaring connection |
singleConsumer |
false |
Enforce single active consumer |
ttl |
0 |
Message time-to-live (ms, 0 disables) |
noLocal |
false |
Don't deliver messages published on same connection |
keepMostRecent |
true |
Keep most recent messages on backlog overflow |
maxInternalQueueSize |
MAX_VALUE |
Client-side consumer buffer size |
transacted |
true |
Wrap consumption in a CallScoper transaction boundary |
autobind |
true |
Auto-start consumer on startup |
consumerCount |
1 |
Number of consumer instances |
dedicatedChannel |
false |
Use a dedicated channel |
dedicatedConnection |
false |
Use a dedicated connection |
Inject a QueuePublisher by queue name:
@Inject
@Named("order-events")
private QueuePublisher publisher;
public void send() {
publisher.publish("{\"orderId\": 123}");
}The publisher resolves the exchange and routing key automatically from the queue's @QueueDefinition. The routing key follows the pattern {exchangeName}_{queueName}.
A QueuePublisher injection point can optionally carry @QueueDefinition to override the exchange or options for that specific publisher. This is useful when publishing to a queue that belongs to a different exchange than the package default:
@Inject
@Named("cross-exchange-queue")
@QueueDefinition(value = "cross-exchange-queue", exchange = "other-exchange",
options = @QueueOptions(priority = 5))
private QueuePublisher crossExchangePublisher;When @QueueDefinition is present on the field and specifies an exchange other than "default", that exchange is used instead of the package-level exchange. If @QueueDefinition is absent, the queue name is resolved from the @Named value and the package exchange is used.
Enable per-connection with @RabbitConnectionOptions(confirmPublishes = true). The publisher will call client.waitForConfirms() after each publish and log the result.
Implement QueueConsumer and annotate with @QueueDefinition:
@QueueDefinition(value = "notifications",
options = @QueueOptions(durable = true, autoAck = false, fetchCount = 10))
public class NotificationConsumer implements QueueConsumer {
@Override
public void consume(RabbitMQMessage message) {
// Process the message
System.out.println(message.body().toString());
}
}Use @QueueOptions(singleConsumer = true) to ensure only one active consumer at a time (uses the x-single-active-consumer queue argument).
All annotation attributes can be overridden at runtime via system properties or environment variables. Overrides are scoped by name β the lookup tries a name-specific key first, then falls back to a global key:
RABBITMQ_{NORMALIZED_NAME}_{PROPERTY}β name-specific overrideRABBITMQ_{PROPERTY}β global fallback- Annotation default
The name is normalized to uppercase with hyphens and dots replaced by underscores. For example, a connection named order-service checking the HOST property would resolve:
RABBITMQ_ORDER_SERVICE_HOST(name-specific)RABBITMQ_HOST(global fallback)- The
hostattribute from the@RabbitConnectionOptionsannotation
Scoped by the connection value() name:
| Property | Variable pattern |
|---|---|
value() |
RABBITMQ_{name}_CONNECTION_NAME / RABBITMQ_CONNECTION_NAME |
uri() |
RABBITMQ_{name}_URI / RABBITMQ_URI |
host() |
RABBITMQ_{name}_HOST / RABBITMQ_HOST |
port() |
RABBITMQ_{name}_PORT / RABBITMQ_PORT |
user() |
RABBITMQ_{name}_USER / RABBITMQ_USER |
password() |
RABBITMQ_{name}_PASSWORD / RABBITMQ_PASSWORD |
virtualHost() |
RABBITMQ_{name}_VIRTUAL_HOST / RABBITMQ_VIRTUAL_HOST |
addresses() |
RABBITMQ_{name}_ADDRESSES / RABBITMQ_ADDRESSES |
connectionTimeout() |
RABBITMQ_{name}_CONNECTION_TIMEOUT / RABBITMQ_CONNECTION_TIMEOUT |
requestedHeartbeat() |
RABBITMQ_{name}_REQUESTED_HEARTBEAT / RABBITMQ_REQUESTED_HEARTBEAT |
handshakeTimeout() |
RABBITMQ_{name}_HANDSHAKE_TIMEOUT / RABBITMQ_HANDSHAKE_TIMEOUT |
requestedChannelMax() |
RABBITMQ_{name}_REQUESTED_CHANNEL_MAX / RABBITMQ_REQUESTED_CHANNEL_MAX |
networkRecoveryInterval() |
RABBITMQ_{name}_NETWORK_RECOVERY_INTERVAL / RABBITMQ_NETWORK_RECOVERY_INTERVAL |
automaticRecoveryEnabled() |
RABBITMQ_{name}_AUTOMATIC_RECOVERY_ENABLED / RABBITMQ_AUTOMATIC_RECOVERY_ENABLED |
automaticRecoveryOnInitialConnection() |
RABBITMQ_{name}_AUTOMATIC_RECOVERY_ON_INITIAL_CONNECTION / RABBITMQ_AUTOMATIC_RECOVERY_ON_INITIAL_CONNECTION |
reconnectAttempts() |
RABBITMQ_{name}_RECONNECT_ATTEMPTS / RABBITMQ_RECONNECT_ATTEMPTS |
reconnectInterval() |
RABBITMQ_{name}_RECONNECT_INTERVAL / RABBITMQ_RECONNECT_INTERVAL |
useNio() |
RABBITMQ_{name}_USE_NIO / RABBITMQ_USE_NIO |
confirmPublishes() |
RABBITMQ_{name}_CONFIRM_PUBLISHES / RABBITMQ_CONFIRM_PUBLISHES |
registerWriteHandler() |
RABBITMQ_{name}_REGISTER_WRITE_HANDLER / RABBITMQ_REGISTER_WRITE_HANDLER |
Scoped by the exchange value() name:
| Property | Variable pattern |
|---|---|
value() |
RABBITMQ_{name}_EXCHANGE_NAME / RABBITMQ_EXCHANGE_NAME |
exchangeType() |
RABBITMQ_{name}_EXCHANGE_TYPE / RABBITMQ_EXCHANGE_TYPE |
durable() |
RABBITMQ_{name}_EXCHANGE_DURABLE / RABBITMQ_EXCHANGE_DURABLE |
autoDelete() |
RABBITMQ_{name}_EXCHANGE_AUTO_DELETE / RABBITMQ_EXCHANGE_AUTO_DELETE |
createDeadLetter() |
RABBITMQ_{name}_EXCHANGE_CREATE_DEAD_LETTER / RABBITMQ_EXCHANGE_CREATE_DEAD_LETTER |
Scoped by the queue value() name:
| Property | Variable pattern |
|---|---|
value() |
RABBITMQ_{name}_QUEUE_NAME / RABBITMQ_QUEUE_NAME |
exchange() |
RABBITMQ_{name}_QUEUE_EXCHANGE / RABBITMQ_QUEUE_EXCHANGE |
Scoped by the parent queue value() name:
| Property | Variable pattern |
|---|---|
priority() |
RABBITMQ_{name}_QUEUE_PRIORITY / RABBITMQ_QUEUE_PRIORITY |
fetchCount() |
RABBITMQ_{name}_QUEUE_FETCH_COUNT / RABBITMQ_QUEUE_FETCH_COUNT |
durable() |
RABBITMQ_{name}_QUEUE_DURABLE / RABBITMQ_QUEUE_DURABLE |
delete() |
RABBITMQ_{name}_QUEUE_DELETE / RABBITMQ_QUEUE_DELETE |
autoAck() |
RABBITMQ_{name}_QUEUE_AUTO_ACK / RABBITMQ_QUEUE_AUTO_ACK |
consumerExclusive() |
RABBITMQ_{name}_QUEUE_CONSUMER_EXCLUSIVE / RABBITMQ_QUEUE_CONSUMER_EXCLUSIVE |
singleConsumer() |
RABBITMQ_{name}_QUEUE_SINGLE_CONSUMER / RABBITMQ_QUEUE_SINGLE_CONSUMER |
ttl() |
RABBITMQ_{name}_QUEUE_TTL / RABBITMQ_QUEUE_TTL |
noLocal() |
RABBITMQ_{name}_QUEUE_NO_LOCAL / RABBITMQ_QUEUE_NO_LOCAL |
keepMostRecent() |
RABBITMQ_{name}_QUEUE_KEEP_MOST_RECENT / RABBITMQ_QUEUE_KEEP_MOST_RECENT |
maxInternalQueueSize() |
RABBITMQ_{name}_QUEUE_MAX_INTERNAL_SIZE / RABBITMQ_QUEUE_MAX_INTERNAL_SIZE |
transacted() |
RABBITMQ_{name}_QUEUE_TRANSACTED / RABBITMQ_QUEUE_TRANSACTED |
autobind() |
RABBITMQ_{name}_QUEUE_AUTOBIND / RABBITMQ_QUEUE_AUTOBIND |
consumerCount() |
RABBITMQ_{name}_QUEUE_CONSUMER_COUNT / RABBITMQ_QUEUE_CONSUMER_COUNT |
dedicatedChannel() |
RABBITMQ_{name}_QUEUE_DEDICATED_CHANNEL / RABBITMQ_QUEUE_DEDICATED_CHANNEL |
dedicatedConnection() |
RABBITMQ_{name}_QUEUE_DEDICATED_CONNECTION / RABBITMQ_QUEUE_DEDICATED_CONNECTION |
| Variable | Purpose |
|---|---|
RABBIT_MQ_PORT |
Global port override applied at the module level (in RabbitMQModule) |
| SPI | Purpose |
|---|---|
OnQueueExchangeDeclared |
Callback invoked after an exchange is declared β perform custom work (bind additional queues, etc.) |
IGuicePreStartup |
Provided by RabbitMQPreStartup β scans for RabbitMQ annotations |
IGuiceModule |
Provided by RabbitMQModule β binds clients, consumers, publishers |
IGuicePostStartup |
Provided by RabbitPostStartup β declares exchanges/queues, starts consumers |
Implement this SPI to execute custom logic after an exchange has been declared:
public class MyExchangeCallback implements OnQueueExchangeDeclared {
@Override
public void perform(RabbitMQClient client, String exchangeName) {
// Bind additional queues, set policies, etc.
}
}Register via module-info.java:
provides com.guicedee.rabbit.implementations.def.OnQueueExchangeDeclared
with com.example.MyExchangeCallback;| Type | Named by | Example |
|---|---|---|
RabbitMQClient |
Connection name | @Named("my-connection") RabbitMQClient client |
QueuePublisher |
Queue name | @Named("order-events") QueuePublisher publisher |
QueueConsumer |
Queue name | @Named("order-events") QueueConsumer consumer |
@Inject
@Named("my-connection")
private RabbitMQClient client;com.guicedee.rabbit
βββ io.vertx.rabbitmq (Vert.x RabbitMQ client)
βββ com.rabbitmq.client (AMQP Java client)
βββ com.guicedee.vertx (Vert.x lifecycle)
βββ com.guicedee.client (GuicedEE SPI contracts)
βββ io.github.classgraph (annotation scanning)
βββ org.apache.commons.lang3 (StringUtils)
Module name: com.guicedee.rabbit
The module:
- exports
com.guicedee.rabbit,com.guicedee.rabbit.implementations.def,com.guicedee.rabbit.implementations - uses
OnQueueExchangeDeclared - provides
IGuicePostStartupwithRabbitPostStartup - provides
IGuiceModulewithRabbitMQModule - provides
IGuicePreStartupwithRabbitMQPreStartup - opens
com.guicedee.rabbit,com.guicedee.rabbit.implementations.def,com.guicedee.rabbit.implementationstocom.google.guiceandcom.fasterxml.jackson.databind
| Class | Package | Role |
|---|---|---|
RabbitConnectionOptions |
rabbit |
Annotation declaring a RabbitMQ connection with all AMQP/Vert.x options |
QueueExchange |
rabbit |
Annotation declaring an AMQP exchange (Direct, Fanout, Topic, Headers) |
QueueDefinition |
rabbit |
Annotation declaring a queue name with options and exchange binding |
QueueOptions |
rabbit |
Annotation for queue-level configuration (priority, TTL, durability, etc.) |
QueueConsumer |
rabbit |
Interface for consuming RabbitMQMessage instances |
QueuePublisher |
rabbit |
Publishes string payloads to a resolved exchange/routing key |
RabbitMQPreStartup |
implementations |
IGuicePreStartup β scans for all RabbitMQ annotations and registers metadata |
RabbitMQModule |
implementations |
IGuiceModule β creates clients and binds consumers/publishers in Guice |
RabbitPostStartup |
implementations |
IGuicePostStartup β declares exchanges/queues and starts consumers |
OnQueueExchangeDeclared |
implementations.def |
SPI callback invoked after exchange declaration |
QueueOptionsDefault |
implementations.def |
Default mutable implementation of QueueOptions for programmatic use |
The test suite uses Testcontainers to spin up a RabbitMQ broker:
try (var rabbit = new RabbitMQContainer(
DockerImageName.parse("rabbitmq:3.7.25-management-alpine"))) {
rabbit.setPortBindings(List.of("5672"));
rabbit.start();
IGuiceContext.instance().inject();
// Publishers are injectable after startup
QueuePublisher publisher = IGuiceContext.get(
Key.get(QueuePublisher.class, Names.named("test-queue-consumer")));
publisher.publish("Hello RabbitMQ");
}Run tests (requires Docker):
mvn testIssues and pull requests are welcome β please add tests for new queue options, exchange types, or consumer features.