-
Notifications
You must be signed in to change notification settings - Fork 0
feat(m2): BullMQ Ride Queue & Worker App #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| # UrbanPulse Backend – Copilot Instructions | ||
|
|
||
| ## Architecture | ||
|
|
||
| Turborepo + pnpm monorepo for a ride-sharing backend. Three workspace packages: | ||
|
|
||
| - **`apps/api-gateway`** – Express 5 HTTP server (port 3001). All REST endpoints live here: auth, user, rides. | ||
| - **`apps/ride-worker`** – Background worker for ride-related processing (BullMQ). Consumes shared types and Prisma client from `packages/common`. | ||
| - **`packages/common`** – Shared Prisma client, Zod validation schemas, and type exports. Consumed via `"common": "workspace:^"`. | ||
|
|
||
| Data flows: **Route → validate middleware (Zod) → authenticate middleware (JWT) → service → Prisma/raw SQL → PostgreSQL+PostGIS**. | ||
|
|
||
| ## Key Conventions | ||
|
|
||
| ### Module System | ||
|
|
||
| ESM-only (`"type": "module"` everywhere). All local imports **must** use `.js` extensions (e.g., `import prisma from '../utils/db.js'`). TypeScript compiles with `"module": "nodenext"`. | ||
|
|
||
| ### Validation Pattern | ||
|
|
||
| Zod schemas in `packages/common/schemas/` wrap `body`, `query`, `params` keys. The `validate()` middleware in `apps/api-gateway/src/middleware/validate.ts` calls `schema.parseAsync({ body, query, params })`. When adding a new schema, follow this structure: | ||
|
|
||
| ```ts | ||
| export const mySchema = z.object({ | ||
| body: z.object({ | ||
| /* fields */ | ||
| }), | ||
| }); | ||
| export type MyInput = z.infer<typeof mySchema>['body']; | ||
| ``` | ||
|
|
||
| Export from `packages/common/schemas/index.ts` and import in routes via `import { mySchema } from 'common'`. | ||
|
|
||
| ### API Response Format | ||
|
|
||
| All endpoints return `{ success: boolean, message: string, data?: ... }`. Errors include `errors` array for validation failures. Follow this in every route handler and service return type. | ||
|
|
||
| ### Auth & Authorization | ||
|
|
||
| JWT via `Bearer` token. Middleware chain: `authenticate` (verifies token, sets `req.user: JwtPayload`) → optional `authorize('driver' | 'rider')`. `JwtPayload` contains `{ userId, number, role }`. | ||
|
|
||
| ### PostGIS / Spatial Data | ||
|
|
||
| Prisma schema uses `Unsupported("geometry(Point,4326)")` for location fields. These **cannot** use standard Prisma CRUD – use `prisma.$queryRaw` with `ST_GeomFromText` / `ST_AsText` for spatial operations (see `ride.service.ts`). Coordinates are `[longitude, latitude]`. | ||
|
|
||
| ### Database | ||
|
|
||
| Single Prisma client instance in `apps/api-gateway/src/utils/db.ts` (cached on `globalThis` in dev). Schema lives at `packages/common/prisma/schema.prisma`. Models: `User` ↔ `Driver`/`Rider` (1:1) → `Trip`. | ||
|
|
||
| ### Logging | ||
|
|
||
| Pino logger (`apps/api-gateway/src/logger.ts`). Use `pino-pretty` in dev. Always use structured logging: `logger.info({ userId }, 'message')` not string interpolation. | ||
|
|
||
| ## Developer Workflow | ||
|
|
||
| ```bash | ||
| # Setup & run (Docker-based, includes PostGIS + Redis) | ||
| source activate.sh # loads aliases: dcu, dcd, dcr, dcl, dc | ||
| dcu # docker-compose up -d (postgres + redis + api-gateway + ride-worker) | ||
| dcl # tail logs | ||
|
|
||
| # Build & typecheck | ||
| pnpm build # turbo build (common first, then api-gateway and ride-worker) | ||
| pnpm typecheck # turbo typecheck | ||
|
|
||
| # DB migrations (run inside container or with DATABASE_URL set) | ||
| cd packages/common | ||
| npx prisma migrate dev --name <name> | ||
| npx prisma generate # auto-runs on postinstall | ||
| ``` | ||
|
|
||
| ## Adding a New Feature Checklist | ||
|
|
||
| 1. **Schema** – If new Zod validation is needed, add to `packages/common/schemas/`, export from `index.ts` | ||
| 2. **Prisma** – If new model/field, edit `packages/common/prisma/schema.prisma`, run `prisma migrate dev` | ||
| 3. **Service** – Add business logic in `apps/api-gateway/src/services/` with typed return interface | ||
| 4. **Route** – Add route file in `apps/api-gateway/src/routes/`, wire middleware chain (`authenticate`, `validate(schema)`) | ||
| 5. **Register** – Mount the router in `apps/api-gateway/src/routes.ts` | ||
| 6. **Rebuild common** – After changing `packages/common`, run `pnpm build` so `dist/` is updated for the gateway | ||
|
|
||
| ## Environment Variables | ||
|
|
||
| Passed via `.env` and Docker Compose. Key vars (see `turbo.json` passthrough): `DATABASE_URL`, `REDIS_HOST`, `REDIS_PORT`, `APP_PORT`, `JWT_SECRET`, `NODE_ENV`, `LOGGING_TOKEN`, `LOGGING_URL`. | ||
|
|
||
| ## File Naming | ||
|
|
||
| - Route files: `<domain>.routes.ts` (e.g., `ride.routes.ts`) | ||
| - Service files: `<domain>.service.ts` | ||
| - Schema files: `<domain>.schema.ts` | ||
| - All in lowercase, kebab-case for multi-word |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,4 +11,3 @@ tsconfig.tsbuildinfo | |
| docs/ | ||
| .vscode/ | ||
| .turbo/ | ||
| learn/ | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,7 +1,41 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { Queue } from 'bullmq'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { QUEUE_NAMES } from 'common'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import logger from '../logger.js'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import prisma from '../utils/db.js'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import type { RideInput } from 'common'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const bullmqConnection = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| host: process.env.REDIS_HOST || 'localhost', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| port: Number(process.env.REDIS_PORT) || 6379, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } as const; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const rideRequestsQueue = new Queue(QUEUE_NAMES.RIDE_REQUESTS, { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| connection: bullmqConnection, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let rideRequestsQueueShutdownHookRegistered = false; | |
| const registerRideRequestsQueueShutdown = (queue: Queue): void => { | |
| if (rideRequestsQueueShutdownHookRegistered) { | |
| return; | |
| } | |
| rideRequestsQueueShutdownHookRegistered = true; | |
| const shutdown = async (): Promise<void> => { | |
| try { | |
| await queue.close(); | |
| } catch (error) { | |
| logger.error(error, 'Error closing rideRequestsQueue during shutdown'); | |
| } | |
| }; | |
| process.once('beforeExit', () => { | |
| void shutdown(); | |
| }); | |
| const signals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM', 'SIGQUIT']; | |
| signals.forEach((signal) => { | |
| process.once(signal, () => { | |
| void shutdown(); | |
| }); | |
| }); | |
| }; | |
| registerRideRequestsQueueShutdown(rideRequestsQueue); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| { | ||
| "name": "ride-worker", | ||
| "version": "1.0.0", | ||
| "description": "BullMQ worker for ride lifecycle processing", | ||
| "main": "dist/index.js", | ||
| "scripts": { | ||
| "build": "tsc -p tsconfig.json", | ||
| "dev": "tsx watch src/index.ts", | ||
| "start": "node ./dist/index.js" | ||
|
Comment on lines
+6
to
+9
|
||
| }, | ||
| "keywords": [], | ||
| "author": "", | ||
| "license": "ISC", | ||
| "type": "module", | ||
| "devDependencies": { | ||
| "@types/node": "^24.5.2", | ||
| "pino-pretty": "^13.1.2", | ||
| "tsx": "^4.20.6", | ||
| "typescript": "^5.9.2" | ||
| }, | ||
| "dependencies": { | ||
| "@prisma/client": "^6.17.1", | ||
| "bullmq": "^5.56.0", | ||
| "common": "workspace:^", | ||
| "dotenv": "^17.2.3", | ||
| "ioredis": "^5.8.1", | ||
| "pino": "^10.1.0" | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| import { QUEUE_NAMES } from 'common'; | ||
|
|
||
| export const REDIS_CONFIG = { | ||
| host: process.env.REDIS_HOST || 'localhost', | ||
| port: Number(process.env.REDIS_PORT) || 6379, | ||
| } as const; | ||
|
|
||
| export { QUEUE_NAMES }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| import 'dotenv/config'; | ||
| import logger from './logger.js'; | ||
| import { createRideRequestWorker } from './workers/ride-request.worker.js'; | ||
| import { createRideMatchingWorker } from './workers/ride-matching.worker.js'; | ||
| import { createRideLifecycleWorker } from './workers/ride-lifecycle.worker.js'; | ||
| import redis from './utils/redis.js'; | ||
|
|
||
| const rideRequestResult = createRideRequestWorker(); | ||
| const workers = [ | ||
| rideRequestResult.worker, | ||
| createRideMatchingWorker(), | ||
| createRideLifecycleWorker(), | ||
| ]; | ||
|
|
||
| // Queues created inside workers that must be closed on shutdown | ||
| const workerQueues = [rideRequestResult.queue]; | ||
|
|
||
| logger.info('ride-worker started — listening on all queues'); | ||
|
|
||
| const shutdown = async (signal: string) => { | ||
| logger.info({ signal }, 'Shutting down ride-worker...'); | ||
|
|
||
| try { | ||
| await Promise.all(workers.map((w) => w.close())); | ||
| await Promise.all(workerQueues.map((q) => q.close())); | ||
| await redis.quit(); | ||
|
|
||
| logger.info('ride-worker shutdown complete'); | ||
| process.exit(0); | ||
| } catch (err) { | ||
| logger.error({ err, signal }, 'Error during ride-worker shutdown'); | ||
| process.exit(1); | ||
| } | ||
| }; | ||
|
|
||
| process.on('SIGTERM', () => shutdown('SIGTERM')); | ||
| process.on('SIGINT', () => shutdown('SIGINT')); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| import pino from 'pino'; | ||
|
|
||
| const isDevelopment = process.env.NODE_ENV !== 'production'; | ||
|
|
||
| const logger = pino({ | ||
| level: process.env.LOG_LEVEL || 'info', | ||
| ...(isDevelopment && { | ||
| transport: { | ||
| target: 'pino-pretty', | ||
| options: { | ||
| colorize: true, | ||
| translateTime: 'SYS:standard', | ||
| ignore: 'pid,hostname', | ||
| }, | ||
| }, | ||
| }), | ||
| }); | ||
|
|
||
| export default logger; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| import { PrismaClient } from '@prisma/client'; | ||
|
|
||
| declare global { | ||
| var prisma: PrismaClient | undefined; | ||
| } | ||
|
|
||
| const prisma = | ||
| global.prisma || | ||
| new PrismaClient({ | ||
| log: process.env.NODE_ENV === 'development' ? ['query', 'error', 'warn'] : ['error'], | ||
| }); | ||
|
|
||
| if (process.env.NODE_ENV !== 'production') { | ||
| global.prisma = prisma; | ||
| } | ||
|
|
||
| export default prisma; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| import { Redis } from 'ioredis'; | ||
| import logger from '../logger.js'; | ||
|
|
||
| declare global { | ||
| var redis: Redis | undefined; | ||
| } | ||
|
|
||
| const createRedisClient = (): Redis => { | ||
| const client = new Redis({ | ||
| host: process.env.REDIS_HOST || 'localhost', | ||
| port: Number(process.env.REDIS_PORT) || 6379, | ||
| maxRetriesPerRequest: null, // required by BullMQ | ||
| retryStrategy(times: number) { | ||
| const delay = Math.min(times * 100, 3000); | ||
| logger.warn({ attempt: times, delayMs: delay }, 'Redis reconnecting'); | ||
| return delay; | ||
| }, | ||
| }); | ||
|
Comment on lines
+8
to
+18
|
||
|
|
||
| client.on('connect', () => { | ||
| logger.info('Redis connected'); | ||
| }); | ||
|
|
||
| client.on('error', (err: Error) => { | ||
| logger.error({ err }, 'Redis error'); | ||
| }); | ||
|
|
||
| client.on('close', () => { | ||
| logger.warn('Redis connection closed'); | ||
| }); | ||
|
|
||
| return client; | ||
| }; | ||
|
|
||
| const redis: Redis = global.redis ?? createRedisClient(); | ||
|
|
||
| if (process.env.NODE_ENV !== 'production') { | ||
| global.redis = redis; | ||
| } | ||
|
|
||
| export default redis; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
bullmqConnectionobject is duplicated here (lines 7-10) creating the same configuration multiple times. For consistency with the ride-worker, consider extracting this to a shared utility or using the REDIS_CONFIG pattern. However, if this duplication is intentional to keep api-gateway and ride-worker independent, this is acceptable.