Conversation
WalkthroughAdds typed microservice examples (User/Email), new event types, and a QueueRepository facade with publish/consume/handleEvent. Refactors queue internals to private fields, removes Entity.consume/queue getter, and wires Service1 to publish UserCreated and Service2 to consume it and publish EmailSent. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor S1 as Service1
participant Repo as QueueRepository
participant MQ as MessageBroker
participant C as Consumer
actor S2 as Service2
S1->>Repo: publish(Events.UserCreated, {id,name,email})
Repo->>MQ: publish BaseEventMessage{event,data}
MQ-->>C: deliver BaseEventMessage
C->>Repo: handleEvent(handlers, msg)
alt handler exists
Repo->>S2: invoke handler(data)
S2->>Repo: publish(Events.EmailSent, {email})
Repo->>MQ: publish BaseEventMessage{event,data}
Repo-->>C: Status success
else no handler
Repo->>Repo: log "no handler"
Repo-->>C: Status ignore
end
sequenceDiagram
autonumber
participant App as repository.consume(queue, handlers)
participant Consumer as Consumer
participant Repo as QueueRepository
participant Logger as Logger
App->>Consumer: create consumer for queue
Consumer-->>Repo: on message (BaseEventMessage)
Repo->>Repo: handleEvent(handlers, msg)
alt matching handler
Repo-->>Consumer: ack/final Status (success/fail)
else missing handler
Repo->>Logger: warn missing handler
Repo-->>Consumer: Status ignore
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (2)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
packages/queue/src/repository.ts (2)
14-16: Swap fail/ignore status mappings to prevent infinite requeue loopsCurrent mappings make ignore => REQUEUE and fail => DROP, which is counterintuitive and causes unknown events to be endlessly requeued. Prefer: success=ACK, fail=REQUEUE, ignore=DROP.
Apply this diff:
success = () => ConsumerStatus.ACK - fail = () => ConsumerStatus.DROP - ignore = () => ConsumerStatus.REQUEUE + fail = () => ConsumerStatus.REQUEUE + ignore = () => ConsumerStatus.DROP
18-20: Avoid throwing in checkHealth before connectcheckHealth throws if called before connect because connection getter throws. Return false when not connected.
Apply this diff:
- checkHealth(): boolean { - return this.connection.ready - } + checkHealth(): boolean { + return this.#connection?.ready ?? false + }
🧹 Nitpick comments (8)
packages/queue/src/entity.ts (2)
23-44: DLX wiring likely inverted; risk of infinite retriesMain queue dead-letters to fail.exchange, and fail.exchange routes to the retry queue. That’s a confusing hop and there’s no terminal DLQ, so messages can loop forever.
Recommend:
- Main queue: x-dead-letter-exchange -> retry.exchange
- Retry queue: x-dead-letter-exchange -> events.exchange
- Bind retry.exchange ->
${name}.retry.dlx
Optionally add a${name}.fail.dlqfor terminal failures after N attempts.Apply minimal wiring fix:
- 'x-dead-letter-exchange': this.repository.exchanges.fail.exchange, + 'x-dead-letter-exchange': this.repository.exchanges.retry.exchange,- 'x-dead-letter-exchange': this.repository.exchanges.retry.exchange, + 'x-dead-letter-exchange': this.repository.exchanges.events.exchange,Please confirm intended retry/poison-queue behavior. Based on learnings
46-72: Bindings should target retry.exchange for the retry queueWith the standard pattern above, bind retry.exchange to the retry queue (not fail.exchange). Also consider adding a dedicated fail DLQ binding.
- const failExchange = this.repository.exchanges.fail.exchange + const failExchange = this.repository.exchanges.fail.exchange @@ - { - queue: `${this.name}.retry.dlx`, - exchange: failExchange, - routingKey: event, - }) + { + queue: `${this.name}.retry.dlx`, + exchange: retryExchange, + routingKey: event, + })If you want a terminal DLQ, add another queue
${this.name}.fail.dlqand bind it to failExchange with the same routing keys.examples/microservices/repository/index.ts (1)
1-15: Provide a typed consume overload to avoid unsafe casts at call sitesService2 casts to EventHandlerMap to satisfy the current signature. Expose a typed helper that accepts a partial handler map for only the events that service handles.
-import type { EventMessage } from './types' +import type { EventMessage, EventHandlerMap } from './types' @@ async publish<T extends EventMessage>(event: T['event'], data: T['data']) { return super.publish(event, data) } + + // Allow services to supply only the handlers they need + async consume(queue: string, handlers: Partial<EventHandlerMap>) { + return super.consume(queue, handlers as any) + }This removes the need for casts in services and matches handleEvent’s “ignore missing handlers” behavior.
examples/microservices/repository/types.ts (1)
11-15: Make handler map partial and event-specific to remove unsafe castsCurrent
Record<EventMessage['event'], EventMessageHandler>forces handlers for all events and loses per-event data typing; services then cast.-export type EventMessageHandler<T = EventMessage['data']> = (data: T) => Promise<boolean> - -export type EventHandlerMap = Record<EventMessage['event'], EventMessageHandler> +export type EventMessageHandler<T> = (data: T) => Promise<boolean> +type HandlerFor<K extends EventMessage['event']> = + EventMessageHandler<Extract<EventMessage, { event: K }>['data']> +export type EventHandlerMap = { + [K in EventMessage['event']]?: HandlerFor<K> +}This lets services provide only relevant handlers, with correct payload types.
packages/queue/src/types.ts (1)
49-53: Clarify/deprecate BaseEventHandler in favor of BaseEventMessageHandlerYou now route handlers as (data) => Promise via BaseEventMessageHandler/BaseEventHandlerMap. Keeping BaseEventHandler (message) => Promise may confuse users.
- Either mark BaseEventHandler as deprecated in a JSDoc comment, or remove it if unused.
packages/queue/src/repository.ts (3)
61-65: Tighten error message for missing queue nameError reads “not found” even when the argument is just empty.
Apply this diff:
- if (!queue) { - throw new Error(`Queue "${queue}" not found`) - } + if (!queue) { + throw new Error('Queue name must be provided') + }
66-75: Add exchanges and queueBindings to Consumer options for robust re-setup after reconnectsrabbitmq-client can auto re-declare and re-bind on reconnect if you pass exchanges/queueBindings in createConsumer options. Today you rely on prior declarations with passive: true, which may not re-run on reconnect.
Apply this diff (uses existing exchanges/bindings):
- const consumer = this.connection.createConsumer({ + const consumer = this.connection.createConsumer({ queue, queueOptions: { passive: true, }, noAck: false, qos: { prefetchCount: 1, }, + exchanges: Object.values(this.exchanges), + queueBindings: this.bindings + .filter((b) => b.queue === queue) + .map(({ exchange, routingKey }) => ({ exchange, routingKey })), }, async (msg) => this.handleEvent(eventHandlers, msg.body))Based on learnings
86-92: Log and drop unknown events (paired with ignore => DROP mapping)Unknown events currently return ignore (REQUEUE), creating redelivery loops. After swapping mappings, keep ignore here and add a warning for observability.
Apply this diff:
const handler = eventHandlers[msg.event] if (!handler) { - return this.ignore() + console.warn('No handler for event; dropping message', { event: msg.event }) + return this.ignore() }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
examples/microservices/repository/entities/email.ts(1 hunks)examples/microservices/repository/entities/user.ts(1 hunks)examples/microservices/repository/index.ts(1 hunks)examples/microservices/repository/types.ts(1 hunks)examples/microservices/service1/index.ts(1 hunks)examples/microservices/service2/index.ts(1 hunks)packages/queue/src/entity.ts(3 hunks)packages/queue/src/repository.ts(5 hunks)packages/queue/src/types.ts(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (9)
examples/microservices/service1/index.ts (1)
examples/microservices/repository/index.ts (1)
repository(15-15)
examples/microservices/repository/entities/email.ts (3)
packages/queue/src/entity.ts (1)
Entity(9-73)examples/microservices/repository/index.ts (1)
repository(15-15)packages/queue/src/repository.ts (1)
Repository(6-143)
examples/microservices/repository/index.ts (5)
packages/queue/src/types.ts (1)
QueueRepository(12-26)packages/queue/src/repository.ts (1)
Repository(6-143)examples/microservices/repository/entities/user.ts (1)
User(4-12)examples/microservices/repository/entities/email.ts (1)
examples/microservices/repository/types.ts (1)
EventMessage(9-9)
examples/microservices/service2/index.ts (2)
examples/microservices/repository/index.ts (1)
repository(15-15)examples/microservices/repository/types.ts (2)
EventHandlerMap(14-14)UserCreated(16-23)
examples/microservices/repository/entities/user.ts (3)
packages/queue/src/entity.ts (1)
Entity(9-73)examples/microservices/repository/index.ts (1)
repository(15-15)packages/queue/src/repository.ts (1)
Repository(6-143)
packages/queue/src/entity.ts (2)
packages/queue/src/types.ts (1)
QueueRepository(12-26)packages/queue/src/repository.ts (1)
name(112-121)
packages/queue/src/types.ts (1)
packages/queue/src/repository.ts (2)
queue(123-132)connectionString(100-110)
packages/queue/src/repository.ts (1)
packages/queue/src/types.ts (4)
QueueRepository(12-26)BaseEventMessage(44-47)BaseEventHandlerMap(53-53)Status(10-10)
examples/microservices/repository/types.ts (1)
packages/queue/src/types.ts (2)
Status(10-10)BaseEventMessage(44-47)
🔇 Additional comments (3)
examples/microservices/repository/entities/email.ts (1)
1-15: LGTMEntity name and eventsToConsume align; typing via import type keeps runtime clean.
examples/microservices/repository/entities/user.ts (1)
1-12: LGTMMinimal entity setup is fine.
examples/microservices/service2/index.ts (1)
1-31: Verify consumer handler semantics with rabbitmq-clientReturning a “Status” value from the handler may be ignored by rabbitmq-client, which acks on resolve and nacks on throw. Ensure your handler path throws to trigger a nack/requeue when needed, or confirm that returning Status is honored.
Based on learnings
| import { repository } from '../repository' | ||
| import { Events } from '../repository/types' | ||
|
|
||
| // Service 1: User Service | ||
| const body = { | ||
| name: 'John Doe', | ||
| email: '5Tt9o@example.com', | ||
| } | ||
|
|
||
| // DB: Save user in database | ||
| const newUser = { | ||
| id: '123', | ||
| ...body, | ||
| } | ||
|
|
||
| // Publish Event for other services | ||
| repository.publish(Events.UserCreated, { | ||
| id: newUser.id, | ||
| name: newUser.name, | ||
| email: newUser.email, | ||
| }) |
There was a problem hiding this comment.
Connect before publish and await the Promise
Currently publish is fired without a connection and not awaited. This will throw (“Connection is not created”) and/or drop errors.
Apply:
+// Initialize connection once at startup
+await repository.connect(process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672')
@@
-// Publish Event for other services
-repository.publish(Events.UserCreated, {
+// Publish Event for other services
+await repository.publish(Events.UserCreated, {
id: newUser.id,
name: newUser.name,
email: newUser.email,
})If top-level await isn’t allowed, wrap in an async main and call main().catch(console.error).
🤖 Prompt for AI Agents
In examples/microservices/service1/index.ts around lines 1 to 21, the code calls
repository.publish without first establishing a connection and without awaiting
the returned Promise; update the flow to first await repository.connect() (or
the appropriate connection/init method on repository), then await
repository.publish(...) so the connection exists and errors are propagated; if
top-level await is not supported, wrap the connect/publish sequence in an async
function (e.g., async main() { await repository.connect(); await
repository.publish(...); }) and call main().catch(console.error).
| import type { EventHandlerMap, UserCreated } from '../repository/types' | ||
| import { repository } from '../repository' | ||
| import { Events } from '../repository/types' | ||
|
|
||
| // Service 2: Email Service | ||
|
|
||
| // Consume to Events | ||
| repository.consume(repository.email.name, { | ||
| userCreated: handleUserCreated, | ||
| } as EventHandlerMap) | ||
|
|
||
| // Business logic | ||
| async function handleUserCreated(data: UserCreated['data']): Promise<boolean> { | ||
| try { | ||
| // Service logic: Send email | ||
| sendEmail(data.email) | ||
| return true | ||
| } catch (error) { | ||
| console.error('Failed to send email:', error) | ||
| return false | ||
| } | ||
| } | ||
|
|
||
| async function sendEmail(email: string) { | ||
| console.warn('Sending email to', email) | ||
|
|
||
| // Publish Event for other services | ||
| repository.publish(Events.EmailSent, { | ||
| email, | ||
| }) | ||
| } |
There was a problem hiding this comment.
Connect first; await consume/publish; await sendEmail in handler
- Missing repository.connect.
- Not awaiting consume may hide setup errors.
- Handler returns true before sendEmail/publish completes; failures won’t be caught and the message will be acked.
- Cast to EventHandlerMap stems from a non-partial map type.
Apply:
-import type { EventHandlerMap, UserCreated } from '../repository/types'
+import type { EventHandlerMap, UserCreated } from '../repository/types'
import { repository } from '../repository'
import { Events } from '../repository/types'
// Service 2: Email Service
-
-// Consume to Events
-repository.consume(repository.email.name, {
- userCreated: handleUserCreated,
-} as EventHandlerMap)
+// Connect and start consumer
+await repository.connect(process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672')
+await repository.consume(repository.email.name, {
+ userCreated: handleUserCreated,
+} as EventHandlerMap)
@@
async function handleUserCreated(data: UserCreated['data']): Promise<boolean> {
try {
// Service logic: Send email
- sendEmail(data.email)
+ await sendEmail(data.email)
return true
} catch (error) {
console.error('Failed to send email:', error)
return false
}
}
@@
async function sendEmail(email: string) {
console.warn('Sending email to', email)
// Publish Event for other services
- repository.publish(Events.EmailSent, {
+ await repository.publish(Events.EmailSent, {
email,
})
}If you adopt Partial handler maps in the repository (see other comment), remove the cast: { userCreated: handleUserCreated }.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| import type { EventHandlerMap, UserCreated } from '../repository/types' | |
| import { repository } from '../repository' | |
| import { Events } from '../repository/types' | |
| // Service 2: Email Service | |
| // Consume to Events | |
| repository.consume(repository.email.name, { | |
| userCreated: handleUserCreated, | |
| } as EventHandlerMap) | |
| // Business logic | |
| async function handleUserCreated(data: UserCreated['data']): Promise<boolean> { | |
| try { | |
| // Service logic: Send email | |
| sendEmail(data.email) | |
| return true | |
| } catch (error) { | |
| console.error('Failed to send email:', error) | |
| return false | |
| } | |
| } | |
| async function sendEmail(email: string) { | |
| console.warn('Sending email to', email) | |
| // Publish Event for other services | |
| repository.publish(Events.EmailSent, { | |
| email, | |
| }) | |
| } | |
| import type { EventHandlerMap, UserCreated } from '../repository/types' | |
| import { repository } from '../repository' | |
| import { Events } from '../repository/types' | |
| // Service 2: Email Service | |
| // Connect and start consumer | |
| await repository.connect(process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672') | |
| await repository.consume(repository.email.name, { | |
| userCreated: handleUserCreated, | |
| } as EventHandlerMap) | |
| // Business logic | |
| async function handleUserCreated(data: UserCreated['data']): Promise<boolean> { | |
| try { | |
| // Service logic: Send email | |
| await sendEmail(data.email) | |
| return true | |
| } catch (error) { | |
| console.error('Failed to send email:', error) | |
| return false | |
| } | |
| } | |
| async function sendEmail(email: string) { | |
| console.warn('Sending email to', email) | |
| // Publish Event for other services | |
| await repository.publish(Events.EmailSent, { | |
| email, | |
| }) | |
| } |
🤖 Prompt for AI Agents
In examples/microservices/service2/index.ts around lines 1 to 31, the service
never connects to the repository, calls consume without awaiting, and the
handler returns before async work completes; fix by awaiting
repository.connect() at startup, await repository.consume(repository.email.name,
{ userCreated: handleUserCreated }) instead of un-awaited consume so setup
errors surface, change handleUserCreated to await sendEmail(data.email) and
await repository.publish(Events.EmailSent, { email }) inside sendEmail (or
return the publish promise) so failures are propagated and the handler only
returns true after those complete, and remove the EventHandlerMap cast if the
repository now accepts partial handler maps.
|
@coderabbitai can you rewrite README.md for new example |
There was a problem hiding this comment.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
examples/microservices/service1/service-start.ts(1 hunks)examples/microservices/service2/index.ts(1 hunks)examples/microservices/service2/service-start.ts(1 hunks)packages/queue/src/types.ts(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/queue/src/types.ts
🧰 Additional context used
🧬 Code graph analysis (3)
examples/microservices/service1/service-start.ts (1)
examples/microservices/repository/index.ts (1)
repository(15-15)
examples/microservices/service2/index.ts (2)
examples/microservices/repository/index.ts (1)
repository(15-15)examples/microservices/repository/types.ts (2)
EventHandlerMap(14-14)UserCreated(16-23)
examples/microservices/service2/service-start.ts (1)
examples/microservices/repository/index.ts (1)
repository(15-15)
🔇 Additional comments (3)
examples/microservices/service2/index.ts (3)
13-22: Nice fix on the async handling!The handler now correctly awaits
sendEmail, ensuring that the promise is resolved before returningtrue. This prevents premature message acknowledgment and ensures failures are properly caught.
24-31: Proper async handling in sendEmail.The
repository.publishcall is now properly awaited, ensuring that the event is published before the handler completes. This is the correct implementation.
8-10: Await repository.consume and refine handler map types
- Await
repository.consume(...)to surface setup errors; confirm it returns a Promise.- Remove the
EventHandlerMapcast and update the API or type to accept partial handler maps (e.g.,Partial<EventHandlerMap>).
| // On service start | ||
| import { repository } from '../repository' | ||
|
|
||
| // Connect to RabbitMQ | ||
| repository.connect('amqp://guest:guest@localhost:5672') |
There was a problem hiding this comment.
Await the connection; use environment variables for credentials.
Critical issues:
repository.connectis not awaited. If connect is async, errors and setup failures will be silent.- Hardcoded credentials (
guest:guest) should be externalized to environment variables. - No error handling if connection fails.
Apply this diff:
-// On service start
-import { repository } from '../repository'
-
-// Connect to RabbitMQ
-repository.connect('amqp://guest:guest@localhost:5672')
+// On service start
+import { repository } from '../repository'
+
+// Connect to RabbitMQ
+const rabbitUrl = process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672'
+await repository.connect(rabbitUrl).catch((error) => {
+ console.error('Failed to connect to RabbitMQ:', error)
+ process.exit(1)
+})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // On service start | |
| import { repository } from '../repository' | |
| // Connect to RabbitMQ | |
| repository.connect('amqp://guest:guest@localhost:5672') | |
| // On service start | |
| import { repository } from '../repository' | |
| // Connect to RabbitMQ | |
| const rabbitUrl = process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672' | |
| await repository.connect(rabbitUrl).catch((error) => { | |
| console.error('Failed to connect to RabbitMQ:', error) | |
| process.exit(1) | |
| }) |
🤖 Prompt for AI Agents
In examples/microservices/service1/service-start.ts around lines 1 to 5, the
call to repository.connect is not awaited, uses hardcoded credentials, and lacks
error handling; change to await repository.connect(...) using RabbitMQ URL
assembled from environment variables (e.g. host, port, user, password or a
single RABBITMQ_URL), wrap the await in a try/catch, log the error on failure
and exit the process with a non-zero code so startup failures are not silent.
| // On service start | ||
| import { repository } from '../repository' | ||
|
|
||
| // Connect to RabbitMQ | ||
| repository.connect('amqp://guest:guest@localhost:5672') |
There was a problem hiding this comment.
Await the connection; use environment variables; eliminate duplication.
This file is identical to service1/service-start.ts and has the same critical issues:
repository.connectis not awaited.- Hardcoded credentials (
guest:guest) should be externalized to environment variables. - No error handling if connection fails.
Additionally, consider extracting this shared startup logic to a common utility function to eliminate duplication.
Apply the same fix as in service1/service-start.ts:
-// On service start
-import { repository } from '../repository'
-
-// Connect to RabbitMQ
-repository.connect('amqp://guest:guest@localhost:5672')
+// On service start
+import { repository } from '../repository'
+
+// Connect to RabbitMQ
+const rabbitUrl = process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672'
+await repository.connect(rabbitUrl).catch((error) => {
+ console.error('Failed to connect to RabbitMQ:', error)
+ process.exit(1)
+})Then, consider creating a shared startService helper in examples/microservices/repository/utils.ts:
export async function startService(repository: QueueRepository): Promise<void> {
const rabbitUrl = process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672'
await repository.connect(rabbitUrl).catch((error) => {
console.error('Failed to connect to RabbitMQ:', error)
process.exit(1)
})
}🤖 Prompt for AI Agents
In examples/microservices/service2/service-start.ts lines 1-5, the startup code
mirrors service1 and has three issues: repository.connect is not awaited, the
RabbitMQ URL and credentials are hardcoded, and there is no error handling;
update the file to await repository.connect, read the connection URL from an
environment variable (e.g. process.env.RABBIT_URL with a fallback), and add
.catch or try/catch to log the error and exit on failure; after applying the
same fix as service1, consider moving this logic into a shared helper
(examples/microservices/repository/utils.ts) as startService(repository) that
builds the URL from env, awaits repository.connect, and handles errors by
logging and process.exit(1) to remove duplication.
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 2573 @hmbanan666: Here's the rewritten README.md for the new microservices example: # Queue 🐰
[](https://www.npmjs.com/package/@nextorders/queue)
The `@nextorders/queue` is a TypeScript library designed to simplify working with RabbitMQ in Node.js applications. The library provides a high-level API for organizing message queues and processing business entities in event-driven microservices.

## Key Features
- Type-safe operations with message queues
- Automatic connection to the RabbitMQ server
- Declarative creation of queues and exchanges
- Built-in error handling and retry mechanisms
- Event-driven architecture support
- Simplified publish/consume API
- Flexible connection configuration
## Installation
You can install the library via npm:
```bash
npm install @nextorders/queueUsage1. Define Event TypesCreate type definitions for your events: import type { BaseEventMessage } from '@nextorders/queue'
export enum Events {
UserCreated = 'userCreated',
EmailSent = 'emailSent',
}
export type EventMessage = UserCreated | EmailSent
export interface UserCreated extends BaseEventMessage {
event: typeof Events.UserCreated
data: {
id: string
name: string
email: string
}
}
export interface EmailSent extends BaseEventMessage {
event: typeof Events.EmailSent
data: {
email: string
}
}2. Create EntitiesDefine entities that represent your services: import { Entity, Repository } from '@nextorders/queue'
import { Events } from './types'
export class User extends Entity {
constructor(repository: Repository) {
super({
name: 'user',
eventsToConsume: [],
repository,
})
}
}
export class Email extends Entity {
constructor(repository: Repository) {
super({
name: 'email',
eventsToConsume: [Events.UserCreated],
repository,
})
}
}3. Create RepositoryCreate a repository that manages your entities: import { Repository } from '@nextorders/queue'
import type { EventMessage } from './types'
import { User, Email } from './entities'
class QueueRepository extends Repository {
user: User = new User(this)
email: Email = new Email(this)
// Override publish method with proper typing
publish<T extends EventMessage>(
event: T['event'],
data: T['data']
): Promise<void> {
return super.publish(event, data)
}
}
export const repository = new QueueRepository()4. Connect to RabbitMQOn service start, connect to your RabbitMQ instance: import { repository } from './repository'
await repository.connect('amqp://guest:guest@localhost:5672')5. Publish Events (Service 1 - User Service)Create and publish events from your services: import { repository } from './repository'
import { Events } from './types'
// Create a new user
const userPayload = {
name: 'John Doe',
email: 'john@example.com',
}
const newUser = {
id: crypto.randomUUID(),
...userPayload,
}
// Publish UserCreated event
await repository.publish(Events.UserCreated, {
id: newUser.id,
name: newUser.name,
email: newUser.email,
})6. Consume Events (Service 2 - Email Service)Subscribe to events and handle them: import { repository } from './repository'
import { Events } from './types'
import type { UserCreated } from './types'
// Define event handlers
const handleUserCreated = async (data: UserCreated['data']): Promise<boolean> => {
try {
await sendEmail(data.email)
return true
} catch (error) {
console.error('Error handling UserCreated event:', error)
return false
}
}
const sendEmail = async (email: string): Promise<void> => {
console.warn('Sending email to:', email)
// Publish EmailSent event
await repository.publish(Events.EmailSent, { email })
}
// Subscribe to events
await repository.consume(repository.email.name, {
[Events.UserCreated]: handleUserCreated,
})Example: Microservices ArchitectureThe library excels in microservices architectures where services communicate through events: Check out the examples/microservices directory for a complete working example with:
API ReferenceRepository Methods
Entity Classclass Entity {
name: string
eventsToConsume: string[]
repository: Repository
constructor(config: {
name: string
eventsToConsume: string[]
repository: Repository
})
}LicenseThis project is licensed under the MIT License. |
Summary by CodeRabbit
New Features
Improvements
Refactor
Documentation