Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 107 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The `@nextorders/queue` is a TypeScript library designed to simplify working wit

![RabbitMQ Dashboard](https://github.com/user-attachments/assets/b330e911-bc74-404d-9999-c720b04ca9f7)

## Key Features
## 😨 Key Features

- Type-safe operations with message queues
- Automatic connection to the RabbitMQ server
Expand All @@ -15,97 +15,165 @@ The `@nextorders/queue` is a TypeScript library designed to simplify working wit
- Support for various message types
- Flexible connection configuration

## Installation
## 📦 Installation

You can install the library via npm:

```bash
npm install @nextorders/queue
```

## Usage
## 🚀 Usage

Prepare types:
### 1. Define Event Types

Create type definitions for your events:

```typescript
import type { BaseEventMessage } from '@nextorders/queue'

export enum Events {
TICKET_MESSAGE_CREATED = 'ticketMessageCreated',
OTHER_ACTION = 'otherAction',
UserCreated = 'userCreated',
EmailSent = 'emailSent',
}

export type EventMessage = BaseEventMessage<Events>
export type EventMessage = UserCreated | EmailSent

export interface UserCreated extends BaseEventMessage {
event: typeof Events.UserCreated
data: {
id: string
name: string
email: string
}
}

export interface TicketMessageCreated extends EventMessage {
type: typeof Events.TICKET_MESSAGE_CREATED
export interface EmailSent extends BaseEventMessage {
event: typeof Events.EmailSent
data: {
ticketId: string
ticketOwnerId: string
messageId: string
userId: string
userName: string
userSurname: string | undefined
userText: string
email: string
}
}
```

Create Entities:
### 2. Create Entities

Define entities that represent your services:

```typescript
import { Entity, Repository } from '@nextorders/queue'
import { Events } from './types'

export class User extends Entity {
constructor(repository: Repository) {
super({
name: 'user',
eventsToConsume: [],
repository,
})
}
}

export class Telegram extends Entity {
export class Email extends Entity {
constructor(repository: Repository) {
super({
name: 'telegram',
eventsToConsume: ['ticketMessageCreated'],
name: 'email',
eventsToConsume: [Events.UserCreated],
repository,
})
}
}
```

Create Repository with Entities:
### 3. Create Repository

Create a repository that manages your entities:

```typescript
import type { EventMessage } from './types'
import { Repository } from '@nextorders/queue'
import { Telegram, Ticket } from './entities'
import { Email, User } from './entities'

class QueueRepository extends Repository {
telegram: Telegram = new Telegram(this)
ticket: Ticket = new Ticket(this)
user: User = new User(this)
email: Email = new Email(this)

// Override publish method with proper typing
publish<T extends EventMessage>(
event: T['event'],
data: T['data']
): Promise<void> {
return super.publish(event, data)
}
}

export const repository = new QueueRepository()
```

On server start (for example in Nuxt server plugin):
### 4. Connect to RabbitMQ

On service start, connect to your RabbitMQ instance:

```typescript
await repository.connect(process.env.QUEUE_URL)
import { repository } from './repository'

await repository.connect('amqp://guest:guest@localhost:5672')
```

### 5. Publish Events

Create and publish events from your services:

```typescript
await repository.publish(Events.UserCreated, {
id: newUser.id,
name: newUser.name,
email: newUser.email,
})
```

And thats it! Use repository in app:
### 6. Consume Events

Subscribe to events and handle them:

```typescript
// Publisher
await repository.publisher.send({
exchange: repository.exchanges.events.exchange,
routingKey: Events.TICKET_MESSAGE_CREATED,
}, body)

// Consumer
await repository.telegram.consume(async (msg) => {
if (msg.type === 'ticketMessageCreated') {
return handleTicketMessageCreated(msg) // ack on finish
import type { UserCreated } from './types'
import { repository } from './repository'
import { Events } from './types'

// Define event handlers
async function handleUserCreated(data: UserCreated['data']): Promise<boolean> {
try {
await sendEmail(data.email)
return true
} catch (error) {
console.error('Error handling UserCreated event:', error)
return false
}
}

async function sendEmail(email: string): Promise<void> {
console.warn('Sending email to:', email)

// Publish EmailSent event
await repository.publish(Events.EmailSent, { email })
}

return queue.ignore()
// Subscribe to events
await repository.consume(repository.email.name, {
[Events.UserCreated]: handleUserCreated,
})
```

## License
## 💁‍♂️ Example: Microservices Architecture

Check out the examples/microservices directory for a complete working example with:

- Service 1: User creation service
- Service 2: Email notification service
- Shared repository with entities
- Type-safe event definitions

## 🤝 License

This project is licensed under the MIT License.
15 changes: 15 additions & 0 deletions examples/microservices/repository/entities/email.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { Repository } from '@nextorders/queue'
import { Entity } from '@nextorders/queue'
import { Events } from '../types'

export class Email extends Entity {
constructor(repository: Repository) {
super({
name: 'email',
eventsToConsume: [
Events.UserCreated,
],
repository,
})
}
}
12 changes: 12 additions & 0 deletions examples/microservices/repository/entities/user.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { Repository } from '@nextorders/queue'
import { Entity } from '@nextorders/queue'

export class User extends Entity {
constructor(repository: Repository) {
super({
name: 'user',
eventsToConsume: [],
repository,
})
}
}
15 changes: 15 additions & 0 deletions examples/microservices/repository/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { EventMessage } from './types'
import { Repository } from '@nextorders/queue'
import { Email } from './entities/email'
import { User } from './entities/user'

class QueueRepository extends Repository {
user = new User(this)
email = new Email(this)

async publish<T extends EventMessage>(event: T['event'], data: T['data']) {
return super.publish(event, data)
}
}

export const repository = new QueueRepository()
30 changes: 30 additions & 0 deletions examples/microservices/repository/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import type { BaseEventMessage, Status } from '@nextorders/queue'

// All possible events
export enum Events {
UserCreated = 'userCreated',
EmailSent = 'emailSent',
}

export type EventMessage = UserCreated | EmailSent

export type EventHandler = (msg: EventMessage) => Promise<Status>
export type EventMessageHandler<T = EventMessage['data']> = (data: T) => Promise<boolean>

export type EventHandlerMap = Record<EventMessage['event'], EventMessageHandler>

export interface UserCreated extends BaseEventMessage {
event: typeof Events.UserCreated
data: {
id: string
name: string
email: string
}
}

export interface EmailSent extends BaseEventMessage {
event: typeof Events.EmailSent
data: {
email: string
}
}
21 changes: 21 additions & 0 deletions examples/microservices/service1/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { repository } from '../repository'
import { Events } from '../repository/types'

// Service 1: User Service
const body = {
name: 'John Doe',
email: '5Tt9o@example.com',
}

// DB: Save user in database
const newUser = {
id: '123',
...body,
}

// Publish Event for other services
repository.publish(Events.UserCreated, {
id: newUser.id,
name: newUser.name,
email: newUser.email,
})
Comment on lines +1 to +21
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Connect before publish and await the Promise

Currently publish is fired without a connection and not awaited. This will throw (“Connection is not created”) and/or drop errors.

Apply:

+// Initialize connection once at startup
+await repository.connect(process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672')
@@
-// Publish Event for other services
-repository.publish(Events.UserCreated, {
+// Publish Event for other services
+await repository.publish(Events.UserCreated, {
   id: newUser.id,
   name: newUser.name,
   email: newUser.email,
 })

If top-level await isn’t allowed, wrap in an async main and call main().catch(console.error).

🤖 Prompt for AI Agents
In examples/microservices/service1/index.ts around lines 1 to 21, the code calls
repository.publish without first establishing a connection and without awaiting
the returned Promise; update the flow to first await repository.connect() (or
the appropriate connection/init method on repository), then await
repository.publish(...) so the connection exists and errors are propagated; if
top-level await is not supported, wrap the connect/publish sequence in an async
function (e.g., async main() { await repository.connect(); await
repository.publish(...); }) and call main().catch(console.error).

5 changes: 5 additions & 0 deletions examples/microservices/service1/service-start.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// On service start
import { repository } from '../repository'

// Connect to RabbitMQ
repository.connect('amqp://guest:guest@localhost:5672')
Comment on lines +1 to +5
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Await the connection; use environment variables for credentials.

Critical issues:

  • repository.connect is not awaited. If connect is async, errors and setup failures will be silent.
  • Hardcoded credentials (guest:guest) should be externalized to environment variables.
  • No error handling if connection fails.

Apply this diff:

-// On service start
-import { repository } from '../repository'
-
-// Connect to RabbitMQ
-repository.connect('amqp://guest:guest@localhost:5672')
+// On service start
+import { repository } from '../repository'
+
+// Connect to RabbitMQ
+const rabbitUrl = process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672'
+await repository.connect(rabbitUrl).catch((error) => {
+  console.error('Failed to connect to RabbitMQ:', error)
+  process.exit(1)
+})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// On service start
import { repository } from '../repository'
// Connect to RabbitMQ
repository.connect('amqp://guest:guest@localhost:5672')
// On service start
import { repository } from '../repository'
// Connect to RabbitMQ
const rabbitUrl = process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672'
await repository.connect(rabbitUrl).catch((error) => {
console.error('Failed to connect to RabbitMQ:', error)
process.exit(1)
})
🤖 Prompt for AI Agents
In examples/microservices/service1/service-start.ts around lines 1 to 5, the
call to repository.connect is not awaited, uses hardcoded credentials, and lacks
error handling; change to await repository.connect(...) using RabbitMQ URL
assembled from environment variables (e.g. host, port, user, password or a
single RABBITMQ_URL), wrap the await in a try/catch, log the error on failure
and exit the process with a non-zero code so startup failures are not silent.

31 changes: 31 additions & 0 deletions examples/microservices/service2/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import type { EventHandlerMap, UserCreated } from '../repository/types'
import { repository } from '../repository'
import { Events } from '../repository/types'

// Service 2: Email Service

// Consume to Events
repository.consume(repository.email.name, {
[Events.UserCreated]: handleUserCreated,
} as EventHandlerMap)

// Business logic
async function handleUserCreated(data: UserCreated['data']): Promise<boolean> {
try {
// Service logic: Send email
await sendEmail(data.email)
return true
} catch (error) {
console.error('Failed to send email:', error)
return false
}
}

async function sendEmail(email: string) {
console.warn('Sending email to', email)

// Publish Event for other services
await repository.publish(Events.EmailSent, {
email,
})
}
Comment on lines +1 to +31
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Connect first; await consume/publish; await sendEmail in handler

  • Missing repository.connect.
  • Not awaiting consume may hide setup errors.
  • Handler returns true before sendEmail/publish completes; failures won’t be caught and the message will be acked.
  • Cast to EventHandlerMap stems from a non-partial map type.

Apply:

-import type { EventHandlerMap, UserCreated } from '../repository/types'
+import type { EventHandlerMap, UserCreated } from '../repository/types'
 import { repository } from '../repository'
 import { Events } from '../repository/types'
 
 // Service 2: Email Service
-
-// Consume to Events
-repository.consume(repository.email.name, {
-  userCreated: handleUserCreated,
-} as EventHandlerMap)
+// Connect and start consumer
+await repository.connect(process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672')
+await repository.consume(repository.email.name, {
+  userCreated: handleUserCreated,
+} as EventHandlerMap)
@@
 async function handleUserCreated(data: UserCreated['data']): Promise<boolean> {
   try {
     // Service logic: Send email
-    sendEmail(data.email)
+    await sendEmail(data.email)
     return true
   } catch (error) {
     console.error('Failed to send email:', error)
     return false
   }
 }
@@
 async function sendEmail(email: string) {
   console.warn('Sending email to', email)
 
   // Publish Event for other services
-  repository.publish(Events.EmailSent, {
+  await repository.publish(Events.EmailSent, {
     email,
   })
 }

If you adopt Partial handler maps in the repository (see other comment), remove the cast: { userCreated: handleUserCreated }.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import type { EventHandlerMap, UserCreated } from '../repository/types'
import { repository } from '../repository'
import { Events } from '../repository/types'
// Service 2: Email Service
// Consume to Events
repository.consume(repository.email.name, {
userCreated: handleUserCreated,
} as EventHandlerMap)
// Business logic
async function handleUserCreated(data: UserCreated['data']): Promise<boolean> {
try {
// Service logic: Send email
sendEmail(data.email)
return true
} catch (error) {
console.error('Failed to send email:', error)
return false
}
}
async function sendEmail(email: string) {
console.warn('Sending email to', email)
// Publish Event for other services
repository.publish(Events.EmailSent, {
email,
})
}
import type { EventHandlerMap, UserCreated } from '../repository/types'
import { repository } from '../repository'
import { Events } from '../repository/types'
// Service 2: Email Service
// Connect and start consumer
await repository.connect(process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672')
await repository.consume(repository.email.name, {
userCreated: handleUserCreated,
} as EventHandlerMap)
// Business logic
async function handleUserCreated(data: UserCreated['data']): Promise<boolean> {
try {
// Service logic: Send email
await sendEmail(data.email)
return true
} catch (error) {
console.error('Failed to send email:', error)
return false
}
}
async function sendEmail(email: string) {
console.warn('Sending email to', email)
// Publish Event for other services
await repository.publish(Events.EmailSent, {
email,
})
}
🤖 Prompt for AI Agents
In examples/microservices/service2/index.ts around lines 1 to 31, the service
never connects to the repository, calls consume without awaiting, and the
handler returns before async work completes; fix by awaiting
repository.connect() at startup, await repository.consume(repository.email.name,
{ userCreated: handleUserCreated }) instead of un-awaited consume so setup
errors surface, change handleUserCreated to await sendEmail(data.email) and
await repository.publish(Events.EmailSent, { email }) inside sendEmail (or
return the publish promise) so failures are propagated and the handler only
returns true after those complete, and remove the EventHandlerMap cast if the
repository now accepts partial handler maps.

5 changes: 5 additions & 0 deletions examples/microservices/service2/service-start.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// On service start
import { repository } from '../repository'

// Connect to RabbitMQ
repository.connect('amqp://guest:guest@localhost:5672')
Comment on lines +1 to +5
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Await the connection; use environment variables; eliminate duplication.

This file is identical to service1/service-start.ts and has the same critical issues:

  • repository.connect is not awaited.
  • Hardcoded credentials (guest:guest) should be externalized to environment variables.
  • No error handling if connection fails.

Additionally, consider extracting this shared startup logic to a common utility function to eliminate duplication.

Apply the same fix as in service1/service-start.ts:

-// On service start
-import { repository } from '../repository'
-
-// Connect to RabbitMQ
-repository.connect('amqp://guest:guest@localhost:5672')
+// On service start
+import { repository } from '../repository'
+
+// Connect to RabbitMQ
+const rabbitUrl = process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672'
+await repository.connect(rabbitUrl).catch((error) => {
+  console.error('Failed to connect to RabbitMQ:', error)
+  process.exit(1)
+})

Then, consider creating a shared startService helper in examples/microservices/repository/utils.ts:

export async function startService(repository: QueueRepository): Promise<void> {
  const rabbitUrl = process.env.RABBIT_URL ?? 'amqp://guest:guest@localhost:5672'
  await repository.connect(rabbitUrl).catch((error) => {
    console.error('Failed to connect to RabbitMQ:', error)
    process.exit(1)
  })
}
🤖 Prompt for AI Agents
In examples/microservices/service2/service-start.ts lines 1-5, the startup code
mirrors service1 and has three issues: repository.connect is not awaited, the
RabbitMQ URL and credentials are hardcoded, and there is no error handling;
update the file to await repository.connect, read the connection URL from an
environment variable (e.g. process.env.RABBIT_URL with a fallback), and add
.catch or try/catch to log the error and exit on failure; after applying the
same fix as service1, consider moving this logic into a shared helper
(examples/microservices/repository/utils.ts) as startService(repository) that
builds the URL from env, awaits repository.connect, and handles errors by
logging and process.exit(1) to remove duplication.

Loading