From e6fe0e26bf436417a19c46515f97ad5ea0557a9e Mon Sep 17 00:00:00 2001 From: yuliuyi717-ux <264093635+yuliuyi717-ux@users.noreply.github.com> Date: Tue, 10 Mar 2026 07:40:20 +0800 Subject: [PATCH] feat(backend): implement signed webhook event system --- AUTODEV_REPORT.md | 61 ++++ README.md | 24 ++ packages/backend/app/__init__.py | 52 ++- packages/backend/app/db/schema.sql | 31 ++ packages/backend/app/models.py | 46 +++ packages/backend/app/routes/__init__.py | 2 + packages/backend/app/routes/auth.py | 12 +- packages/backend/app/routes/bills.py | 96 +++++- packages/backend/app/routes/expenses.py | 19 +- packages/backend/app/routes/webhooks.py | 137 ++++++++ packages/backend/app/services/webhooks.py | 393 ++++++++++++++++++++++ packages/backend/tests/test_webhooks.py | 133 ++++++++ 12 files changed, 1002 insertions(+), 4 deletions(-) create mode 100644 AUTODEV_REPORT.md create mode 100644 packages/backend/app/routes/webhooks.py create mode 100644 packages/backend/app/services/webhooks.py create mode 100644 packages/backend/tests/test_webhooks.py diff --git a/AUTODEV_REPORT.md b/AUTODEV_REPORT.md new file mode 100644 index 0000000..f1ccead --- /dev/null +++ b/AUTODEV_REPORT.md @@ -0,0 +1,61 @@ +# AUTODEV Report + +## Issue +- Implemented: [Issue #77 - Webhook Event System](https://github.com/rohitdash08/FinMind/issues/77) + +## Changed Files +- `README.md` +- `packages/backend/app/__init__.py` +- `packages/backend/app/db/schema.sql` +- `packages/backend/app/models.py` +- `packages/backend/app/routes/__init__.py` +- `packages/backend/app/routes/auth.py` +- `packages/backend/app/routes/bills.py` +- `packages/backend/app/routes/expenses.py` +- `packages/backend/app/routes/webhooks.py` (new) +- `packages/backend/app/services/webhooks.py` (new) +- `packages/backend/tests/test_webhooks.py` (new) + +## What Was Implemented +- Added webhook domain models: + - `WebhookTarget` + - `WebhookDelivery` + - `WebhookEvent` enum (9 documented event types) +- Added webhook service with: + - HMAC SHA-256 signed delivery (`X-FinMind-Signature` over `.`) + - Delivery logging and status tracking + - Retry/failure handling with exponential backoff (1 minute to max 1 hour, up to 7 retries) + - Event type catalog with descriptions and payload examples +- Added authenticated webhook routes: + - `POST/GET /webhooks/targets` + - `PATCH/DELETE /webhooks/targets/{id}` + - `GET /webhooks/deliveries` + - `POST /webhooks/deliveries/{id}/redeliver` + - `POST /webhooks/process-pending` + - `GET /webhooks/event-types` +- Integrated webhook triggers into key flows: + - Expenses: created/updated/deleted + - Bills: created/updated/deleted + - Profile: updated (`/auth/me` patch) +- Added PostgreSQL schema DDL for webhook tables/indexes and startup compatibility creation for existing Postgres deployments. +- Documented webhook endpoints, signature contract, retry policy, and event types in `README.md`. + +## Validation Commands +1. `cd packages/backend && ../../.venv/bin/python -m pytest -q tests/test_webhooks.py` +2. `sh ./scripts/test-backend.sh tests/test_webhooks.py` +3. `sh ./scripts/test-backend.sh tests/test_auth.py tests/test_expenses.py tests/test_bills.py tests/test_reminders.py tests/test_observability.py` +4. `cd packages/backend && ../../.venv/bin/flake8 app/__init__.py app/routes/auth.py app/routes/bills.py app/routes/expenses.py app/routes/webhooks.py app/services/webhooks.py app/models.py tests/test_webhooks.py` +5. `sh ./scripts/test-backend.sh` + +## Validation Results +- Command 1: failed in local host context due missing `redis` hostname resolution (environment mismatch, not code failure). +- Command 2: passed (`3 passed`). +- Command 3: passed (`16 passed`). +- Command 4: passed (no lint errors on changed backend files). +- Command 5: passed (`25 passed`). + +## Risks / Follow-ups +- Webhook delivery processing is synchronous on event trigger (and manual process endpoint), so slow/unstable webhook targets can increase API latency. +- Retry processing is currently request-driven/manual (`trigger_event` and `/webhooks/process-pending`); there is no dedicated background worker/cron loop in this change. +- Webhook secrets are stored in plaintext in DB (common but sensitive); encryption-at-rest or secret vault integration would improve security posture. +- `bill.due` and `subscription.updated` are documented and supported as event types, but no automatic emitters were added yet because current code paths do not include dedicated due/subscription transition jobs. diff --git a/README.md b/README.md index 49592bf..c06511f 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,30 @@ OpenAPI: `backend/app/openapi.yaml` - Bills: CRUD `/bills`, pay/mark `/bills/{id}/pay` - Reminders: CRUD `/reminders`, trigger `/reminders/run` - Insights: `/insights/monthly`, `/insights/budget-suggestion` +- Webhooks: + - targets: `POST/GET /webhooks/targets`, `PATCH/DELETE /webhooks/targets/{id}` + - deliveries: `GET /webhooks/deliveries`, `POST /webhooks/deliveries/{id}/redeliver` + - event catalog: `GET /webhooks/event-types` + +### Webhook Event Types +- `expense.created` +- `expense.updated` +- `expense.deleted` +- `bill.created` +- `bill.updated` +- `bill.deleted` +- `bill.due` +- `subscription.updated` +- `profile.updated` + +Each webhook request is signed with HMAC SHA-256 and includes: +- `X-FinMind-Event` +- `X-FinMind-Timestamp` +- `X-FinMind-Signature` (`sha256=`, signed over `.`) + +Delivery retry policy: +- exponential backoff starting at 1 minute and capped at 1 hour +- up to 7 retries before marking a delivery as failed ## MVP UI/UX Plan - Auth screens: register/login. diff --git a/packages/backend/app/__init__.py b/packages/backend/app/__init__.py index cdf76b4..3357922 100644 --- a/packages/backend/app/__init__.py +++ b/packages/backend/app/__init__.py @@ -110,10 +110,60 @@ def _ensure_schema_compatibility(app: Flask) -> None: NOT NULL DEFAULT 'INR' """ ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS webhook_targets ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + url VARCHAR(500) NOT NULL, + secret VARCHAR(255) NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + events JSONB NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() + ) + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS webhook_deliveries ( + id SERIAL PRIMARY KEY, + target_id INT NOT NULL REFERENCES webhook_targets(id) ON DELETE CASCADE, + event_type VARCHAR(50) NOT NULL, + payload JSONB NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'pending', + attempt_count INT NOT NULL DEFAULT 0, + last_attempt_at TIMESTAMP, + next_attempt_at TIMESTAMP, + response_status INT, + response_body TEXT, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() + ) + """ + ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS idx_webhook_targets_user + ON webhook_targets(user_id) + """ + ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_target + ON webhook_deliveries(target_id) + """ + ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_status_next_attempt + ON webhook_deliveries(status, next_attempt_at) + """ + ) conn.commit() except Exception: app.logger.exception( - "Schema compatibility patch failed for users.preferred_currency" + "Schema compatibility patch failed for webhook schema" ) conn.rollback() finally: diff --git a/packages/backend/app/db/schema.sql b/packages/backend/app/db/schema.sql index 410189d..5d0547f 100644 --- a/packages/backend/app/db/schema.sql +++ b/packages/backend/app/db/schema.sql @@ -123,3 +123,34 @@ CREATE TABLE IF NOT EXISTS audit_logs ( action VARCHAR(100) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); + +CREATE TABLE IF NOT EXISTS webhook_targets ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + url VARCHAR(500) NOT NULL, + secret VARCHAR(255) NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + events JSONB NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS webhook_deliveries ( + id SERIAL PRIMARY KEY, + target_id INT NOT NULL REFERENCES webhook_targets(id) ON DELETE CASCADE, + event_type VARCHAR(50) NOT NULL, + payload JSONB NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'pending', + attempt_count INT NOT NULL DEFAULT 0, + last_attempt_at TIMESTAMP, + next_attempt_at TIMESTAMP, + response_status INT, + response_body TEXT, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_webhook_targets_user ON webhook_targets(user_id); +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_target ON webhook_deliveries(target_id); +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_status_next_attempt + ON webhook_deliveries(status, next_attempt_at); diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py index 64d4481..8eca16c 100644 --- a/packages/backend/app/models.py +++ b/packages/backend/app/models.py @@ -133,3 +133,49 @@ class AuditLog(db.Model): user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True) action = db.Column(db.String(100), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class WebhookEvent(str, Enum): + EXPENSE_CREATED = "expense.created" + EXPENSE_UPDATED = "expense.updated" + EXPENSE_DELETED = "expense.deleted" + BILL_CREATED = "bill.created" + BILL_UPDATED = "bill.updated" + BILL_DELETED = "bill.deleted" + BILL_DUE = "bill.due" + SUBSCRIPTION_UPDATED = "subscription.updated" + PROFILE_UPDATED = "profile.updated" + + +class WebhookTarget(db.Model): + __tablename__ = "webhook_targets" + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) + url = db.Column(db.String(500), nullable=False) + secret = db.Column(db.String(255), nullable=False) + enabled = db.Column(db.Boolean, default=True, nullable=False) + events = db.Column(db.JSON, nullable=False) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + updated_at = db.Column( + db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False + ) + + +class WebhookDelivery(db.Model): + __tablename__ = "webhook_deliveries" + id = db.Column(db.Integer, primary_key=True) + target_id = db.Column( + db.Integer, db.ForeignKey("webhook_targets.id"), nullable=False + ) + event_type = db.Column(db.String(50), nullable=False) + payload = db.Column(db.JSON, nullable=False) + status = db.Column(db.String(20), default="pending", nullable=False) + attempt_count = db.Column(db.Integer, default=0, nullable=False) + last_attempt_at = db.Column(db.DateTime, nullable=True) + next_attempt_at = db.Column(db.DateTime, nullable=True) + response_status = db.Column(db.Integer, nullable=True) + response_body = db.Column(db.Text, nullable=True) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + updated_at = db.Column( + db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False + ) diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f8..75b9577 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .webhooks import bp as webhooks_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(webhooks_bp, url_prefix="/webhooks") diff --git a/packages/backend/app/routes/auth.py b/packages/backend/app/routes/auth.py index 05a3937..6393800 100644 --- a/packages/backend/app/routes/auth.py +++ b/packages/backend/app/routes/auth.py @@ -9,7 +9,8 @@ get_jwt_identity, ) from ..extensions import db, redis_client -from ..models import User +from ..models import User, WebhookEvent +from ..services.webhooks import WebhookService import logging import time @@ -94,6 +95,15 @@ def update_me(): return jsonify(error="unsupported preferred_currency"), 400 user.preferred_currency = cur db.session.commit() + WebhookService.trigger_event( + WebhookEvent.PROFILE_UPDATED, + { + "id": user.id, + "email": user.email, + "preferred_currency": user.preferred_currency or "INR", + }, + user_id=uid, + ) return jsonify( id=user.id, email=user.email, diff --git a/packages/backend/app/routes/bills.py b/packages/backend/app/routes/bills.py index f557e90..b116aaa 100644 --- a/packages/backend/app/routes/bills.py +++ b/packages/backend/app/routes/bills.py @@ -2,8 +2,9 @@ from flask import Blueprint, jsonify, request from flask_jwt_extended import jwt_required, get_jwt_identity from ..extensions import db -from ..models import Bill, BillCadence, User +from ..models import Bill, BillCadence, User, WebhookEvent from ..services.cache import cache_delete_patterns +from ..services.webhooks import WebhookService import logging bp = Blueprint("bills", __name__) @@ -59,6 +60,11 @@ def create_bill(): db.session.add(b) db.session.commit() logger.info("Created bill id=%s user=%s name=%s", b.id, uid, b.name) + WebhookService.trigger_event( + WebhookEvent.BILL_CREATED, + _bill_to_dict(b), + user_id=uid, + ) cache_delete_patterns( [f"user:{uid}:upcoming_bills*", f"user:{uid}:dashboard_summary:*"] ) @@ -85,7 +91,95 @@ def mark_paid(bill_id: int): cache_delete_patterns( [f"user:{uid}:upcoming_bills*", f"user:{uid}:dashboard_summary:*"] ) + WebhookService.trigger_event( + WebhookEvent.BILL_UPDATED, + _bill_to_dict(b), + user_id=uid, + ) logger.info( "Marked bill paid id=%s user=%s next_due_date=%s", b.id, uid, b.next_due_date ) return jsonify(message="updated") + + +@bp.patch("/") +@jwt_required() +def update_bill(bill_id: int): + uid = int(get_jwt_identity()) + b = db.session.get(Bill, bill_id) + if not b or b.user_id != uid: + return jsonify(error="not found"), 404 + + data = request.get_json() or {} + if "name" in data: + b.name = str(data.get("name") or "").strip() or b.name + if "amount" in data: + b.amount = data.get("amount") + if "currency" in data: + b.currency = str(data.get("currency") or "INR")[:10] + if "next_due_date" in data: + try: + b.next_due_date = date.fromisoformat(str(data.get("next_due_date"))) + except ValueError: + return jsonify(error="invalid next_due_date"), 400 + if "cadence" in data: + try: + b.cadence = BillCadence(str(data.get("cadence"))) + except ValueError: + return jsonify(error="invalid cadence"), 400 + if "autopay_enabled" in data: + b.autopay_enabled = bool(data.get("autopay_enabled")) + if "channel_whatsapp" in data: + b.channel_whatsapp = bool(data.get("channel_whatsapp")) + if "channel_email" in data: + b.channel_email = bool(data.get("channel_email")) + if "active" in data: + b.active = bool(data.get("active")) + + db.session.commit() + cache_delete_patterns( + [f"user:{uid}:upcoming_bills*", f"user:{uid}:dashboard_summary:*"] + ) + WebhookService.trigger_event( + WebhookEvent.BILL_UPDATED, + _bill_to_dict(b), + user_id=uid, + ) + return jsonify(_bill_to_dict(b)), 200 + + +@bp.delete("/") +@jwt_required() +def delete_bill(bill_id: int): + uid = int(get_jwt_identity()) + b = db.session.get(Bill, bill_id) + if not b or b.user_id != uid: + return jsonify(error="not found"), 404 + + payload = {"id": b.id} + db.session.delete(b) + db.session.commit() + cache_delete_patterns( + [f"user:{uid}:upcoming_bills*", f"user:{uid}:dashboard_summary:*"] + ) + WebhookService.trigger_event( + WebhookEvent.BILL_DELETED, + payload, + user_id=uid, + ) + return jsonify(message="deleted"), 200 + + +def _bill_to_dict(bill: Bill) -> dict: + return { + "id": bill.id, + "name": bill.name, + "amount": float(bill.amount), + "currency": bill.currency, + "next_due_date": bill.next_due_date.isoformat(), + "cadence": bill.cadence.value, + "autopay_enabled": bill.autopay_enabled, + "channel_whatsapp": bill.channel_whatsapp, + "channel_email": bill.channel_email, + "active": bill.active, + } diff --git a/packages/backend/app/routes/expenses.py b/packages/backend/app/routes/expenses.py index 1376d46..978a452 100644 --- a/packages/backend/app/routes/expenses.py +++ b/packages/backend/app/routes/expenses.py @@ -5,9 +5,10 @@ from flask import Blueprint, current_app, jsonify, request from flask_jwt_extended import jwt_required, get_jwt_identity from ..extensions import db -from ..models import Expense, RecurringCadence, RecurringExpense, User +from ..models import Expense, RecurringCadence, RecurringExpense, User, WebhookEvent from ..services.cache import cache_delete_patterns, monthly_summary_key from ..services import expense_import +from ..services.webhooks import WebhookService import logging bp = Blueprint("expenses", __name__) @@ -77,6 +78,11 @@ def create_expense(): db.session.add(e) db.session.commit() logger.info("Created expense id=%s user=%s amount=%s", e.id, uid, e.amount) + WebhookService.trigger_event( + WebhookEvent.EXPENSE_CREATED, + _expense_to_dict(e), + user_id=uid, + ) # Invalidate caches cache_delete_patterns( [ @@ -230,6 +236,11 @@ def update_expense(expense_id: int): raw_date = data.get("date") or data.get("spent_at") e.spent_at = date.fromisoformat(raw_date) db.session.commit() + WebhookService.trigger_event( + WebhookEvent.EXPENSE_UPDATED, + _expense_to_dict(e), + user_id=uid, + ) _invalidate_expense_cache(uid, e.spent_at.isoformat()) return jsonify(_expense_to_dict(e)) @@ -242,8 +253,14 @@ def delete_expense(expense_id: int): if not e or e.user_id != uid: return jsonify(error="not found"), 404 spent_at = e.spent_at.isoformat() + expense_data = _expense_to_dict(e) db.session.delete(e) db.session.commit() + WebhookService.trigger_event( + WebhookEvent.EXPENSE_DELETED, + expense_data, + user_id=uid, + ) _invalidate_expense_cache(uid, spent_at) return jsonify(message="deleted") diff --git a/packages/backend/app/routes/webhooks.py b/packages/backend/app/routes/webhooks.py new file mode 100644 index 0000000..d20d0bb --- /dev/null +++ b/packages/backend/app/routes/webhooks.py @@ -0,0 +1,137 @@ +from flask import Blueprint, jsonify, request +from flask_jwt_extended import get_jwt_identity, jwt_required + +from ..services.webhooks import WebhookService + +bp = Blueprint("webhooks", __name__) + + +@bp.post("/targets") +@jwt_required() +def create_webhook_target(): + uid = int(get_jwt_identity()) + data = request.get_json(silent=True) or {} + + try: + target = WebhookService.create_target( + user_id=uid, + url=str(data.get("url") or "").strip(), + secret=str(data.get("secret") or ""), + events=data.get("events"), + ) + except ValueError as exc: + return jsonify(error=str(exc)), 400 + + return jsonify(_target_to_dict(target)), 201 + + +@bp.get("/targets") +@jwt_required() +def get_webhook_targets(): + uid = int(get_jwt_identity()) + targets = WebhookService.get_targets(user_id=uid) + return jsonify([_target_to_dict(target) for target in targets]), 200 + + +@bp.patch("/targets/") +@jwt_required() +def update_webhook_target(target_id: int): + uid = int(get_jwt_identity()) + data = request.get_json(silent=True) or {} + + try: + target = WebhookService.update_target( + target_id=target_id, + user_id=uid, + url=data.get("url"), + secret=data.get("secret"), + events=data.get("events"), + enabled=data.get("enabled"), + ) + except ValueError as exc: + return jsonify(error=str(exc)), 400 + + if not target: + return jsonify(error="not found"), 404 + return jsonify(_target_to_dict(target)), 200 + + +@bp.delete("/targets/") +@jwt_required() +def delete_webhook_target(target_id: int): + uid = int(get_jwt_identity()) + deleted = WebhookService.delete_target(target_id=target_id, user_id=uid) + if not deleted: + return jsonify(error="not found"), 404 + return ("", 204) + + +@bp.get("/deliveries") +@jwt_required() +def get_webhook_deliveries(): + uid = int(get_jwt_identity()) + target_id = request.args.get("target_id") + parsed_target_id: int | None = None + if target_id is not None: + try: + parsed_target_id = int(target_id) + except ValueError: + return jsonify(error="target_id must be an integer"), 400 + + deliveries = WebhookService.get_deliveries(user_id=uid, target_id=parsed_target_id) + return jsonify([_delivery_to_dict(item) for item in deliveries]), 200 + + +@bp.post("/deliveries//redeliver") +@jwt_required() +def redeliver_webhook(delivery_id: int): + uid = int(get_jwt_identity()) + delivery = WebhookService.redeliver(delivery_id=delivery_id, user_id=uid) + if not delivery: + return jsonify(error="not found"), 404 + return jsonify(_delivery_to_dict(delivery)), 200 + + +@bp.post("/process-pending") +@jwt_required() +def process_pending(): + processed = WebhookService.process_pending_deliveries() + return jsonify(processed=processed), 200 + + +@bp.get("/event-types") +@jwt_required() +def event_types(): + return jsonify(WebhookService.event_types_catalog()), 200 + + +def _target_to_dict(target) -> dict: + return { + "id": target.id, + "url": target.url, + "events": target.events, + "enabled": target.enabled, + "created_at": target.created_at.isoformat(), + "updated_at": target.updated_at.isoformat(), + } + + +def _delivery_to_dict(delivery) -> dict: + return { + "id": delivery.id, + "target_id": delivery.target_id, + "event_type": delivery.event_type, + "payload": delivery.payload, + "status": delivery.status, + "attempt_count": delivery.attempt_count, + "last_attempt_at": ( + delivery.last_attempt_at.isoformat() if delivery.last_attempt_at else None + ), + "next_attempt_at": ( + delivery.next_attempt_at.isoformat() if delivery.next_attempt_at else None + ), + "response_status": delivery.response_status, + "response_body": delivery.response_body, + "created_at": delivery.created_at.isoformat(), + "updated_at": delivery.updated_at.isoformat(), + } diff --git a/packages/backend/app/services/webhooks.py b/packages/backend/app/services/webhooks.py new file mode 100644 index 0000000..9100f58 --- /dev/null +++ b/packages/backend/app/services/webhooks.py @@ -0,0 +1,393 @@ +import hashlib +import hmac +import json +import time +from datetime import datetime, timedelta +from urllib.parse import urlparse + +import requests +from sqlalchemy import or_ + +from ..extensions import db +from ..models import WebhookDelivery, WebhookEvent, WebhookTarget + + +class WebhookService: + MAX_RETRIES = 7 + REQUEST_TIMEOUT_SECONDS = 10 + SUCCESS_STATUS_CODES = {200, 201, 202, 204} + + _EVENT_DESCRIPTIONS = { + WebhookEvent.EXPENSE_CREATED.value: "Triggered when a new expense is created.", + WebhookEvent.EXPENSE_UPDATED.value: "Triggered when an expense is updated.", + WebhookEvent.EXPENSE_DELETED.value: "Triggered when an expense is deleted.", + WebhookEvent.BILL_CREATED.value: "Triggered when a new bill is created.", + WebhookEvent.BILL_UPDATED.value: "Triggered when a bill is updated.", + WebhookEvent.BILL_DELETED.value: "Triggered when a bill is deleted.", + WebhookEvent.BILL_DUE.value: "Triggered when a bill becomes due.", + WebhookEvent.SUBSCRIPTION_UPDATED.value: ( + "Triggered when a subscription changes." + ), + WebhookEvent.PROFILE_UPDATED.value: "Triggered when a user profile changes.", + } + + _EVENT_EXAMPLES = { + WebhookEvent.EXPENSE_CREATED.value: { + "id": 123, + "amount": 42.5, + "currency": "INR", + "category_id": 3, + "expense_type": "EXPENSE", + "description": "Lunch", + "date": "2026-03-01", + }, + WebhookEvent.EXPENSE_UPDATED.value: { + "id": 123, + "amount": 45.0, + "currency": "INR", + "category_id": 3, + "expense_type": "EXPENSE", + "description": "Lunch (updated)", + "date": "2026-03-01", + }, + WebhookEvent.EXPENSE_DELETED.value: { + "id": 123, + "amount": 45.0, + "currency": "INR", + "category_id": 3, + "expense_type": "EXPENSE", + "description": "Lunch (updated)", + "date": "2026-03-01", + }, + WebhookEvent.BILL_CREATED.value: { + "id": 101, + "name": "Electricity", + "amount": 1500.0, + "currency": "INR", + "next_due_date": "2026-03-28", + "cadence": "MONTHLY", + "autopay_enabled": False, + "channel_whatsapp": False, + "channel_email": True, + }, + WebhookEvent.BILL_UPDATED.value: { + "id": 101, + "name": "Electricity", + "amount": 1600.0, + "currency": "INR", + "next_due_date": "2026-04-28", + "cadence": "MONTHLY", + "autopay_enabled": True, + "channel_whatsapp": False, + "channel_email": True, + "active": True, + }, + WebhookEvent.BILL_DELETED.value: {"id": 101}, + WebhookEvent.BILL_DUE.value: { + "id": 101, + "name": "Electricity", + "amount": 1600.0, + "currency": "INR", + "due_date": "2026-04-28", + }, + WebhookEvent.SUBSCRIPTION_UPDATED.value: { + "id": 5, + "plan_id": 2, + "active": True, + "started_at": "2026-01-01T00:00:00Z", + }, + WebhookEvent.PROFILE_UPDATED.value: { + "id": 42, + "email": "user@example.com", + "preferred_currency": "INR", + }, + } + + @staticmethod + def generate_signature(secret: str, payload: str, timestamp: str) -> str: + message = f"{timestamp}.{payload}" + return hmac.new( + secret.encode("utf-8"), message.encode("utf-8"), hashlib.sha256 + ).hexdigest() + + @staticmethod + def event_types_catalog() -> list[dict]: + types = [] + for event in WebhookEvent: + types.append( + { + "type": event.value, + "description": WebhookService._EVENT_DESCRIPTIONS[event.value], + "payload_example": WebhookService._EVENT_EXAMPLES[event.value], + } + ) + return types + + @staticmethod + def create_target( + *, user_id: int, url: str, secret: str, events: list[str] | None + ) -> WebhookTarget: + WebhookService._validate_url(url) + if not secret: + raise ValueError("secret is required") + normalized_events = WebhookService._normalize_events(events) + + target = WebhookTarget( + user_id=user_id, + url=url, + secret=secret, + events=normalized_events, + enabled=True, + ) + db.session.add(target) + db.session.commit() + return target + + @staticmethod + def get_targets(*, user_id: int) -> list[WebhookTarget]: + return ( + WebhookTarget.query.filter_by(user_id=user_id) + .order_by(WebhookTarget.created_at.desc()) + .all() + ) + + @staticmethod + def update_target( + *, + target_id: int, + user_id: int, + url: str | None = None, + secret: str | None = None, + events: list[str] | None = None, + enabled: bool | None = None, + ) -> WebhookTarget | None: + target = WebhookTarget.query.filter_by(id=target_id, user_id=user_id).first() + if not target: + return None + + if url is not None: + WebhookService._validate_url(url) + target.url = url + if secret is not None: + if not secret: + raise ValueError("secret cannot be empty") + target.secret = secret + if events is not None: + target.events = WebhookService._normalize_events(events) + if enabled is not None: + target.enabled = bool(enabled) + + db.session.commit() + return target + + @staticmethod + def delete_target(*, target_id: int, user_id: int) -> bool: + target = WebhookTarget.query.filter_by(id=target_id, user_id=user_id).first() + if not target: + return False + db.session.delete(target) + db.session.commit() + return True + + @staticmethod + def get_deliveries( + *, user_id: int, target_id: int | None = None + ) -> list[WebhookDelivery]: + query = WebhookDelivery.query.join( + WebhookTarget, WebhookTarget.id == WebhookDelivery.target_id + ).filter(WebhookTarget.user_id == user_id) + if target_id is not None: + query = query.filter(WebhookDelivery.target_id == target_id) + return query.order_by(WebhookDelivery.created_at.desc()).all() + + @staticmethod + def trigger_event( + event_type: WebhookEvent | str, payload: dict, user_id: int | None = None + ) -> int: + event_name = WebhookService._normalize_event_name(event_type) + + query = WebhookTarget.query.filter_by(enabled=True) + if user_id is not None: + query = query.filter_by(user_id=user_id) + targets = query.all() + + envelope = { + "type": event_name, + "timestamp": int(time.time()), + "data": payload, + } + if user_id is not None: + envelope["user_id"] = user_id + + created = 0 + for target in targets: + if not WebhookService._target_subscribed(target, event_name): + continue + db.session.add( + WebhookDelivery( + target_id=target.id, + event_type=event_name, + payload=envelope, + status="pending", + attempt_count=0, + next_attempt_at=datetime.utcnow(), + ) + ) + created += 1 + + if created == 0: + return 0 + + db.session.commit() + WebhookService.process_pending_deliveries() + return created + + @staticmethod + def process_pending_deliveries(limit: int = 100) -> int: + now = datetime.utcnow() + pending = ( + WebhookDelivery.query.filter(WebhookDelivery.status == "pending") + .filter( + or_( + WebhookDelivery.next_attempt_at.is_(None), + WebhookDelivery.next_attempt_at <= now, + ) + ) + .order_by(WebhookDelivery.created_at.asc()) + .limit(limit) + .all() + ) + for delivery in pending: + WebhookService.attempt_delivery(delivery) + return len(pending) + + @staticmethod + def attempt_delivery(delivery: WebhookDelivery) -> None: + target = db.session.get(WebhookTarget, delivery.target_id) + now = datetime.utcnow() + + delivery.attempt_count += 1 + delivery.last_attempt_at = now + + if not target or not target.enabled: + delivery.status = "failed" + delivery.next_attempt_at = None + delivery.response_body = "target not found or disabled" + db.session.commit() + return + + payload_json = json.dumps( + delivery.payload, ensure_ascii=False, separators=(",", ":"), sort_keys=True + ) + timestamp = str(int(time.time())) + signature = WebhookService.generate_signature( + target.secret, payload_json, timestamp + ) + + headers = { + "Content-Type": "application/json", + "X-FinMind-Event": delivery.event_type, + "X-FinMind-Timestamp": timestamp, + "X-FinMind-Signature": f"sha256={signature}", + "X-FinMind-Delivery": str(delivery.id), + } + + try: + response = requests.post( + target.url, + data=payload_json, + headers=headers, + timeout=WebhookService.REQUEST_TIMEOUT_SECONDS, + ) + delivery.response_status = response.status_code + delivery.response_body = (response.text or "")[:2000] + + if response.status_code in WebhookService.SUCCESS_STATUS_CODES: + delivery.status = "success" + delivery.next_attempt_at = None + else: + WebhookService._schedule_retry_or_fail(delivery) + + except requests.RequestException as exc: + delivery.response_status = None + delivery.response_body = str(exc)[:2000] + WebhookService._schedule_retry_or_fail(delivery) + + db.session.commit() + + @staticmethod + def redeliver(*, delivery_id: int, user_id: int) -> WebhookDelivery | None: + delivery = ( + WebhookDelivery.query.join( + WebhookTarget, WebhookTarget.id == WebhookDelivery.target_id + ) + .filter(WebhookDelivery.id == delivery_id, WebhookTarget.user_id == user_id) + .first() + ) + if not delivery: + return None + delivery.status = "pending" + delivery.next_attempt_at = datetime.utcnow() + db.session.commit() + WebhookService.process_pending_deliveries(limit=1) + return delivery + + @staticmethod + def calculate_next_attempt(attempt_count: int) -> datetime: + delay_seconds = min(60 * (2 ** max(attempt_count - 1, 0)), 3600) + return datetime.utcnow() + timedelta(seconds=delay_seconds) + + @staticmethod + def _normalize_event_name(event_type: WebhookEvent | str) -> str: + if isinstance(event_type, WebhookEvent): + return event_type.value + event_name = str(event_type) + if event_name not in {item.value for item in WebhookEvent}: + raise ValueError("unsupported event type") + return event_name + + @staticmethod + def _normalize_events(events: list[str] | None) -> list[str]: + all_events = [item.value for item in WebhookEvent] + if events is None: + return all_events + if not isinstance(events, list) or len(events) == 0: + raise ValueError("events must be a non-empty list") + + normalized: list[str] = [] + seen: set[str] = set() + for raw in events: + value = str(raw).strip() + if value == "*": + return all_events + if value not in all_events: + raise ValueError(f"unsupported event type: {value}") + if value in seen: + continue + seen.add(value) + normalized.append(value) + return normalized + + @staticmethod + def _target_subscribed(target: WebhookTarget, event_name: str) -> bool: + events = target.events or [] + if not isinstance(events, list): + return False + return event_name in events or "*" in events + + @staticmethod + def _schedule_retry_or_fail(delivery: WebhookDelivery) -> None: + if delivery.attempt_count <= WebhookService.MAX_RETRIES: + delivery.status = "pending" + delivery.next_attempt_at = WebhookService.calculate_next_attempt( + delivery.attempt_count + ) + else: + delivery.status = "failed" + delivery.next_attempt_at = None + + @staticmethod + def _validate_url(url: str) -> None: + parsed = urlparse(url) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise ValueError("url must be a valid http(s) URL") diff --git a/packages/backend/tests/test_webhooks.py b/packages/backend/tests/test_webhooks.py new file mode 100644 index 0000000..099d5e1 --- /dev/null +++ b/packages/backend/tests/test_webhooks.py @@ -0,0 +1,133 @@ +from datetime import datetime, timedelta + +import requests + +from app.extensions import db +from app.models import WebhookDelivery +from app.services.webhooks import WebhookService + + +EXPECTED_EVENT_TYPES = { + "expense.created", + "expense.updated", + "expense.deleted", + "bill.created", + "bill.updated", + "bill.deleted", + "bill.due", + "subscription.updated", + "profile.updated", +} + + +def _create_target(client, auth_header, events: list[str]): + response = client.post( + "/webhooks/targets", + json={ + "url": "https://example.test/webhook", + "secret": "super-secret-key", + "events": events, + }, + headers=auth_header, + ) + assert response.status_code == 201 + return response.get_json()["id"] + + +def test_webhook_event_types_documented(client, auth_header): + response = client.get("/webhooks/event-types", headers=auth_header) + assert response.status_code == 200 + + event_types = {item["type"] for item in response.get_json()} + assert EXPECTED_EVENT_TYPES.issubset(event_types) + + +def test_signed_webhook_delivery_for_expense_created(client, auth_header, monkeypatch): + _create_target(client, auth_header, ["expense.created"]) + + captured: dict = {} + + class _Response: + status_code = 204 + text = "ok" + + def _fake_post(url, data, headers, timeout): + captured["url"] = url + captured["data"] = data + captured["headers"] = headers + captured["timeout"] = timeout + return _Response() + + monkeypatch.setattr("app.services.webhooks.requests.post", _fake_post) + + response = client.post( + "/expenses", + json={ + "amount": 10.5, + "description": "Coffee", + "date": "2026-03-01", + }, + headers=auth_header, + ) + assert response.status_code == 201 + + assert captured["url"] == "https://example.test/webhook" + assert captured["timeout"] == 10 + assert captured["headers"]["X-FinMind-Event"] == "expense.created" + assert captured["headers"]["X-FinMind-Signature"].startswith("sha256=") + + timestamp = captured["headers"]["X-FinMind-Timestamp"] + expected_signature = WebhookService.generate_signature( + "super-secret-key", + captured["data"], + timestamp, + ) + assert captured["headers"]["X-FinMind-Signature"] == f"sha256={expected_signature}" + + with client.application.app_context(): + deliveries = WebhookDelivery.query.all() + assert len(deliveries) == 1 + assert deliveries[0].status == "success" + assert deliveries[0].attempt_count == 1 + + +def test_webhook_retry_and_failure_handling(client, auth_header, monkeypatch): + _create_target(client, auth_header, ["expense.created"]) + + def _always_fail(*_args, **_kwargs): + raise requests.RequestException("connection failed") + + monkeypatch.setattr("app.services.webhooks.requests.post", _always_fail) + + response = client.post( + "/expenses", + json={ + "amount": 20, + "description": "Lunch", + "date": "2026-03-02", + }, + headers=auth_header, + ) + assert response.status_code == 201 + + with client.application.app_context(): + delivery = WebhookDelivery.query.one() + assert delivery.status == "pending" + assert delivery.attempt_count == 1 + assert delivery.next_attempt_at is not None + assert "connection failed" in (delivery.response_body or "") + + delivery.next_attempt_at = datetime.utcnow() - timedelta(seconds=1) + delivery.attempt_count = WebhookService.MAX_RETRIES + db.session.commit() + + delivery_id = delivery.id + + with client.application.app_context(): + WebhookService.process_pending_deliveries() + + with client.application.app_context(): + failed_delivery = db.session.get(WebhookDelivery, delivery_id) + assert failed_delivery is not None + assert failed_delivery.status == "failed" + assert failed_delivery.attempt_count == WebhookService.MAX_RETRIES + 1