From 67de63063a957b98efeae821cb2f74c1a211cf22 Mon Sep 17 00:00:00 2001 From: Risshi25 Date: Sat, 7 Mar 2026 13:53:39 +0530 Subject: [PATCH 1/4] feat(m5): Live location tracking & ride completion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Throttled driver location broadcast (SETNX 3s interval) - Always updates Redis GEO position - Broadcasts to ride:{tripId} room with remaining distance + ETA - PostGIS ST_DistanceSphere for remaining distance calculation - ETA based on 30 km/h city average speed - COMPLETE lifecycle action (ride-lifecycle worker) - PostGIS ST_DistanceSphere for trip distance (meters → km) - Fare formula: max(50, 50 + distanceKm × 12) - Updates Trip: COMPLETED, fare, distance, completedAt - Removes driver from drivers:busy set - Cleans up Redis (lock, OTP keys) - Notifies ride:{tripId} + rider personal room via ride:completed - Fare constants (packages/common/constants/fare.ts) - BASE_FARE: 50, PER_KM_RATE: 12, MIN_FARE: 50 - CITY_AVG_SPEED_KMH: 30, LOCATION_THROTTLE_SECONDS: 3 - History schemas (packages/common/schemas/history.schema.ts) - rideHistoryQuerySchema (page/limit with coerce) - tripIdParamSchema (UUID validation) - New REST endpoints: POST /rides/:tripId/complete (driver) GET /rides/history (paginated, role-aware) GET /rides/:tripId (owner-only detail) GET /user/driver/stats (aggregate: rides, earnings, distance) GET /user/driver/current-ride (ACCEPTED/STARTED trip + rider info) GET /user/rider/current-ride (active trip + driver info + Redis GEO location) Tests: 76 passed (63 api-gateway + 13 ride-worker) TypeScript: api-gateway OK | ride-worker OK | common OK Closes #10 --- apps/api-gateway/src/routes/ride.routes.ts | 109 ++++--- apps/api-gateway/src/routes/user.routes.ts | 49 +++ .../services/__tests__/ride.service.test.ts | 73 ++++- apps/api-gateway/src/services/ride.service.ts | 280 ++++++++++++++++++ .../sockets/__tests__/driver-handler.test.ts | 11 + .../src/sockets/handlers/driver.ts | 58 +++- .../src/workers/ride-lifecycle.worker.ts | 89 +++++- learn/m5-tracking-completion.md | 123 ++++++++ packages/common/constants/fare.ts | 19 ++ packages/common/index.ts | 1 + packages/common/schemas/history.schema.ts | 20 ++ packages/common/schemas/index.ts | 1 + 12 files changed, 770 insertions(+), 63 deletions(-) create mode 100644 learn/m5-tracking-completion.md create mode 100644 packages/common/constants/fare.ts create mode 100644 packages/common/schemas/history.schema.ts diff --git a/apps/api-gateway/src/routes/ride.routes.ts b/apps/api-gateway/src/routes/ride.routes.ts index acb297e..1031d8f 100644 --- a/apps/api-gateway/src/routes/ride.routes.ts +++ b/apps/api-gateway/src/routes/ride.routes.ts @@ -1,6 +1,6 @@ import { Router } from 'express'; import { validate } from '../middleware/validate.js'; -import { rideSchema, otpVerifySchema, rideCancelSchema } from 'common'; +import { rideSchema, otpVerifySchema, rideCancelSchema, rideHistoryQuerySchema, tripIdParamSchema } from 'common'; import { authenticate, authorize } from '../middleware/auth.js'; import { createRide, @@ -9,6 +9,9 @@ import { rejectRide, verifyOtp, driverCancelRide, + completeRide, + getRideHistory, + getRideDetail, } from '../services/ride.service.js'; import { getNearbyAvailableRides } from '../services/driver.service.js'; @@ -18,34 +21,47 @@ const rideRouter: Router = Router(); rideRouter.post('/create', authenticate, validate(rideSchema), async (req, res) => { try { - if (!req.user) { - return res.status(401).json({ success: false, message: 'Unauthorized' }); - } + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); const ride = await createRide(req.body, req.user.userId); if (!ride.success) return res.status(400).json(ride); return res.status(201).json(ride); } catch (error) { - return res.status(500).json({ - success: false, - message: error instanceof Error ? error.message : 'Internal server error', - }); + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); } }); rideRouter.patch('/cancel', authenticate, validate(rideCancelSchema), async (req, res) => { try { - if (!req.user) { - return res.status(401).json({ success: false, message: 'Unauthorized' }); - } + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); const { tripId } = req.body; const ride = await cancelRide(req.user.userId, tripId); if (!ride.success) return res.status(400).json(ride); return res.status(200).json(ride); } catch (error) { - return res.status(500).json({ - success: false, - message: error instanceof Error ? error.message : 'Internal server error', - }); + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); + } +}); + +// ─── Ride history & detail ─────────────────────────────────────────────── + +rideRouter.get('/history', authenticate, validate(rideHistoryQuerySchema), async (req, res) => { + try { + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); + const { page, limit } = req.query as unknown as { page: number; limit: number }; + const result = await getRideHistory(req.user.userId, req.user.role, page || 1, limit || 20); + return res.status(result.success ? 200 : 400).json(result); + } catch (error) { + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); + } +}); + +rideRouter.get('/:tripId', authenticate, validate(tripIdParamSchema), async (req, res) => { + try { + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); + const result = await getRideDetail(req.user.userId, req.params.tripId); + return res.status(result.success ? 200 : (result.message.includes('access') ? 403 : 400)).json(result); + } catch (error) { + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); } }); @@ -53,84 +69,65 @@ rideRouter.patch('/cancel', authenticate, validate(rideCancelSchema), async (req rideRouter.get('/available', authenticate, authorize('driver'), async (req, res) => { try { - if (!req.user) { - return res.status(401).json({ success: false, message: 'Unauthorized' }); - } + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); const result = await getNearbyAvailableRides(req.user.userId); return res.status(result.success ? 200 : 400).json(result); } catch (error) { - return res.status(500).json({ - success: false, - message: error instanceof Error ? error.message : 'Internal server error', - }); + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); } }); rideRouter.post('/:tripId/accept', authenticate, authorize('driver'), async (req, res) => { try { - if (!req.user) { - return res.status(401).json({ success: false, message: 'Unauthorized' }); - } + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); const { offerId } = req.body; - if (!offerId) { - return res.status(400).json({ success: false, message: 'offerId is required' }); - } + if (!offerId) return res.status(400).json({ success: false, message: 'offerId is required' }); const result = await acceptRide(req.user.userId, req.params.tripId, offerId); return res.status(result.success ? 200 : 400).json(result); } catch (error) { - return res.status(500).json({ - success: false, - message: error instanceof Error ? error.message : 'Internal server error', - }); + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); } }); rideRouter.post('/:tripId/reject', authenticate, authorize('driver'), async (req, res) => { try { - if (!req.user) { - return res.status(401).json({ success: false, message: 'Unauthorized' }); - } + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); const { offerId } = req.body; - if (!offerId) { - return res.status(400).json({ success: false, message: 'offerId is required' }); - } + if (!offerId) return res.status(400).json({ success: false, message: 'offerId is required' }); const result = await rejectRide(req.user.userId, req.params.tripId, offerId); return res.status(result.success ? 200 : 400).json(result); } catch (error) { - return res.status(500).json({ - success: false, - message: error instanceof Error ? error.message : 'Internal server error', - }); + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); } }); rideRouter.post('/:tripId/verify-otp', authenticate, authorize('driver'), validate(otpVerifySchema), async (req, res) => { try { - if (!req.user) { - return res.status(401).json({ success: false, message: 'Unauthorized' }); - } + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); const result = await verifyOtp(req.user.userId, req.params.tripId, req.body.otp); return res.status(result.success ? 200 : 400).json(result); } catch (error) { - return res.status(500).json({ - success: false, - message: error instanceof Error ? error.message : 'Internal server error', - }); + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); + } +}); + +rideRouter.post('/:tripId/complete', authenticate, authorize('driver'), async (req, res) => { + try { + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); + const result = await completeRide(req.user.userId, req.params.tripId); + return res.status(result.success ? 200 : 400).json(result); + } catch (error) { + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); } }); rideRouter.post('/:tripId/driver-cancel', authenticate, authorize('driver'), async (req, res) => { try { - if (!req.user) { - return res.status(401).json({ success: false, message: 'Unauthorized' }); - } + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); const result = await driverCancelRide(req.user.userId, req.params.tripId); return res.status(result.success ? 200 : 400).json(result); } catch (error) { - return res.status(500).json({ - success: false, - message: error instanceof Error ? error.message : 'Internal server error', - }); + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); } }); diff --git a/apps/api-gateway/src/routes/user.routes.ts b/apps/api-gateway/src/routes/user.routes.ts index 36c4167..3fc6bc9 100644 --- a/apps/api-gateway/src/routes/user.routes.ts +++ b/apps/api-gateway/src/routes/user.routes.ts @@ -3,6 +3,7 @@ import { authenticate, authorize } from '../middleware/auth.js'; import { validate } from '../middleware/validate.js'; import { getUserProfile } from '../services/auth.service.js'; import { setDriverOnline, setDriverOffline, updateDriverLocation } from '../services/driver.service.js'; +import { getDriverStats, getDriverCurrentRide, getRiderCurrentRide } from '../services/ride.service.js'; import { driverStatusSchema, driverLocationSchema } from 'common'; import logger from '../logger.js'; @@ -132,4 +133,52 @@ userRouter.post( } ); +/** + * @route GET /user/driver/stats + * @desc Driver statistics (total rides, earnings, distance) + * @access Private (driver only) + */ +userRouter.get('/driver/stats', authenticate, authorize('driver'), async (req: Request, res: Response) => { + try { + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); + const result = await getDriverStats(req.user.userId); + return res.status(result.success ? 200 : 400).json(result); + } catch (error) { + logger.error(error, 'Error fetching driver stats'); + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); + } +}); + +/** + * @route GET /user/driver/current-ride + * @desc Driver's current active ride (ACCEPTED or STARTED) + * @access Private (driver only) + */ +userRouter.get('/driver/current-ride', authenticate, authorize('driver'), async (req: Request, res: Response) => { + try { + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); + const result = await getDriverCurrentRide(req.user.userId); + return res.status(result.success ? 200 : 400).json(result); + } catch (error) { + logger.error(error, 'Error fetching driver current ride'); + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); + } +}); + +/** + * @route GET /user/rider/current-ride + * @desc Rider's current active ride (REQUESTED, ACCEPTED, or STARTED) + * @access Private + */ +userRouter.get('/rider/current-ride', authenticate, async (req: Request, res: Response) => { + try { + if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); + const result = await getRiderCurrentRide(req.user.userId); + return res.status(result.success ? 200 : 400).json(result); + } catch (error) { + logger.error(error, 'Error fetching rider current ride'); + return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' }); + } +}); + export default userRouter; diff --git a/apps/api-gateway/src/services/__tests__/ride.service.test.ts b/apps/api-gateway/src/services/__tests__/ride.service.test.ts index 3c5c53f..77de6a7 100644 --- a/apps/api-gateway/src/services/__tests__/ride.service.test.ts +++ b/apps/api-gateway/src/services/__tests__/ride.service.test.ts @@ -4,18 +4,20 @@ const prismaMock = vi.hoisted(() => ({ user: { findUnique: vi.fn(), create: vi.fn(), update: vi.fn() }, driver: { findUnique: vi.fn(), create: vi.fn(), update: vi.fn() }, rider: { findUnique: vi.fn(), create: vi.fn() }, - trip: { findFirst: vi.fn(), findUnique: vi.fn(), update: vi.fn() }, + trip: { findFirst: vi.fn(), findUnique: vi.fn(), update: vi.fn(), aggregate: vi.fn() }, rideOffer: { findUnique: vi.fn(), update: vi.fn(), updateMany: vi.fn() }, $transaction: vi.fn(), $queryRaw: vi.fn(), + $queryRawUnsafe: vi.fn(), })); const queueAddMock = vi.hoisted(() => vi.fn().mockResolvedValue({ id: 'job-1' })); const redisSetMock = vi.hoisted(() => vi.fn()); +const redisGeoposMock = vi.hoisted(() => vi.fn()); vi.mock('../../utils/db.js', () => ({ default: prismaMock })); vi.mock('../../utils/redis.js', () => ({ - default: { set: redisSetMock, get: vi.fn(), del: vi.fn() }, + default: { set: redisSetMock, get: vi.fn(), del: vi.fn(), geopos: redisGeoposMock }, })); vi.mock('../../logger.js', () => ({ default: { info: vi.fn(), error: vi.fn(), warn: vi.fn() } })); vi.mock('bullmq', () => { @@ -26,7 +28,7 @@ vi.mock('bullmq', () => { return { Queue: QueueMock }; }); -import { createRide, cancelRide, acceptRide, rejectRide, verifyOtp } from '../ride.service.js'; +import { createRide, cancelRide, acceptRide, rejectRide, verifyOtp, completeRide, getDriverStats } from '../ride.service.js'; const mockRider = { id: 'rider-1', userId: 'user-1' }; const mockDriver = { id: 'driver-1', userId: 'user-driver-1' }; @@ -232,4 +234,69 @@ describe('ride.service', () => { ); }); }); + + // ── completeRide ────────────────────────────────────────────── + + describe('completeRide', () => { + it('returns error if ride is not in STARTED state', async () => { + prismaMock.driver.findUnique.mockResolvedValue(mockDriver); + prismaMock.trip.findUnique.mockResolvedValue({ id: 'trip-1', driverId: 'driver-1', status: 'ACCEPTED' }); + const result = await completeRide('user-driver-1', 'trip-1'); + expect(result.success).toBe(false); + expect(result.message).toMatch(/not in STARTED/i); + }); + + it('publishes COMPLETE job when ride is STARTED', async () => { + prismaMock.driver.findUnique.mockResolvedValue(mockDriver); + prismaMock.trip.findUnique.mockResolvedValue({ id: 'trip-1', driverId: 'driver-1', status: 'STARTED' }); + + const result = await completeRide('user-driver-1', 'trip-1'); + + expect(result.success).toBe(true); + expect(result.message).toMatch(/completion submitted/i); + expect(queueAddMock).toHaveBeenCalledWith( + 'lifecycle-complete', + expect.objectContaining({ + action: 'COMPLETE', + tripId: 'trip-1', + driverId: 'driver-1', + }), + expect.objectContaining({ jobId: 'complete:trip-1' }) + ); + }); + + it('returns error if driver is not assigned to trip', async () => { + prismaMock.driver.findUnique.mockResolvedValue({ id: 'driver-2', userId: 'user-driver-2' }); + prismaMock.trip.findUnique.mockResolvedValue({ id: 'trip-1', driverId: 'driver-1', status: 'STARTED' }); + const result = await completeRide('user-driver-2', 'trip-1'); + expect(result.success).toBe(false); + expect(result.message).toMatch(/not assigned/i); + }); + }); + + // ── getDriverStats ──────────────────────────────────────────── + + describe('getDriverStats', () => { + it('returns error if driver not found', async () => { + prismaMock.driver.findUnique.mockResolvedValue(null); + const result = await getDriverStats('user-unknown'); + expect(result.success).toBe(false); + expect(result.message).toMatch(/driver not found/i); + }); + + it('returns aggregate stats for completed rides', async () => { + prismaMock.driver.findUnique.mockResolvedValue(mockDriver); + prismaMock.trip.aggregate.mockResolvedValue({ + _count: { id: 5 }, + _sum: { fare: 650.50, distance: 42.3 }, + }); + + const result = await getDriverStats('user-driver-1'); + + expect(result.success).toBe(true); + expect(result.data?.totalRides).toBe(5); + expect(result.data?.totalEarnings).toBe(650.50); + expect(result.data?.totalDistance).toBe(42.3); + }); + }); }); diff --git a/apps/api-gateway/src/services/ride.service.ts b/apps/api-gateway/src/services/ride.service.ts index 60590c7..1e977f0 100644 --- a/apps/api-gateway/src/services/ride.service.ts +++ b/apps/api-gateway/src/services/ride.service.ts @@ -345,3 +345,283 @@ export const driverCancelRide = async (userId: string, tripId: string): Promise< throw new Error('Could not cancel ride. Please try again.'); } }; + +// ─── COMPLETE RIDE ──────────────────────────────────────────────────────── + +export const completeRide = async (userId: string, tripId: string): Promise => { + try { + const driver = await prisma.driver.findUnique({ where: { userId } }); + if (!driver) return { success: false, message: 'Driver not found' }; + + const trip = await prisma.trip.findUnique({ where: { id: tripId } }); + if (!trip) return { success: false, message: 'Trip not found' }; + if (trip.driverId !== driver.id) return { success: false, message: 'You are not assigned to this ride' }; + if (trip.status !== 'STARTED') return { success: false, message: 'Ride is not in STARTED state' }; + + await rideLifecycleQueue.add( + 'lifecycle-complete', + { + action: 'COMPLETE' as const, + tripId, + driverId: driver.id, + }, + { jobId: `complete:${tripId}` }, + ); + + return { success: true, message: 'Ride completion submitted' }; + } catch (error) { + logger.error(error, 'Error completing ride'); + throw new Error('Could not complete ride. Please try again.'); + } +}; + +// ─── RIDE HISTORY ───────────────────────────────────────────────────────── + +export const getRideHistory = async ( + userId: string, + role: string, + page: number, + limit: number, +): Promise => { + try { + let whereClause = ''; + let countClause = ''; + + if (role === 'rider') { + const rider = await prisma.rider.findUnique({ where: { userId }, select: { id: true } }); + if (!rider) return { success: false, message: 'Rider not found' }; + whereClause = `WHERE t."riderId" = '${rider.id}'`; + countClause = whereClause; + } else { + const driver = await prisma.driver.findUnique({ where: { userId }, select: { id: true } }); + if (!driver) return { success: false, message: 'Driver not found' }; + whereClause = `WHERE t."driverId" = '${driver.id}'`; + countClause = whereClause; + } + + const offset = (page - 1) * limit; + + const rides = await prisma.$queryRawUnsafe[]>(` + SELECT + t.id, + t.status, + ST_AsText(t."pickupLocation") as "pickupLocation", + ST_AsText(t."dropoffLocation") as "dropoffLocation", + t.fare, + t.distance, + t."createdAt", + t."completedAt" + FROM "Trip" t + ${whereClause} + ORDER BY t."createdAt" DESC + LIMIT ${limit} OFFSET ${offset} + `); + + const countResult = await prisma.$queryRawUnsafe<{ count: bigint }[]>(` + SELECT COUNT(*) as count FROM "Trip" t ${countClause} + `); + const total = Number(countResult[0]?.count ?? 0); + + return { + success: true, + message: 'Ride history retrieved', + data: { + rides, + pagination: { page, limit, total, totalPages: Math.ceil(total / limit) }, + }, + }; + } catch (error) { + logger.error(error, 'Error fetching ride history'); + throw new Error('Could not fetch ride history.'); + } +}; + +// ─── RIDE DETAIL ────────────────────────────────────────────────────────── + +export const getRideDetail = async (userId: string, tripId: string): Promise => { + try { + // Check access: must be the rider or assigned driver + const rider = await prisma.rider.findUnique({ where: { userId }, select: { id: true } }); + const driver = await prisma.driver.findUnique({ where: { userId }, select: { id: true } }); + + const trip = await prisma.$queryRaw[]>` + SELECT + t.id, + t."riderId", + t."driverId", + t.status, + t.otp, + t.fare, + t.distance, + ST_AsText(t."pickupLocation") as "pickupLocation", + ST_AsText(t."dropoffLocation") as "dropoffLocation", + t."createdAt", + t."completedAt" + FROM "Trip" t WHERE t.id = ${tripId} + `; + + if (!trip.length) return { success: false, message: 'Trip not found' }; + + const tripData = trip[0]; + const isRider = rider && tripData.riderId === rider.id; + const isDriver = driver && tripData.driverId === driver.id; + + if (!isRider && !isDriver) { + return { success: false, message: 'You do not have access to this ride' }; + } + + return { + success: true, + message: 'Ride detail retrieved', + data: tripData, + }; + } catch (error) { + logger.error(error, 'Error fetching ride detail'); + throw new Error('Could not fetch ride detail.'); + } +}; + +// ─── DRIVER STATS ───────────────────────────────────────────────────────── + +export const getDriverStats = async (userId: string): Promise => { + try { + const driver = await prisma.driver.findUnique({ where: { userId }, select: { id: true } }); + if (!driver) return { success: false, message: 'Driver not found' }; + + const stats = await prisma.trip.aggregate({ + where: { driverId: driver.id, status: 'COMPLETED' }, + _count: { id: true }, + _sum: { fare: true, distance: true }, + }); + + return { + success: true, + message: 'Driver stats retrieved', + data: { + totalRides: stats._count.id, + totalEarnings: stats._sum.fare ?? 0, + totalDistance: stats._sum.distance ?? 0, + }, + }; + } catch (error) { + logger.error(error, 'Error fetching driver stats'); + throw new Error('Could not fetch driver stats.'); + } +}; + +// ─── DRIVER CURRENT RIDE ───────────────────────────────────────────────── + +export const getDriverCurrentRide = async (userId: string): Promise => { + try { + const driver = await prisma.driver.findUnique({ where: { userId }, select: { id: true } }); + if (!driver) return { success: false, message: 'Driver not found' }; + + const trip = await prisma.$queryRaw[]>` + SELECT + t.id, + t."riderId", + t.status, + t.otp, + ST_AsText(t."pickupLocation") as "pickupLocation", + ST_AsText(t."dropoffLocation") as "dropoffLocation", + t."createdAt", + r."userId" as "riderUserId" + FROM "Trip" t + JOIN "Rider" r ON r.id = t."riderId" + WHERE t."driverId" = ${driver.id} + AND t.status IN ('ACCEPTED', 'STARTED') + ORDER BY t."createdAt" DESC + LIMIT 1 + `; + + if (!trip.length) { + return { success: true, message: 'No active ride', data: { ride: null } }; + } + + // Get rider basic info + const riderUser = await prisma.user.findUnique({ + where: { id: trip[0].riderUserId as string }, + select: { name: true, number: true }, + }); + + return { + success: true, + message: 'Current ride retrieved', + data: { ride: { ...trip[0], riderName: riderUser?.name, riderPhone: riderUser?.number } }, + }; + } catch (error) { + logger.error(error, 'Error fetching driver current ride'); + throw new Error('Could not fetch current ride.'); + } +}; + +// ─── RIDER CURRENT RIDE ────────────────────────────────────────────────── + +export const getRiderCurrentRide = async (userId: string): Promise => { + try { + const rider = await prisma.rider.findUnique({ where: { userId }, select: { id: true } }); + if (!rider) return { success: false, message: 'Rider not found' }; + + const trip = await prisma.$queryRaw[]>` + SELECT + t.id, + t."driverId", + t.status, + t.otp, + ST_AsText(t."pickupLocation") as "pickupLocation", + ST_AsText(t."dropoffLocation") as "dropoffLocation", + t."createdAt" + FROM "Trip" t + WHERE t."riderId" = ${rider.id} + AND t.status IN ('REQUESTED', 'ACCEPTED', 'STARTED') + ORDER BY t."createdAt" DESC + LIMIT 1 + `; + + if (!trip.length) { + return { success: true, message: 'No active ride', data: { ride: null } }; + } + + const tripData = trip[0] as Record; + + // If driver is assigned, get their info + location + if (tripData.driverId) { + const driverRecord = await prisma.driver.findUnique({ + where: { id: tripData.driverId as string }, + select: { userId: true }, + }); + if (driverRecord) { + const driverUser = await prisma.user.findUnique({ + where: { id: driverRecord.userId }, + select: { name: true, number: true }, + }); + tripData.driverName = driverUser?.name; + tripData.driverPhone = driverUser?.number; + + // If STARTED, include driver's current location from Redis GEO + if (tripData.status === 'STARTED') { + try { + const pos = await redis.geopos('drivers:active', driverRecord.userId); + if (pos && pos[0]) { + tripData.driverLocation = { + lng: parseFloat(pos[0][0] as string), + lat: parseFloat(pos[0][1] as string), + }; + } + } catch { + // Redis GEO might not have position — that's ok + } + } + } + } + + return { + success: true, + message: 'Current ride retrieved', + data: { ride: tripData }, + }; + } catch (error) { + logger.error(error, 'Error fetching rider current ride'); + throw new Error('Could not fetch current ride.'); + } +}; diff --git a/apps/api-gateway/src/sockets/__tests__/driver-handler.test.ts b/apps/api-gateway/src/sockets/__tests__/driver-handler.test.ts index 2f7f82e..c411263 100644 --- a/apps/api-gateway/src/sockets/__tests__/driver-handler.test.ts +++ b/apps/api-gateway/src/sockets/__tests__/driver-handler.test.ts @@ -17,6 +17,16 @@ vi.mock('../../services/ride.service.js', () => ({ acceptRide: acceptRideMock, rejectRide: rejectRideMock, })); +vi.mock('../../utils/redis.js', () => ({ + default: { set: vi.fn(), geopos: vi.fn() }, +})); +vi.mock('../../utils/db.js', () => ({ + default: { + driver: { findUnique: vi.fn() }, + trip: { findFirst: vi.fn() }, + $queryRaw: vi.fn(), + }, +})); vi.mock('../../logger.js', () => ({ default: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, })); @@ -30,6 +40,7 @@ const makeSocket = (userId: string): Socket & { _trigger: (e: string, ...a: unkn data: { user: { userId, role: 'driver' } }, join: vi.fn().mockResolvedValue(undefined), emit: vi.fn(), + to: vi.fn().mockReturnValue({ emit: vi.fn() }), on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { listeners[event] = handler; }), diff --git a/apps/api-gateway/src/sockets/handlers/driver.ts b/apps/api-gateway/src/sockets/handlers/driver.ts index 46ad4eb..1df855f 100644 --- a/apps/api-gateway/src/sockets/handlers/driver.ts +++ b/apps/api-gateway/src/sockets/handlers/driver.ts @@ -2,7 +2,15 @@ import type { Socket } from 'socket.io'; import { setDriverOnline, setDriverOffline, updateDriverLocation } from '../../services/driver.service.js'; import { acceptRide, rejectRide } from '../../services/ride.service.js'; import type { JwtPayload } from '../../utils/jwt.js'; +import redis from '../../utils/redis.js'; +import prisma from '../../utils/db.js'; import logger from '../../logger.js'; +import { + LOCATION_THROTTLE_SECONDS, + LOCATION_THROTTLE_KEY_PREFIX, + CITY_AVG_SPEED_KMH, + DRIVERS_GEO_KEY, +} from 'common'; /** * Register driver-specific socket event handlers. @@ -11,7 +19,7 @@ import logger from '../../logger.js'; * Events: * driver:go-online { lng, lat } → set online in DB + Redis GEO * driver:go-offline {} → set offline in DB + remove from Redis GEO - * driver:location-update { lng, lat } → update Redis GEO position + * driver:location-update { lng, lat } → throttled update + broadcast to ride room * driver:accept-ride { tripId, offerId } → accept ride offer (SETNX lock) * driver:reject-ride { tripId, offerId } → reject ride offer → cascade */ @@ -42,9 +50,54 @@ export const registerDriverHandlers = (socket: Socket): void => { } }); - socket.on('driver:location-update', async (data: { lng: number; lat: number }) => { + socket.on('driver:location-update', async (data: { lng: number; lat: number; heading?: number; speed?: number }) => { try { + // 1. Always update Redis GEO position await updateDriverLocation(user.userId, [data.lng, data.lat]); + + // 2. Throttle broadcast: SETNX with 3s TTL → drop if key exists + const throttleKey = `${LOCATION_THROTTLE_KEY_PREFIX}${user.userId}`; + const allowed = await redis.set(throttleKey, '1', 'EX', LOCATION_THROTTLE_SECONDS, 'NX'); + if (!allowed) return; // Throttled — skip broadcast + + // 3. Find driver's active STARTED trip + const driver = await prisma.driver.findUnique({ where: { userId: user.userId }, select: { id: true } }); + if (!driver) return; + + const activeTrip = await prisma.trip.findFirst({ + where: { driverId: driver.id, status: 'STARTED' }, + select: { id: true }, + }); + if (!activeTrip) return; // No active ride — no broadcast needed + + // 4. Calculate remaining distance: ST_DistanceSphere(current, dropoff) + let remainingDistanceKm = 0; + let etaMinutes = 0; + try { + const distResult = await prisma.$queryRaw<{ distance_meters: number }[]>` + SELECT ST_DistanceSphere( + ST_SetSRID(ST_MakePoint(${data.lng}, ${data.lat}), 4326)::geometry, + "dropoffLocation"::geometry + ) as distance_meters + FROM "Trip" WHERE id = ${activeTrip.id} + `; + const meters = distResult[0]?.distance_meters ?? 0; + remainingDistanceKm = Math.round((meters / 1000) * 100) / 100; + etaMinutes = Math.round((remainingDistanceKm / CITY_AVG_SPEED_KMH) * 60 * 10) / 10; + } catch { + // PostGIS query might fail — still broadcast location without ETA + } + + // 5. Broadcast to ride:{tripId} room + socket.to(`ride:${activeTrip.id}`).emit('ride:driver-location', { + lng: data.lng, + lat: data.lat, + heading: data.heading, + speed: data.speed, + remainingDistanceKm, + etaMinutes, + timestamp: new Date().toISOString(), + }); } catch (err) { logger.error({ err, userId: user.userId }, 'Error handling driver:location-update'); } @@ -55,7 +108,6 @@ export const registerDriverHandlers = (socket: Socket): void => { const result = await acceptRide(user.userId, data.tripId, data.offerId); socket.emit('driver:accept-ride:ack', result); if (result.success) { - // Auto-join the ride room void socket.join(`ride:${data.tripId}`); } } catch (err) { diff --git a/apps/ride-worker/src/workers/ride-lifecycle.worker.ts b/apps/ride-worker/src/workers/ride-lifecycle.worker.ts index 75c784a..8773ed5 100644 --- a/apps/ride-worker/src/workers/ride-lifecycle.worker.ts +++ b/apps/ride-worker/src/workers/ride-lifecycle.worker.ts @@ -11,6 +11,9 @@ import { OTP_TTL_SECONDS, MAX_OTP_ATTEMPTS, RIDE_LOCK_KEY_PREFIX, + BASE_FARE, + PER_KM_RATE, + MIN_FARE, } from 'common'; const prisma = new PrismaClient(); @@ -262,6 +265,90 @@ const handleCancel = async (data: RideLifecycleJobData): Promise => { logger.info({ tripId, reason }, 'Ride CANCELLED'); }; +// ─── COMPLETE ───────────────────────────────────────────────────────────── + +const handleComplete = async (data: RideLifecycleJobData): Promise => { + const { tripId, driverId } = data; + + const trip = await prisma.trip.findUnique({ where: { id: tripId } }); + if (!trip) throw new Error(`Trip ${tripId} not found`); + validateTransition(trip.status, 'COMPLETED'); + + // 1. Calculate distance using PostGIS ST_DistanceSphere + const distanceResult = await prisma.$queryRaw<{ distance_meters: number }[]>` + SELECT ST_DistanceSphere( + "pickupLocation"::geometry, + "dropoffLocation"::geometry + ) as distance_meters + FROM "Trip" WHERE id = ${tripId} + `; + + const distanceMeters = distanceResult[0]?.distance_meters ?? 0; + const distanceKm = distanceMeters / 1000; + + // 2. Calculate fare: max(MIN_FARE, BASE_FARE + distance_km * PER_KM_RATE) + const calculatedFare = BASE_FARE + distanceKm * PER_KM_RATE; + const fare = Math.round(Math.max(MIN_FARE, calculatedFare) * 100) / 100; + + // 3. Update Trip: COMPLETED, fare, distance, completedAt + await prisma.trip.update({ + where: { id: tripId }, + data: { + status: 'COMPLETED', + fare, + distance: Math.round(distanceKm * 100) / 100, + completedAt: new Date(), + }, + }); + + // 4. Remove driver from busy set + if (trip.driverId) { + const driver = await prisma.driver.findUnique({ + where: { id: trip.driverId }, + select: { userId: true }, + }); + if (driver) { + await redis.srem(DRIVERS_BUSY_KEY, driver.userId); + } + } + + // 5. Clean up Redis keys + await redis.del(`${RIDE_LOCK_KEY_PREFIX}${tripId}`); + await redis.del(`${OTP_KEY_PREFIX}${tripId}`); + await redis.del(`${OTP_ATTEMPTS_KEY_PREFIX}${tripId}`); + + // 6. Get pickup/dropoff as text for notification + const locations = await prisma.$queryRaw<{ pickup: string; dropoff: string }[]>` + SELECT ST_AsText("pickupLocation") as pickup, ST_AsText("dropoffLocation") as dropoff + FROM "Trip" WHERE id = ${tripId} + `; + + // 7. Notify ride room + await emitToRoom(`ride:${tripId}`, 'ride:completed', { + tripId, + fare, + distanceKm: Math.round(distanceKm * 100) / 100, + pickupLocation: locations[0]?.pickup ?? '', + dropoffLocation: locations[0]?.dropoff ?? '', + completedAt: new Date().toISOString(), + }); + + // Also notify rider personal room + const rider = await prisma.rider.findUnique({ + where: { id: trip.riderId }, + select: { userId: true }, + }); + if (rider) { + await emitToRoom(`rider:${rider.userId}`, 'ride:completed', { + tripId, + fare, + distanceKm: Math.round(distanceKm * 100) / 100, + }); + } + + logger.info({ tripId, fare, distanceKm, driverId }, 'Ride COMPLETED'); +}; + // ─── MAIN PROCESSOR ────────────────────────────────────────────────────── const processRideLifecycle = async (job: Job) => { @@ -281,7 +368,7 @@ const processRideLifecycle = async (job: Job) => { logger.info({ tripId }, 'START action — handled via VERIFY_OTP flow'); break; case 'COMPLETE': - logger.info({ tripId }, 'COMPLETE action — will be implemented in M5'); + await handleComplete(job.data); break; case 'CANCEL': await handleCancel(job.data); diff --git a/learn/m5-tracking-completion.md b/learn/m5-tracking-completion.md new file mode 100644 index 0000000..fce7305 --- /dev/null +++ b/learn/m5-tracking-completion.md @@ -0,0 +1,123 @@ +# M5 — Live Location Tracking & Ride Completion + +> **Branch:** `feature/m5-tracking-completion` | **Closes:** Issue #10 + +--- + +## What this milestone does + +M5 completes the end-to-end ride-sharing flow. After a ride is STARTED (M4), the driver streams live location to the rider, and when the ride ends, the system calculates the fare using PostGIS. + +--- + +## Throttled Location Broadcast + +``` +Driver GPS update (every ~1s) + │ + ▼ +driver:location-update { lng, lat, heading?, speed? } + │ + ├── Always: GEOADD Redis GEO (update position) + │ + ├── SETNX driver:location-throttle:{userId} → 3s TTL + │ ├── OK (allowed): proceed to broadcast + │ └── null (throttled): skip broadcast (update still saved) + │ + ├── Look up driver's STARTED trip + │ + ├── PostGIS: ST_DistanceSphere(currentPos, dropoff) → remaining km + │ + ├── ETA: remainingKm / 30 km/h * 60 → minutes + │ + └── socket.to(ride:{tripId}).emit('ride:driver-location', { + lng, lat, heading, speed, + remainingDistanceKm, etaMinutes, + timestamp + }) +``` + +### Why throttle? +Without throttling, a driver sending GPS every second = 60 PostGIS queries/minute + 60 socket broadcasts. With 3s throttle: 20/minute — 3x reduction. + +--- + +## Ride Completion — COMPLETE Action + +``` +POST /rides/:tripId/complete (driver) + │ + ▼ +lifecycle queue → ride-lifecycle worker + │ + ├── 1. Validate STARTED → COMPLETED transition + ├── 2. PostGIS: ST_DistanceSphere(pickup, dropoff) → meters + ├── 3. Fare: max(50, 50 + distanceKm × 12) → round to 2 decimals + ├── 4. Update Trip: status=COMPLETED, fare, distance, completedAt + ├── 5. Remove driver from drivers:busy set + ├── 6. Cleanup Redis: lock, OTP keys + └── 7. Notify ride:{tripId} room + rider room → ride:completed +``` + +| Constant | Value | Purpose | +|----------|-------|---------| +| BASE_FARE | 50 | Minimum base charge | +| PER_KM_RATE | 12 | Rate per kilometer | +| MIN_FARE | 50 | Floor fare | +| CITY_AVG_SPEED_KMH | 30 | ETA estimation speed | + +--- + +## New REST Endpoints + +| Method | Path | Auth | Purpose | +|--------|------|------|---------| +| POST | `/rides/:tripId/complete` | driver | Complete ride → fare calculation | +| GET | `/rides/history` | any | Paginated ride history (page/limit) | +| GET | `/rides/:tripId` | owner | Single ride detail (rider or driver) | +| GET | `/user/driver/stats` | driver | Total rides, earnings, distance | +| GET | `/user/driver/current-ride` | driver | Active ACCEPTED/STARTED ride | +| GET | `/user/rider/current-ride` | rider | Active REQUESTED/ACCEPTED/STARTED ride | + +--- + +## Ride History — PostGIS + Raw SQL + +Standard Prisma `findMany` can't handle geometry fields. Solution: `$queryRaw` with `ST_AsText`: + +```sql +SELECT t.id, t.status, + ST_AsText(t."pickupLocation") as "pickupLocation", + ST_AsText(t."dropoffLocation") as "dropoffLocation", + t.fare, t.distance, t."createdAt", t."completedAt" +FROM "Trip" t WHERE t."riderId" = $1 +ORDER BY t."createdAt" DESC LIMIT 20 OFFSET 0 +``` + +--- + +## Rider Current Ride — Driver Location from Redis + +When a rider checks their active ride during `STARTED` status, the system fetches the driver's live position from Redis GEO: + +```ts +const pos = await redis.geopos('drivers:active', driverUserId); +// → [[lng, lat]] or [[null, null]] +``` + +This avoids DB queries and gives real-time position. + +--- + +## Complete Flow (M1–M5) + +``` +1. Rider: POST /rides/create → REQUESTED +2. Worker: cascade match → nearest driver → ride:offer +3. Driver: driver:accept-ride → SETNX lock → ACCEPTED +4. Worker: OTP generated → ride:otp → rider +5. Driver arrives → rider shares OTP → POST /rides/:tripId/verify-otp → STARTED +6. Driver streams location → throttled broadcast → ride:driver-location +7. Driver: POST /rides/:tripId/complete → COMPLETED (fare + distance) +8. Both: GET /rides/history → see past rides +``` diff --git a/packages/common/constants/fare.ts b/packages/common/constants/fare.ts new file mode 100644 index 0000000..1039155 --- /dev/null +++ b/packages/common/constants/fare.ts @@ -0,0 +1,19 @@ +/** Fare calculation constants. */ + +/** Base fare in currency units. */ +export const BASE_FARE = 50; + +/** Rate per kilometer. */ +export const PER_KM_RATE = 12; + +/** Minimum fare (never below base fare). */ +export const MIN_FARE = 50; + +/** Average city speed in km/h for ETA estimation. */ +export const CITY_AVG_SPEED_KMH = 30; + +/** Throttle interval for driver location updates (seconds). */ +export const LOCATION_THROTTLE_SECONDS = 3; + +/** Redis key prefix for location throttle — full key: `driver:location-throttle:{userId}` */ +export const LOCATION_THROTTLE_KEY_PREFIX = 'driver:location-throttle:'; diff --git a/packages/common/index.ts b/packages/common/index.ts index fbf9a25..914ed28 100644 --- a/packages/common/index.ts +++ b/packages/common/index.ts @@ -2,5 +2,6 @@ export * from './schemas/index.js'; export * from './queues/index.js'; export * from './events/index.js'; export * from './constants/matching.js'; +export * from './constants/fare.js'; export { PrismaClient } from '@prisma/client'; diff --git a/packages/common/schemas/history.schema.ts b/packages/common/schemas/history.schema.ts new file mode 100644 index 0000000..9732eae --- /dev/null +++ b/packages/common/schemas/history.schema.ts @@ -0,0 +1,20 @@ +import { z } from 'zod'; + +/** Schema for ride history query params (pagination). */ +export const rideHistoryQuerySchema = z.object({ + query: z.object({ + page: z.coerce.number().int().min(1).default(1), + limit: z.coerce.number().int().min(1).max(50).default(20), + }), +}); + +export type RideHistoryQuery = z.infer['query']; + +/** Schema for :tripId route param validation. */ +export const tripIdParamSchema = z.object({ + params: z.object({ + tripId: z.string().uuid('tripId must be a valid UUID'), + }), +}); + +export type TripIdParam = z.infer['params']; diff --git a/packages/common/schemas/index.ts b/packages/common/schemas/index.ts index 76fdf25..920254b 100644 --- a/packages/common/schemas/index.ts +++ b/packages/common/schemas/index.ts @@ -2,3 +2,4 @@ export * from './user.schema.js'; export * from './ride.schema.js'; export * from './driver.schema.js'; export * from './otp.schema.js'; +export * from './history.schema.js'; From 36ac8443186f13dff5af7d01ee4e2829f39f16d1 Mon Sep 17 00:00:00 2001 From: Risshi25 Date: Sat, 7 Mar 2026 14:02:19 +0530 Subject: [PATCH 2/4] =?UTF-8?q?docs:=20Rewrite=20README=20=E2=80=94=20recr?= =?UTF-8?q?uiter-friendly=20project=20overview?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 198 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 190 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 849656a..abc5873 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,194 @@ +
+ +# 🚀 UrbanPulse + +**Production-grade ride-sharing backend** built with a distributed microservices architecture. + +*Real-time driver matching • Live GPS tracking • OTP-verified pickups • PostGIS fare calculation* + +[![TypeScript](https://img.shields.io/badge/TypeScript-5.x-3178C6?logo=typescript&logoColor=white)](https://www.typescriptlang.org/) +[![Node.js](https://img.shields.io/badge/Node.js-20+-339933?logo=node.js&logoColor=white)](https://nodejs.org/) +[![PostgreSQL](https://img.shields.io/badge/PostgreSQL-PostGIS-4169E1?logo=postgresql&logoColor=white)](https://www.postgresql.org/) +[![Redis](https://img.shields.io/badge/Redis-GEO%20%2B%20Pub%2FSub-DC382D?logo=redis&logoColor=white)](https://redis.io/) +[![Socket.io](https://img.shields.io/badge/Socket.io-Realtime-010101?logo=socket.io&logoColor=white)](https://socket.io/) + +
+ +--- + +## What is this? + +UrbanPulse is a **full ride-sharing backend** — think of it as the engine behind apps like Uber/Ola. A rider requests a ride, the system finds the nearest available driver, handles the entire lifecycle through completion, and calculates the fare — all in real-time. + +This isn't a tutorial project. It's a distributed system with **race condition protection**, **cascading driver matching**, **cross-process event emission**, and **geospatial queries** — the same patterns used in production ride-sharing platforms. + +--- + +## The Ride Flow + +``` +Rider requests ride + ↓ + Nearest driver found (Redis GEOSEARCH, 5km radius) + ↓ + Offer sent via WebSocket → 30s timeout → cascade to next driver + ↓ + Driver accepts (distributed SETNX lock prevents double-accept) + ↓ + 4-digit OTP generated → sent to rider + ↓ + Driver verifies OTP at pickup → ride starts + ↓ + Live GPS streaming (throttled, with ETA) → rider sees driver moving + ↓ + Driver completes ride → PostGIS distance → fare calculated + ↓ + Both see ride in history with full details +``` + +--- + +## Architecture + +``` +┌─────────────────────────────────────────────────────┐ +│ API Gateway │ +│ Express REST API + Socket.io WebSocket Server │ +│ Auth (JWT) • Rate Limiting • Zod Validation │ +└──────────────┬──────────────────────┬────────────────┘ + │ BullMQ Jobs │ Redis Pub/Sub + ▼ ▼ +┌──────────────────────┐ ┌────────────────────┐ +│ Ride Worker │ │ Notifications │ +│ Matching • Lifecycle │ │ Socket Adapter │ +│ State Machine • OTP │ │ Cross-process │ +└──────────┬───────────┘ └────────────────────┘ + │ + ┌─────┴─────┐ + ▼ ▼ +┌─────────┐ ┌─────────┐ +│ Postgres│ │ Redis │ +│ PostGIS │ │ GEO+Pub │ +└─────────┘ └─────────┘ +``` + +**Monorepo** (Turborepo + pnpm workspaces) with 4 packages: + +| Package | Role | +|---------|------| +| `api-gateway` | REST API + WebSocket server | +| `ride-worker` | Background job processing (BullMQ) | +| `common` | Shared Prisma schema, Zod schemas, constants | +| `notifications` | Socket.io Redis adapter for cross-process events | + +--- + +## Key Engineering Decisions + +### 🗺️ Why Redis GEO over SQL for driver lookup? +`GEOSEARCH` returns drivers sorted by distance in **O(log N + M)** — orders of magnitude faster than a PostGIS query scanning the drivers table. Drivers update their position every few seconds; Redis handles this write-heavy workload without touching the database. + +### 🔒 Why SETNX for ride acceptance? +Two drivers could accept the same ride simultaneously. `SETNX` (set-if-not-exists) acts as a **distributed lock** — the first driver wins atomically, the second gets a clean rejection. No database race conditions. + +### ⚡ Why BullMQ instead of direct processing? +Ride matching involves multiple network calls (Redis lookup → filter → DB write → WebSocket emit → schedule timeout). If any step fails, BullMQ **retries automatically**. The cascade timeout (30s per driver) uses BullMQ's delayed jobs — no cron needed. + +### 🚦 Why throttle location updates? +A driver's phone sends GPS every ~1 second. Without throttling, that's 60 PostGIS queries + 60 WebSocket broadcasts per minute per active ride. The **SETNX 3-second throttle** cuts this to 20/minute while still updating Redis GEO position on every tick. + +--- + +## Tech Stack + +| Layer | Technology | Why | +|-------|-----------|-----| +| Language | TypeScript | End-to-end type safety across all packages | +| API | Express.js | Lightweight, middleware-driven | +| Realtime | Socket.io + Redis Adapter | Cross-process WebSocket events | +| Database | PostgreSQL + PostGIS | Geospatial queries (distance, points) | +| ORM | Prisma | Type-safe DB access + migrations | +| Cache/Geo | Redis | GEO commands, pub/sub, distributed locks | +| Queue | BullMQ | Reliable job processing with retries | +| Validation | Zod | Runtime schema validation | +| Auth | JWT + bcrypt | Stateless authentication | +| Monorepo | Turborepo + pnpm | Shared packages, parallel builds | +| Testing | Vitest | Fast unit tests (76 passing) | +| Infra | Docker Compose | One-command dev environment | + +--- + +## API Highlights + +
+Authentication + +- `POST /auth/register` — rider or driver registration +- `POST /auth/login` — JWT token issuance +- `GET /user/profile` — authenticated user profile +
+ +
+Ride Lifecycle + +- `POST /rides/create` — request a ride (triggers matching) +- `POST /rides/:tripId/accept` — driver accepts (SETNX lock) +- `POST /rides/:tripId/verify-otp` — OTP verification at pickup +- `POST /rides/:tripId/complete` — complete ride (fare calculation) +- `PATCH /rides/cancel` — cancel ride +
+ +
+Dashboard + +- `GET /rides/history` — paginated ride history +- `GET /rides/:tripId` — ride details +- `GET /user/driver/stats` — total rides, earnings, distance +- `GET /user/driver/current-ride` — active ride + rider info +- `GET /user/rider/current-ride` — active ride + driver location +
+ +
+WebSocket Events + +- `ride:offer` → driver receives ride offer +- `ride:accepted` → rider notified of match +- `ride:otp` → rider receives pickup OTP +- `ride:driver-location` → live tracking with ETA +- `ride:completed` → fare breakdown +
+ +--- + +## Quick Start + +```bash +# Clone and start infrastructure +git clone https://github.com/codeRisshi25/urbanpulse-backend.git +cd urbanpulse-backend +cp .env.example .env + +# Start PostgreSQL + Redis +docker compose up -d + +# Install and run +pnpm install +pnpm run dev +``` + +--- + +## Testing + ```bash - source activate.sh - dcu - # for psql client - dc - psql -h host -U user -d db +pnpm test # 76 tests across all packages +pnpm typecheck # TypeScript validation ``` -# small dev log +A Postman collection (`UrbanPulse_Postman_Collection.json`) is included for manual API testing. + +--- -- postgres migration on check -- turbo repo +
+Built as a systems design exercise in distributed real-time architectures. +
From ebc6e232b6fa7343e3d3e6ae8f0f8853439053e1 Mon Sep 17 00:00:00 2001 From: Risshi25 Date: Sat, 7 Mar 2026 14:05:48 +0530 Subject: [PATCH 3/4] update README.md --- README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index abc5873..9290f57 100644 --- a/README.md +++ b/README.md @@ -16,11 +16,11 @@ --- -## What is this? +## What is Urban Pulse ? -UrbanPulse is a **full ride-sharing backend** — think of it as the engine behind apps like Uber/Ola. A rider requests a ride, the system finds the nearest available driver, handles the entire lifecycle through completion, and calculates the fare — all in real-time. +UrbanPulse is a **full ride-sharing backend** , think of it as the engine behind apps like Uber/Ola. A rider requests a ride, the system finds the nearest available driver, handles the entire lifecycle through completion, and calculates the fare , all in real-time. -This isn't a tutorial project. It's a distributed system with **race condition protection**, **cascading driver matching**, **cross-process event emission**, and **geospatial queries** — the same patterns used in production ride-sharing platforms. +It's a distributed system with **race condition protection**, **cascading driver matching**, **cross-process event emission**, and **geospatial queries** — the same patterns used in production ride-sharing platforms. --- @@ -51,18 +51,18 @@ Rider requests ride ## Architecture ``` -┌─────────────────────────────────────────────────────┐ +┌──────────────────────────────────────────────────────┐ │ API Gateway │ │ Express REST API + Socket.io WebSocket Server │ │ Auth (JWT) • Rate Limiting • Zod Validation │ └──────────────┬──────────────────────┬────────────────┘ │ BullMQ Jobs │ Redis Pub/Sub ▼ ▼ -┌──────────────────────┐ ┌────────────────────┐ +┌───────────────────────┐ ┌────────────────────┐ │ Ride Worker │ │ Notifications │ │ Matching • Lifecycle │ │ Socket Adapter │ │ State Machine • OTP │ │ Cross-process │ -└──────────┬───────────┘ └────────────────────┘ +└──────────┬────────────┘ └────────────────────┘ │ ┌─────┴─────┐ ▼ ▼ From 04c88d6941a4a4f791a296911d25094a695997a9 Mon Sep 17 00:00:00 2001 From: Risshi25 Date: Sat, 7 Mar 2026 14:11:42 +0530 Subject: [PATCH 4/4] fix: Address PR #17 review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - GEOPOS null check: verify both coords are non-null before parsing (Redis returns [null,null] for missing members → was producing NaN) - handleComplete idempotent: skip validation + DB update on retry when trip is already COMPLETED, still run cleanup/notifications - Markdown fence labels: add 'text' to ASCII diagram code blocks - OTP security: strip OTP from driver-facing APIs (already applied) - authorize('rider'): rider current-ride route (already applied) --- apps/api-gateway/src/routes/user.routes.ts | 2 +- apps/api-gateway/src/services/ride.service.ts | 13 ++++--- .../src/workers/ride-lifecycle.worker.ts | 34 ++++++++++++------- learn/m5-tracking-completion.md | 6 ++-- 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/apps/api-gateway/src/routes/user.routes.ts b/apps/api-gateway/src/routes/user.routes.ts index 3fc6bc9..3ad9fde 100644 --- a/apps/api-gateway/src/routes/user.routes.ts +++ b/apps/api-gateway/src/routes/user.routes.ts @@ -170,7 +170,7 @@ userRouter.get('/driver/current-ride', authenticate, authorize('driver'), async * @desc Rider's current active ride (REQUESTED, ACCEPTED, or STARTED) * @access Private */ -userRouter.get('/rider/current-ride', authenticate, async (req: Request, res: Response) => { +userRouter.get('/rider/current-ride', authenticate, authorize('rider'), async (req: Request, res: Response) => { try { if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' }); const result = await getRiderCurrentRide(req.user.userId); diff --git a/apps/api-gateway/src/services/ride.service.ts b/apps/api-gateway/src/services/ride.service.ts index 1e977f0..fca1b77 100644 --- a/apps/api-gateway/src/services/ride.service.ts +++ b/apps/api-gateway/src/services/ride.service.ts @@ -470,6 +470,11 @@ export const getRideDetail = async (userId: string, tripId: string): Promise if (tripData.status === 'STARTED') { try { const pos = await redis.geopos('drivers:active', driverRecord.userId); - if (pos && pos[0]) { + const coords = pos?.[0]; + if (coords?.[0] != null && coords?.[1] != null) { tripData.driverLocation = { - lng: parseFloat(pos[0][0] as string), - lat: parseFloat(pos[0][1] as string), + lng: parseFloat(coords[0] as string), + lat: parseFloat(coords[1] as string), }; } } catch { diff --git a/apps/ride-worker/src/workers/ride-lifecycle.worker.ts b/apps/ride-worker/src/workers/ride-lifecycle.worker.ts index 8773ed5..407055b 100644 --- a/apps/ride-worker/src/workers/ride-lifecycle.worker.ts +++ b/apps/ride-worker/src/workers/ride-lifecycle.worker.ts @@ -272,7 +272,15 @@ const handleComplete = async (data: RideLifecycleJobData): Promise => { const trip = await prisma.trip.findUnique({ where: { id: tripId } }); if (!trip) throw new Error(`Trip ${tripId} not found`); - validateTransition(trip.status, 'COMPLETED'); + + // Idempotent: if already COMPLETED (e.g. BullMQ retry after partial failure), + // skip validation + DB update but still run cleanup/notifications + const alreadyCompleted = trip.status === 'COMPLETED'; + if (!alreadyCompleted) { + validateTransition(trip.status, 'COMPLETED'); + } + + const completedAt = trip.completedAt ?? new Date(); // 1. Calculate distance using PostGIS ST_DistanceSphere const distanceResult = await prisma.$queryRaw<{ distance_meters: number }[]>` @@ -290,16 +298,18 @@ const handleComplete = async (data: RideLifecycleJobData): Promise => { const calculatedFare = BASE_FARE + distanceKm * PER_KM_RATE; const fare = Math.round(Math.max(MIN_FARE, calculatedFare) * 100) / 100; - // 3. Update Trip: COMPLETED, fare, distance, completedAt - await prisma.trip.update({ - where: { id: tripId }, - data: { - status: 'COMPLETED', - fare, - distance: Math.round(distanceKm * 100) / 100, - completedAt: new Date(), - }, - }); + // 3. Update Trip: COMPLETED, fare, distance, completedAt (skip if already done) + if (!alreadyCompleted) { + await prisma.trip.update({ + where: { id: tripId }, + data: { + status: 'COMPLETED', + fare, + distance: Math.round(distanceKm * 100) / 100, + completedAt, + }, + }); + } // 4. Remove driver from busy set if (trip.driverId) { @@ -330,7 +340,7 @@ const handleComplete = async (data: RideLifecycleJobData): Promise => { distanceKm: Math.round(distanceKm * 100) / 100, pickupLocation: locations[0]?.pickup ?? '', dropoffLocation: locations[0]?.dropoff ?? '', - completedAt: new Date().toISOString(), + completedAt: completedAt.toISOString(), }); // Also notify rider personal room diff --git a/learn/m5-tracking-completion.md b/learn/m5-tracking-completion.md index fce7305..1862228 100644 --- a/learn/m5-tracking-completion.md +++ b/learn/m5-tracking-completion.md @@ -12,7 +12,7 @@ M5 completes the end-to-end ride-sharing flow. After a ride is STARTED (M4), the ## Throttled Location Broadcast -``` +```text Driver GPS update (every ~1s) │ ▼ @@ -44,7 +44,7 @@ Without throttling, a driver sending GPS every second = 60 PostGIS queries/minut ## Ride Completion — COMPLETE Action -``` +```text POST /rides/:tripId/complete (driver) │ ▼ @@ -111,7 +111,7 @@ This avoids DB queries and gives real-time position. ## Complete Flow (M1–M5) -``` +```text 1. Rider: POST /rides/create → REQUESTED 2. Worker: cascade match → nearest driver → ride:offer 3. Driver: driver:accept-ride → SETNX lock → ACCEPTED