Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 190 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,194 @@
<div align="center">

# 🚀 UrbanPulse

**Production-grade ride-sharing backend** built with a distributed microservices architecture.

*Real-time driver matching • Live GPS tracking • OTP-verified pickups • PostGIS fare calculation*

[![TypeScript](https://img.shields.io/badge/TypeScript-5.x-3178C6?logo=typescript&logoColor=white)](https://www.typescriptlang.org/)
[![Node.js](https://img.shields.io/badge/Node.js-20+-339933?logo=node.js&logoColor=white)](https://nodejs.org/)
[![PostgreSQL](https://img.shields.io/badge/PostgreSQL-PostGIS-4169E1?logo=postgresql&logoColor=white)](https://www.postgresql.org/)
[![Redis](https://img.shields.io/badge/Redis-GEO%20%2B%20Pub%2FSub-DC382D?logo=redis&logoColor=white)](https://redis.io/)
[![Socket.io](https://img.shields.io/badge/Socket.io-Realtime-010101?logo=socket.io&logoColor=white)](https://socket.io/)

</div>

---

## What is Urban Pulse ?

UrbanPulse is a **full ride-sharing backend** , think of it as the engine behind apps like Uber/Ola. A rider requests a ride, the system finds the nearest available driver, handles the entire lifecycle through completion, and calculates the fare , all in real-time.

It's a distributed system with **race condition protection**, **cascading driver matching**, **cross-process event emission**, and **geospatial queries** — the same patterns used in production ride-sharing platforms.

---

## The Ride Flow

```
Rider requests ride
Nearest driver found (Redis GEOSEARCH, 5km radius)
Offer sent via WebSocket → 30s timeout → cascade to next driver
Driver accepts (distributed SETNX lock prevents double-accept)
4-digit OTP generated → sent to rider
Driver verifies OTP at pickup → ride starts
Live GPS streaming (throttled, with ETA) → rider sees driver moving
Driver completes ride → PostGIS distance → fare calculated
Both see ride in history with full details
```

---

## Architecture

```
┌──────────────────────────────────────────────────────┐
│ API Gateway │
│ Express REST API + Socket.io WebSocket Server │
│ Auth (JWT) • Rate Limiting • Zod Validation │
└──────────────┬──────────────────────┬────────────────┘
│ BullMQ Jobs │ Redis Pub/Sub
▼ ▼
┌───────────────────────┐ ┌────────────────────┐
│ Ride Worker │ │ Notifications │
│ Matching • Lifecycle │ │ Socket Adapter │
│ State Machine • OTP │ │ Cross-process │
└──────────┬────────────┘ └────────────────────┘
┌─────┴─────┐
▼ ▼
┌─────────┐ ┌─────────┐
│ Postgres│ │ Redis │
│ PostGIS │ │ GEO+Pub │
└─────────┘ └─────────┘
```

**Monorepo** (Turborepo + pnpm workspaces) with 4 packages:

| Package | Role |
|---------|------|
| `api-gateway` | REST API + WebSocket server |
| `ride-worker` | Background job processing (BullMQ) |
| `common` | Shared Prisma schema, Zod schemas, constants |
| `notifications` | Socket.io Redis adapter for cross-process events |

---

## Key Engineering Decisions

### 🗺️ Why Redis GEO over SQL for driver lookup?
`GEOSEARCH` returns drivers sorted by distance in **O(log N + M)** — orders of magnitude faster than a PostGIS query scanning the drivers table. Drivers update their position every few seconds; Redis handles this write-heavy workload without touching the database.

### 🔒 Why SETNX for ride acceptance?
Two drivers could accept the same ride simultaneously. `SETNX` (set-if-not-exists) acts as a **distributed lock** — the first driver wins atomically, the second gets a clean rejection. No database race conditions.

### ⚡ Why BullMQ instead of direct processing?
Ride matching involves multiple network calls (Redis lookup → filter → DB write → WebSocket emit → schedule timeout). If any step fails, BullMQ **retries automatically**. The cascade timeout (30s per driver) uses BullMQ's delayed jobs — no cron needed.

### 🚦 Why throttle location updates?
A driver's phone sends GPS every ~1 second. Without throttling, that's 60 PostGIS queries + 60 WebSocket broadcasts per minute per active ride. The **SETNX 3-second throttle** cuts this to 20/minute while still updating Redis GEO position on every tick.

---

## Tech Stack

| Layer | Technology | Why |
|-------|-----------|-----|
| Language | TypeScript | End-to-end type safety across all packages |
| API | Express.js | Lightweight, middleware-driven |
| Realtime | Socket.io + Redis Adapter | Cross-process WebSocket events |
| Database | PostgreSQL + PostGIS | Geospatial queries (distance, points) |
| ORM | Prisma | Type-safe DB access + migrations |
| Cache/Geo | Redis | GEO commands, pub/sub, distributed locks |
| Queue | BullMQ | Reliable job processing with retries |
| Validation | Zod | Runtime schema validation |
| Auth | JWT + bcrypt | Stateless authentication |
| Monorepo | Turborepo + pnpm | Shared packages, parallel builds |
| Testing | Vitest | Fast unit tests (76 passing) |
| Infra | Docker Compose | One-command dev environment |

---

## API Highlights

<details>
<summary><b>Authentication</b></summary>

- `POST /auth/register` — rider or driver registration
- `POST /auth/login` — JWT token issuance
- `GET /user/profile` — authenticated user profile
</details>

<details>
<summary><b>Ride Lifecycle</b></summary>

- `POST /rides/create` — request a ride (triggers matching)
- `POST /rides/:tripId/accept` — driver accepts (SETNX lock)
- `POST /rides/:tripId/verify-otp` — OTP verification at pickup
- `POST /rides/:tripId/complete` — complete ride (fare calculation)
- `PATCH /rides/cancel` — cancel ride
</details>

<details>
<summary><b>Dashboard</b></summary>

- `GET /rides/history` — paginated ride history
- `GET /rides/:tripId` — ride details
- `GET /user/driver/stats` — total rides, earnings, distance
- `GET /user/driver/current-ride` — active ride + rider info
- `GET /user/rider/current-ride` — active ride + driver location
</details>

<details>
<summary><b>WebSocket Events</b></summary>

- `ride:offer` → driver receives ride offer
- `ride:accepted` → rider notified of match
- `ride:otp` → rider receives pickup OTP
- `ride:driver-location` → live tracking with ETA
- `ride:completed` → fare breakdown
</details>

---

## Quick Start

```bash
# Clone and start infrastructure
git clone https://github.com/codeRisshi25/urbanpulse-backend.git
cd urbanpulse-backend
cp .env.example .env

# Start PostgreSQL + Redis
docker compose up -d

# Install and run
pnpm install
pnpm run dev
```

---

## Testing

```bash
source activate.sh
dcu
# for psql client
dc
psql -h host -U user -d db
pnpm test # 76 tests across all packages
pnpm typecheck # TypeScript validation
```

# small dev log
A Postman collection (`UrbanPulse_Postman_Collection.json`) is included for manual API testing.

---

- postgres migration on check
- turbo repo
<div align="center">
<sub>Built as a systems design exercise in distributed real-time architectures.</sub>
</div>
109 changes: 53 additions & 56 deletions apps/api-gateway/src/routes/ride.routes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Router } from 'express';
import { validate } from '../middleware/validate.js';
import { rideSchema, otpVerifySchema, rideCancelSchema } from 'common';
import { rideSchema, otpVerifySchema, rideCancelSchema, rideHistoryQuerySchema, tripIdParamSchema } from 'common';
import { authenticate, authorize } from '../middleware/auth.js';
import {
createRide,
Expand All @@ -9,6 +9,9 @@ import {
rejectRide,
verifyOtp,
driverCancelRide,
completeRide,
getRideHistory,
getRideDetail,
} from '../services/ride.service.js';
import { getNearbyAvailableRides } from '../services/driver.service.js';

Expand All @@ -18,119 +21,113 @@ const rideRouter: Router = Router();

rideRouter.post('/create', authenticate, validate(rideSchema), async (req, res) => {
try {
if (!req.user) {
return res.status(401).json({ success: false, message: 'Unauthorized' });
}
if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' });
const ride = await createRide(req.body, req.user.userId);
if (!ride.success) return res.status(400).json(ride);
return res.status(201).json(ride);
} catch (error) {
return res.status(500).json({
success: false,
message: error instanceof Error ? error.message : 'Internal server error',
});
return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' });
}
});

rideRouter.patch('/cancel', authenticate, validate(rideCancelSchema), async (req, res) => {
try {
if (!req.user) {
return res.status(401).json({ success: false, message: 'Unauthorized' });
}
if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' });
const { tripId } = req.body;
const ride = await cancelRide(req.user.userId, tripId);
if (!ride.success) return res.status(400).json(ride);
return res.status(200).json(ride);
} catch (error) {
return res.status(500).json({
success: false,
message: error instanceof Error ? error.message : 'Internal server error',
});
return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' });
}
});

// ─── Ride history & detail ───────────────────────────────────────────────

rideRouter.get('/history', authenticate, validate(rideHistoryQuerySchema), async (req, res) => {
try {
if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' });
const { page, limit } = req.query as unknown as { page: number; limit: number };
const result = await getRideHistory(req.user.userId, req.user.role, page || 1, limit || 20);
return res.status(result.success ? 200 : 400).json(result);
} catch (error) {
return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' });
}
});

rideRouter.get('/:tripId', authenticate, validate(tripIdParamSchema), async (req, res) => {
try {
if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' });
const result = await getRideDetail(req.user.userId, req.params.tripId);
return res.status(result.success ? 200 : (result.message.includes('access') ? 403 : 400)).json(result);
} catch (error) {
return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' });
}
});

// ─── Driver endpoints ────────────────────────────────────────────────────

rideRouter.get('/available', authenticate, authorize('driver'), async (req, res) => {
try {
if (!req.user) {
return res.status(401).json({ success: false, message: 'Unauthorized' });
}
if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' });
const result = await getNearbyAvailableRides(req.user.userId);
return res.status(result.success ? 200 : 400).json(result);
} catch (error) {
return res.status(500).json({
success: false,
message: error instanceof Error ? error.message : 'Internal server error',
});
return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' });
}
});

rideRouter.post('/:tripId/accept', authenticate, authorize('driver'), async (req, res) => {
try {
if (!req.user) {
return res.status(401).json({ success: false, message: 'Unauthorized' });
}
if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' });
const { offerId } = req.body;
if (!offerId) {
return res.status(400).json({ success: false, message: 'offerId is required' });
}
if (!offerId) return res.status(400).json({ success: false, message: 'offerId is required' });
const result = await acceptRide(req.user.userId, req.params.tripId, offerId);
return res.status(result.success ? 200 : 400).json(result);
} catch (error) {
return res.status(500).json({
success: false,
message: error instanceof Error ? error.message : 'Internal server error',
});
return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' });
}
});

rideRouter.post('/:tripId/reject', authenticate, authorize('driver'), async (req, res) => {
try {
if (!req.user) {
return res.status(401).json({ success: false, message: 'Unauthorized' });
}
if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' });
const { offerId } = req.body;
if (!offerId) {
return res.status(400).json({ success: false, message: 'offerId is required' });
}
if (!offerId) return res.status(400).json({ success: false, message: 'offerId is required' });
const result = await rejectRide(req.user.userId, req.params.tripId, offerId);
return res.status(result.success ? 200 : 400).json(result);
} catch (error) {
return res.status(500).json({
success: false,
message: error instanceof Error ? error.message : 'Internal server error',
});
return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' });
}
});

rideRouter.post('/:tripId/verify-otp', authenticate, authorize('driver'), validate(otpVerifySchema), async (req, res) => {
try {
if (!req.user) {
return res.status(401).json({ success: false, message: 'Unauthorized' });
}
if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' });
const result = await verifyOtp(req.user.userId, req.params.tripId, req.body.otp);
return res.status(result.success ? 200 : 400).json(result);
} catch (error) {
return res.status(500).json({
success: false,
message: error instanceof Error ? error.message : 'Internal server error',
});
return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' });
}
});

rideRouter.post('/:tripId/complete', authenticate, authorize('driver'), async (req, res) => {
try {
if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' });
const result = await completeRide(req.user.userId, req.params.tripId);
return res.status(result.success ? 200 : 400).json(result);
} catch (error) {
return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' });
}
});

rideRouter.post('/:tripId/driver-cancel', authenticate, authorize('driver'), async (req, res) => {
try {
if (!req.user) {
return res.status(401).json({ success: false, message: 'Unauthorized' });
}
if (!req.user) return res.status(401).json({ success: false, message: 'Unauthorized' });
const result = await driverCancelRide(req.user.userId, req.params.tripId);
return res.status(result.success ? 200 : 400).json(result);
} catch (error) {
return res.status(500).json({
success: false,
message: error instanceof Error ? error.message : 'Internal server error',
});
return res.status(500).json({ success: false, message: error instanceof Error ? error.message : 'Internal server error' });
}
});

Expand Down
Loading