feat(m2): BullMQ Ride Queue & Worker App #14
Conversation
- Create apps/ride-worker workspace with ESM TypeScript config
- Add src/index.ts entry point with graceful SIGTERM/SIGINT shutdown
- Add pino logger, config (REDIS_CONFIG, QUEUE_NAMES re-export), utils/redis, utils/db
- Implement ride-request.worker.ts: processes new ride jobs, publishes to ride-matching queue
- Implement ride-matching.worker.ts: skeleton processor (fully implemented in M4)
- Implement ride-lifecycle.worker.ts: skeleton handling ACCEPT/VERIFY_OTP/START/COMPLETE/CANCEL actions
- Create packages/common/queues/index.ts with QUEUE_NAMES constants (RIDE_REQUESTS, RIDE_MATCHING, RIDE_LIFECYCLE)
- Export queue constants from packages/common/index.ts
- Add bullmq dependency to api-gateway
- Refactor POST /rides/create: after DB insert, publish job to ride-requests queue
- Add connection options pattern for BullMQ (avoids ioredis type mismatch)
- Update compose.yml: add ride-worker service
- Update compose.override.yml: add ride-worker dev override with bind mount
- Update root tsconfig.json: add ride-worker project reference
- Update ride.service.test.ts: mock bullmq Queue constructor, assert queue.add() is called with correct job payload (tripId, riderId, coords)

All 34 tests pass.
- Remove learn/ from .gitignore so learning docs are versioned
- Add learn/m1-redis-geo.md (retroactively tracked)
- Add learn/m2-bullmq-workers.md: covers BullMQ Queue/Worker concepts, connection options pattern, multi-queue architecture, delayed jobs, graceful shutdown, and mocking BullMQ in Vitest
Pull request overview
This PR implements a BullMQ-based queue system for asynchronous ride processing by creating a dedicated worker application and refactoring the ride creation flow. The ride-worker app runs as a separate process, enabling horizontal scaling of job processing independently from the API gateway.
Changes:
- Created new `apps/ride-worker` workspace with three BullMQ worker processors for the ride-requests, ride-matching, and ride-lifecycle queues
- Added shared queue constants to `packages/common/queues/` for consistent queue naming across services
- Refactored `ride.service.ts` to publish jobs to BullMQ instead of synchronous processing, with updated tests
Reviewed changes
Copilot reviewed 22 out of 25 changed files in this pull request and generated 15 comments.
Summary per file:
| File | Description |
|---|---|
| tsconfig.json | Added ride-worker project reference |
| pnpm-lock.yaml | Added BullMQ and related dependencies for both api-gateway and ride-worker |
| packages/common/queues/index.ts | New shared queue name constants (RIDE_REQUESTS, RIDE_MATCHING, RIDE_LIFECYCLE) |
| packages/common/package.json | Added exports configuration for queues subpath |
| packages/common/index.ts | Exported queue constants |
| compose.yml | Added ride-worker service configuration |
| compose.override.yml | Added ride-worker dev override with bind mount |
| apps/ride-worker/tsconfig.json | TypeScript configuration matching api-gateway conventions |
| apps/ride-worker/package.json | Package definition with ESM, BullMQ, and Prisma dependencies |
| apps/ride-worker/src/index.ts | Worker entry point with graceful shutdown handlers |
| apps/ride-worker/src/logger.ts | Pino logger matching api-gateway configuration |
| apps/ride-worker/src/config.ts | Redis config and queue name exports |
| apps/ride-worker/src/utils/redis.ts | ioredis singleton with maxRetriesPerRequest: null for BullMQ |
| apps/ride-worker/src/utils/db.ts | Prisma client singleton matching api-gateway pattern |
| apps/ride-worker/src/workers/ride-request.worker.ts | Processes new ride jobs and publishes to matching queue |
| apps/ride-worker/src/workers/ride-matching.worker.ts | Skeleton processor for driver matching (M4) |
| apps/ride-worker/src/workers/ride-lifecycle.worker.ts | Skeleton processor for ride state transitions (M4/M5) |
| apps/api-gateway/src/services/ride.service.ts | Publishes ride jobs to BullMQ queue after DB insert |
| apps/api-gateway/src/services/tests/ride.service.test.ts | Added BullMQ mock and queue assertions |
| apps/api-gateway/package.json | Added BullMQ dependency |
| .github/copilot-instructions.md | New Copilot configuration file for project conventions |
Files not reviewed (1)
- pnpm-lock.yaml: Language not supported
```ts
const bullmqConnection = {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379,
} as const;
```
The `bullmqConnection` object is duplicated here (lines 5-8) when it should use the `REDIS_CONFIG` constant from `config.ts`. Replace this with `import { REDIS_CONFIG } from '../config.js'` and use `connection: REDIS_CONFIG` at line 55.
```ts
const { tripId, riderId, pickupLng, pickupLat } = job.data;

logger.info(
  { tripId, riderId, pickupLng, pickupLat },
  'Processing ride request job',
);

// Publish a ride-matching job to initiate driver matching (fully implemented in M4)
await rideMatchingQueue.add(
  'match-driver',
  { tripId, riderId, pickupLng, pickupLat },
```
The job data passed to the ride-matching queue is missing the `dropoffLng` and `dropoffLat` fields (line 35). The ride-request job receives both pickup and dropoff coordinates (see the `RideRequestJobData` interface, lines 11-18), but only passes pickup coordinates to the matching worker. This will likely cause issues in M4 when the matching algorithm needs to calculate optimal driver-rider pairings based on both pickup and dropoff locations. Add `dropoffLng: job.data.dropoffLng, dropoffLat: job.data.dropoffLat` to the job data.
Suggested change:

```diff
-const { tripId, riderId, pickupLng, pickupLat } = job.data;
-logger.info(
-  { tripId, riderId, pickupLng, pickupLat },
-  'Processing ride request job',
-);
-// Publish a ride-matching job to initiate driver matching (fully implemented in M4)
-await rideMatchingQueue.add(
-  'match-driver',
-  { tripId, riderId, pickupLng, pickupLat },
+const { tripId, riderId, pickupLng, pickupLat, dropoffLng, dropoffLat } = job.data;
+logger.info(
+  { tripId, riderId, pickupLng, pickupLat, dropoffLng, dropoffLat },
+  'Processing ride request job',
+);
+// Publish a ride-matching job to initiate driver matching (fully implemented in M4)
+await rideMatchingQueue.add(
+  'match-driver',
+  { tripId, riderId, pickupLng, pickupLat, dropoffLng, dropoffLat },
```
```ts
const rideRequestsQueue = new Queue(QUEUE_NAMES.RIDE_REQUESTS, {
  connection: bullmqConnection,
});
```
Resource leak: the `rideRequestsQueue` `Queue` instance is created at module level (lines 12-14) but is never closed during application shutdown. This can cause connection leaks when the api-gateway process terminates. Consider implementing a graceful shutdown handler that closes the queue, or use a lazy initialization pattern that allows proper cleanup.
Suggested change:

```ts
let rideRequestsQueueShutdownHookRegistered = false;

const registerRideRequestsQueueShutdown = (queue: Queue): void => {
  if (rideRequestsQueueShutdownHookRegistered) {
    return;
  }
  rideRequestsQueueShutdownHookRegistered = true;

  const shutdown = async (): Promise<void> => {
    try {
      await queue.close();
    } catch (error) {
      logger.error(error, 'Error closing rideRequestsQueue during shutdown');
    }
  };

  process.once('beforeExit', () => {
    void shutdown();
  });

  const signals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM', 'SIGQUIT'];
  signals.forEach((signal) => {
    process.once(signal, () => {
      void shutdown();
    });
  });
};

registerRideRequestsQueueShutdown(rideRequestsQueue);
```
```ts
const bullmqConnection = {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379,
} as const;
```
The `bullmqConnection` object is duplicated in this file (lines 6-9) when it should use the `REDIS_CONFIG` constant exported from `config.ts`. This violates the DRY principle and creates a maintenance burden. Replace the local `bullmqConnection` definition with `import { REDIS_CONFIG } from '../config.js'` and use `connection: REDIS_CONFIG` instead. The same issue exists in ride-matching.worker.ts and ride-lifecycle.worker.ts.
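For illustration, a minimal sketch of the shared constant this comment refers to. The module path and export name are assumptions based on the review; the env-var defaults mirror the duplicated objects in the workers:

```typescript
// Hypothetical shared config module (config.ts).
// Every worker and Queue instance would import this single constant
// instead of redefining its own bullmqConnection object.
export const REDIS_CONFIG = {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379,
} as const;
```

Each worker would then pass `connection: REDIS_CONFIG` to its `Worker` or `Queue` constructor, so a future change (e.g. adding a password) happens in one place.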
```ts
const bullmqConnection = {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379,
} as const;
```
The `bullmqConnection` object is duplicated here (lines 5-8) when it should use the `REDIS_CONFIG` constant from `config.ts`. Replace this with `import { REDIS_CONFIG} from '../config.js'` and use `connection: REDIS_CONFIG` at line 33.
`.github/copilot-instructions.md` (Outdated)
```bash
dcl          # tail logs

# Build & typecheck
pnpm build   # turbo build (common first, then api-gateway)
```
Documentation is outdated: Line 62 states "turbo build (common first, then api-gateway)" but after this PR the build order should include ride-worker as well. Update to reflect the complete build order: "turbo build (common first, then api-gateway and ride-worker)".
Suggested change:

```diff
-pnpm build   # turbo build (common first, then api-gateway)
+pnpm build   # turbo build (common first, then api-gateway and ride-worker)
```
```ts
const createRedisClient = (): Redis => {
  const client = new Redis({
    host: process.env.REDIS_HOST || 'localhost',
    port: Number(process.env.REDIS_PORT) || 6379,
    maxRetriesPerRequest: null, // required by BullMQ
    retryStrategy(times: number) {
      const delay = Math.min(times * 100, 3000);
      logger.warn({ attempt: times, delayMs: delay }, 'Redis reconnecting');
      return delay;
    },
  });
```
Inconsistent Redis configuration: the ride-worker Redis client does not use `lazyConnect: true` (unlike the api-gateway Redis client, which does). While this won't cause functional issues, it means the Redis client connects immediately on module load rather than on first use. For consistency with api-gateway patterns, and to defer connection establishment until actually needed, consider adding `lazyConnect: true` to the configuration.
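For reference, an options object combining the worker's existing settings with the suggested flag might look like this (a sketch only; the `retryStrategy` from the original client is elided):

```typescript
// Hypothetical ioredis options for the ride-worker client.
// lazyConnect: true defers the TCP connection until the first command,
// matching the api-gateway client's behavior described in the comment above.
const redisOptions = {
  host: process.env.REDIS_HOST || 'localhost',
  port: Number(process.env.REDIS_PORT) || 6379,
  maxRetriesPerRequest: null, // required by BullMQ
  lazyConnect: true,          // connect on first use, not on module load
};
```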
`.github/copilot-instructions.md` (Outdated)
```bash
# Setup & run (Docker-based, includes PostGIS + Redis)
source activate.sh   # loads aliases: dcu, dcd, dcr, dcl, dc
dcu                  # docker-compose up -d (postgres + redis + api-gateway)
```
Documentation is outdated: Line 58 mentions "docker-compose up -d (postgres + redis + api-gateway)" but after this PR the docker-compose also starts the ride-worker service. Update this to include ride-worker in the list of services.
Suggested change:

```diff
-dcu                  # docker-compose up -d (postgres + redis + api-gateway)
+dcu                  # docker-compose up -d (postgres + redis + api-gateway + ride-worker)
```
`apps/ride-worker/src/index.ts` (Outdated)
```ts
await Promise.all(workers.map((w) => w.close()));
await redis.quit();

logger.info('ride-worker shutdown complete');
process.exit(0);
```
Missing error handling in shutdown: if `worker.close()` or `redis.quit()` fails (lines 19-20), the error will be uncaught and the process will exit with code 0 (success) anyway. Wrap the shutdown logic in a try-catch block and call `process.exit(1)` on failure to properly signal shutdown errors.
Suggested change:

```diff
-await Promise.all(workers.map((w) => w.close()));
-await redis.quit();
-logger.info('ride-worker shutdown complete');
-process.exit(0);
+try {
+  await Promise.all(workers.map((w) => w.close()));
+  await redis.quit();
+  logger.info('ride-worker shutdown complete');
+  process.exit(0);
+} catch (err) {
+  logger.error({ err, signal }, 'Error during ride-worker shutdown');
+  process.exit(1);
+}
```
```json
"scripts": {
  "build": "tsc -p tsconfig.json",
  "dev": "tsx watch src/index.ts",
  "start": "node ./dist/index.js"
```
Missing test coverage: the ride-worker app has no test files or test scripts in `package.json`. The api-gateway follows a pattern of comprehensive test coverage for all services (`auth.service.test.ts`, `driver.service.test.ts`, `ride.service.test.ts`). The ride-worker should follow the same pattern with tests for its worker processors. At minimum, add tests for: 1) successful job processing, 2) job failure scenarios, 3) queue connection errors, and 4) graceful shutdown behavior. While the workers are skeleton implementations for M4, the test infrastructure should be established now.
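One way to make the processors testable without a live Redis connection is to export each processor as a plain function and inject its queue dependency. A minimal sketch (all names here are illustrative, not the PR's actual code):

```typescript
// Hypothetical dependency-injected processor for the ride-requests worker.
interface RideRequestJobData {
  tripId: string;
  riderId: string;
  pickupLng: number;
  pickupLat: number;
  dropoffLng: number;
  dropoffLat: number;
}

// Anything with an add() method satisfies this: a real BullMQ Queue,
// or a plain stub in a unit test.
interface MatchingQueueLike {
  add(name: string, data: RideRequestJobData): Promise<unknown>;
}

export async function processRideRequest(
  data: RideRequestJobData,
  matchingQueue: MatchingQueueLike,
): Promise<void> {
  // Forward the full coordinate set so M4's matching algorithm
  // has both pickup and dropoff locations.
  await matchingQueue.add('match-driver', data);
}
```

A Vitest test could then pass a stub queue and assert `add` was called with `'match-driver'` and the full payload, with no BullMQ mock required.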
- Merge redundant bullmq imports in ride-request.worker.ts
- Remove duplicated bullmqConnection in all 3 workers; use REDIS_CONFIG from config.ts (DRY)
- Pass dropoffLng/dropoffLat to ride-matching queue (data integrity fix for M4)
- Add dropoffLng/dropoffLat fields to RideMatchingJobData interface
- Return rideMatchingQueue from createRideRequestWorker for proper cleanup
- Wrap shutdown logic in try-catch with process.exit(1) on failure (index.ts)
- Add graceful SIGINT/SIGTERM shutdown hook for rideRequestsQueue in ride.service.ts
- Update copilot-instructions.md: 3 workspaces, dcu services, build order

All 34 tests pass.
Overview
Implements [M2] — BullMQ Ride Queue & Worker App (closes #7).
Creates the `apps/ride-worker` workspace as a separate BullMQ worker process and refactors ride creation to publish jobs to a queue instead of direct processing.

Changes
New: `apps/ride-worker`
- `nodenext` module resolution (same conventions as api-gateway)
- `src/index.ts`: starts all three BullMQ worker processors, graceful SIGTERM/SIGINT shutdown
- `src/logger.ts`: pino logger with pino-pretty in dev
- `src/config.ts`: Redis config + queue name re-exports
- `src/utils/redis.ts`: ioredis singleton with `maxRetriesPerRequest: null` (required by BullMQ)
- `src/utils/db.ts`: Prisma client singleton

Workers
- `ride-request.worker.ts`: processes new ride jobs and publishes to the `ride-matching` queue
- `ride-matching.worker.ts`: skeleton processor for driver matching (M4)
- `ride-lifecycle.worker.ts`: skeleton processor for ride state transitions

`packages/common`
- `packages/common/queues/index.ts` with `QUEUE_NAMES` constants (`RIDE_REQUESTS`, `RIDE_MATCHING`, `RIDE_LIFECYCLE`)
- Exported from `packages/common/index.ts`

`apps/api-gateway`
- Added `bullmq` dependency
- `ride.service.ts`: after DB insert, publishes job to the `ride-requests` queue with `{ tripId, riderId, pickupLng, pickupLat, dropoffLng, dropoffLat }` and `jobId: tripId` (deduplication)
- Plain connection options object (`{ host, port }`): avoids ioredis type mismatch with BullMQ's `ConnectionOptions`

Docker / Infra
- `compose.yml`: added `ride-worker` service
- `compose.override.yml`: added `ride-worker` dev override with bind mount + `pnpm dev`
- `tsconfig.json` (root): added `ride-worker` project reference

Tests
- Mocked the `bullmq` `Queue` constructor in `ride.service.test.ts`
- Asserted `queue.add()` is called with the correct job payload

Test Output