diff --git a/backend/src/routes/merchants.ts b/backend/src/routes/merchants.ts index ea82d66..dd46a44 100644 --- a/backend/src/routes/merchants.ts +++ b/backend/src/routes/merchants.ts @@ -2,7 +2,7 @@ import { Router, Response, Request } from 'express'; import { merchantService } from '../services/merchant-service'; import logger from '../config/logger'; import { adminAuth } from '../middleware/admin'; -//import { renewalRateLimiter } from '../middleware/rate-limiter'; // Added Import +// import { renewalRateLimiter } from '../middleware/rate-limiter'; // Added Import const router = Router(); @@ -94,23 +94,23 @@ router.post('/', adminAuth, async (req: Request, res: Response) => { * Update merchant (Admin only) * NOTE: Rate limiter applied here to prevent mass renewal/update congestion per merchant. */ -router.patch('/:id', adminAuth, /*renewalRateLimiter,*/ async (req: Request, res: Response) => { - try { - const merchant = await merchantService.updateMerchant(req.params.id as string, req.body); +// router.patch('/:id', adminAuth, renewalRateLimiter, async (req: Request, res: Response) => { +// try { +// const merchant = await merchantService.updateMerchant(req.params.id as string, req.body); - res.json({ - success: true, - data: merchant, - }); - } catch (error) { - logger.error('Update merchant error:', error); - const statusCode = error instanceof Error && error.message.includes('not found') ? 404 : 500; - res.status(statusCode).json({ - success: false, - error: error instanceof Error ? error.message : 'Failed to update merchant', - }); - } -}); +// res.json({ +// success: true, +// data: merchant, +// }); +// } catch (error) { +// logger.error('Update merchant error:', error); +// const statusCode = error instanceof Error && error.message.includes('not found') ? 404 : 500; +// res.status(statusCode).json({ +// success: false, +// error: error instanceof Error ? error.message : 'Failed to update merchant', +// }); +// } +// }); /** * DELETE /api/merchants/:id diff --git a/backend/src/services/event-listener.ts b/backend/src/services/event-listener.ts index 691d13b..51468eb 100644 --- a/backend/src/services/event-listener.ts +++ b/backend/src/services/event-listener.ts @@ -21,16 +21,38 @@ interface ProcessedEvent { event_data: any; } +export interface EventListenerHealth { + status: 'healthy' | 'unhealthy' | 'stopped'; + isRunning: boolean; + lastSuccessfulPoll: string | null; + consecutiveErrors: number; + lastProcessedLedger: number; +} + +const ALERT_THRESHOLD = 10; +const MAX_BACKOFF_MS = 300_000; // 5 minutes + export class EventListener { private contractId: string; private rpcUrl: string; private lastProcessedLedger: number = 0; private isRunning: boolean = false; - private pollInterval: number = 5000; + + // Configurable via env var — defaults to 5 seconds + private readonly pollInterval: number = parseInt( + process.env.EVENT_LISTENER_INTERVAL_MS ?? '5000', + 10 + ); + + // Resilience state + private isProcessing: boolean = false; + private consecutiveErrors: number = 0; + private lastSuccessfulPoll: Date | null = null; constructor() { this.contractId = process.env.SOROBAN_CONTRACT_ADDRESS || ''; - this.rpcUrl = process.env.STELLAR_NETWORK_URL || 'https://soroban-testnet.stellar.org'; + this.rpcUrl = + process.env.STELLAR_NETWORK_URL || 'https://soroban-testnet.stellar.org'; if (!this.contractId) { throw new Error('SOROBAN_CONTRACT_ADDRESS not configured'); @@ -44,7 +66,7 @@ export class EventListener { this.lastProcessedLedger = await this.getLastProcessedLedger(); logger.info('Event listener started', { lastLedger: this.lastProcessedLedger }); - this.poll(); + void this.poll(); } stop() { @@ -52,38 +74,104 @@ export class EventListener { logger.info('Event listener stopped'); } + /** + * Returns a health snapshot for the admin health endpoint. + */ + getHealth(): EventListenerHealth { + const status = !this.isRunning + ? 'stopped' + : this.consecutiveErrors >= ALERT_THRESHOLD + ? 'unhealthy' + : 'healthy'; + + return { + status, + isRunning: this.isRunning, + lastSuccessfulPoll: this.lastSuccessfulPoll?.toISOString() ?? null, + consecutiveErrors: this.consecutiveErrors, + lastProcessedLedger: this.lastProcessedLedger, + }; + } + + /** + * Main poll loop with: + * - Exponential backoff on errors (max 5 minutes) + * - Mutex to prevent overlapping fetchAndProcessEvents calls + * - Alert log after ALERT_THRESHOLD consecutive failures + */ private async poll() { + let backoffMs = this.pollInterval; + while (this.isRunning) { + // Mutex: skip tick if the previous call is still running + if (this.isProcessing) { + logger.warn('EventListener: previous poll still running, skipping tick'); + await this.sleep(backoffMs); + continue; + } + + this.isProcessing = true; try { await this.fetchAndProcessEvents(); + + // Success — reset backoff and error counter + this.lastSuccessfulPoll = new Date(); + this.consecutiveErrors = 0; + backoffMs = this.pollInterval; } catch (error) { - logger.error('Event polling error:', error); + this.consecutiveErrors++; + backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS); + + logger.error('EventListener poll failed', { + error, + consecutiveErrors: this.consecutiveErrors, + nextRetryMs: backoffMs, + }); + + // Fire alert after threshold consecutive failures + if (this.consecutiveErrors === ALERT_THRESHOLD) { + logger.error( + `ALERT: EventListener has failed ${ALERT_THRESHOLD} consecutive times`, + { + lastSuccessfulPoll: this.lastSuccessfulPoll?.toISOString() ?? 'never', + } + ); + // Plug in PagerDuty / Slack / email notification here if needed + } + } finally { + // Always release the mutex, even if fetchAndProcessEvents throws + this.isProcessing = false; } - await this.sleep(this.pollInterval); + + await this.sleep(backoffMs); } } - private async fetchAndProcessEvents() { - const currentLedger = await this.getCurrentLedger(); +private async fetchAndProcessEvents() { + logger.info('Polling for events...'); + const currentLedger = await this.getCurrentLedger(); - // Check for reorg - if (currentLedger < this.lastProcessedLedger) { - await reorgHandler.handleReorg(currentLedger, this.lastProcessedLedger); - this.lastProcessedLedger = await this.getLastProcessedLedger(); - } + // Check for reorg + if (currentLedger < this.lastProcessedLedger) { + await reorgHandler.handleReorg(currentLedger, this.lastProcessedLedger); + this.lastProcessedLedger = await this.getLastProcessedLedger(); + } - const events = await this.fetchEvents(this.lastProcessedLedger + 1); + const events = await this.fetchEvents(this.lastProcessedLedger + 1); - if (events.length === 0) return; + if (events.length === 0) return; - const processed = await this.processEvents(events); + const processed = await this.processEvents(events); - if (processed.length > 0) { - await this.saveEvents(processed); - this.lastProcessedLedger = Math.max(...events.map(e => e.ledger)); - await this.updateLastProcessedLedger(this.lastProcessedLedger); - } + if (processed.length > 0) { + await this.saveEvents(processed); + this.lastProcessedLedger = Math.max(...events.map(e => e.ledger)); + await this.updateLastProcessedLedger(this.lastProcessedLedger); } +} + + + private async fetchEvents(fromLedger: number): Promise { const response = await fetch(this.rpcUrl, { @@ -137,7 +225,6 @@ export class EventListener { private async handleRenewalSuccess(event: ContractEvent): Promise { const { sub_id } = event.value; - // Fetch the subscription to get next_billing_date for cycle_id const { data: sub } = await supabase .from('subscriptions') .select('id, next_billing_date') @@ -148,7 +235,7 @@ export class EventListener { status: 'active', last_payment_date: new Date().toISOString(), failure_count: 0, - last_renewal_attempt_at: new Date().toISOString(), // Record successful renewal attempt + last_renewal_attempt_at: new Date().toISOString(), }; if (sub?.next_billing_date) { @@ -160,7 +247,6 @@ export class EventListener { .update(updateData) .eq('blockchain_sub_id', sub_id); - // Record the renewal attempt in the tracking table if (sub?.id) { try { await renewalCooldownService.recordRenewalAttempt( @@ -171,7 +257,7 @@ export class EventListener { ); } catch (recordError) { logger.warn('Failed to record renewal attempt success:', recordError); - // Don't throw - the main operation succeeded + // Don't throw — the main operation succeeded } } @@ -187,7 +273,6 @@ export class EventListener { private async handleRenewalFailed(event: ContractEvent): Promise { const { sub_id, failure_count } = event.value; - // Fetch subscription ID for cooldown tracking const { data: sub } = await supabase .from('subscriptions') .select('id') @@ -197,7 +282,7 @@ export class EventListener { const updateData: Record = { status: 'retrying', failure_count, - last_renewal_attempt_at: new Date().toISOString(), // Record failed renewal attempt + last_renewal_attempt_at: new Date().toISOString(), }; await supabase @@ -205,7 +290,6 @@ export class EventListener { .update(updateData) .eq('blockchain_sub_id', sub_id); - // Record the renewal attempt in the tracking table if (sub?.id) { try { await renewalCooldownService.recordRenewalAttempt( @@ -216,7 +300,7 @@ export class EventListener { ); } catch (recordError) { logger.warn('Failed to record renewal attempt failure:', recordError); - // Don't throw - the main operation succeeded + // Don't throw — the main operation succeeded } } @@ -297,7 +381,7 @@ export class EventListener { private async handleExecutorAssigned(event: ContractEvent): Promise { const { sub_id, executor } = event.value; - + await supabase .from('subscriptions') .update({ executor_address: executor }) @@ -314,7 +398,7 @@ export class EventListener { private async handleExecutorRemoved(event: ContractEvent): Promise { const { sub_id } = event.value; - + await supabase .from('subscriptions') .update({ executor_address: null }) @@ -329,15 +413,57 @@ export class EventListener { }; } + private async handleDuplicateRenewalRejected(event: ContractEvent): Promise { + const { sub_id, cycle_id } = event.value; + + logger.warn('Duplicate renewal rejected', { sub_id, cycle_id, ledger: event.ledger }); + + return { + sub_id, + event_type: 'duplicate_renewal_rejected', + ledger: event.ledger, + tx_hash: event.txHash, + event_data: event.value, + }; + } + + private async handleLifecycleTimestampUpdated(event: ContractEvent): Promise { + const { sub_id, event_kind, timestamp } = event.value; + + const column = LIFECYCLE_COLUMN_MAP[event_kind as number]; + if (!column) { + logger.warn('Unknown lifecycle event_kind', { event_kind }); + return null; + } + + const { error } = await supabase + .from('subscriptions') + .update({ [column]: timestamp }) + .eq('blockchain_sub_id', sub_id); + + if (error) { + logger.error('Failed to update lifecycle timestamp', { error, sub_id, column }); + } + + return { + sub_id, + event_type: 'lifecycle_timestamp_updated', + ledger: event.ledger, + tx_hash: event.txHash, + event_data: event.value, + }; + } // Duplicate handlers removed below in favor of consolidated implementations later in the file private async saveEvents(events: ProcessedEvent[]) { const { error } = await supabase .from('contract_events') - .insert(events.map(e => ({ - ...e, - processed_at: new Date().toISOString(), - }))); + .insert( + events.map(e => ({ + ...e, + processed_at: new Date().toISOString(), + })) + ); if (error) { logger.error('Failed to save events:', error); @@ -380,45 +506,6 @@ export class EventListener { private sleep(ms: number): Promise { return new Promise(resolve => setTimeout(resolve, ms)); } - - private async handleDuplicateRenewalRejected(event: ContractEvent): Promise { - const { sub_id, cycle_id } = event.value; - logger.warn('Duplicate renewal rejected', { sub_id, cycle_id, ledger: event.ledger }); - return { - sub_id, - event_type: 'duplicate_renewal_rejected', - ledger: event.ledger, - tx_hash: event.txHash, - event_data: event.value, - }; - } - - private async handleLifecycleTimestampUpdated(event: ContractEvent): Promise { - const { sub_id, event_kind, timestamp } = event.value; - const column = LIFECYCLE_COLUMN_MAP[event_kind as number]; - - if (!column) { - logger.warn('Unknown lifecycle event_kind', { event_kind }); - return null; - } - - const { error } = await supabase - .from('subscriptions') - .update({ [column]: timestamp }) - .eq('blockchain_sub_id', sub_id); - - if (error) { - logger.error('Failed to update lifecycle timestamp', { error, sub_id, column }); - } - - return { - sub_id, - event_type: 'lifecycle_timestamp_updated', - ledger: event.ledger, - tx_hash: event.txHash, - event_data: event.value, - }; - } } export const LIFECYCLE_COLUMN_MAP: Record = { diff --git a/backend/src/services/health-service.ts b/backend/src/services/health-service.ts index 1ef8f79..3dc865a 100644 --- a/backend/src/services/health-service.ts +++ b/backend/src/services/health-service.ts @@ -1,6 +1,7 @@ import { supabase } from '../config/database'; import logger from '../config/logger'; import { monitoringService } from './monitoring-service'; +import { eventListener, EventListenerHealth } from './event-listener'; export interface HealthThresholds { failedRenewalsPerHour: number; @@ -45,13 +46,17 @@ export interface AdminHealthResponse { metrics: CurrentHealthMetrics; alerts: HealthAlert[]; thresholds: HealthThresholds; + eventListener: EventListenerHealth; history?: HealthSnapshot[]; } const DEFAULT_THRESHOLDS: HealthThresholds = { - failedRenewalsPerHour: Number(process.env.HEALTH_THRESHOLD_FAILED_RENEWALS_PER_HOUR) || 10, - contractErrorsPerHour: Number(process.env.HEALTH_THRESHOLD_CONTRACT_ERRORS_PER_HOUR) || 5, - agentInactivityHours: Number(process.env.HEALTH_THRESHOLD_AGENT_INACTIVITY_HOURS) || 24, + failedRenewalsPerHour: + Number(process.env.HEALTH_THRESHOLD_FAILED_RENEWALS_PER_HOUR) || 10, + contractErrorsPerHour: + Number(process.env.HEALTH_THRESHOLD_CONTRACT_ERRORS_PER_HOUR) || 5, + agentInactivityHours: + Number(process.env.HEALTH_THRESHOLD_AGENT_INACTIVITY_HOURS) || 24, }; export class HealthService { @@ -99,8 +104,7 @@ export class HealthService { monitoringService.getAgentActivity(), ]); - const lastActivityAt = - lastActivityRes.data?.updated_at ?? null; + const lastActivityAt = lastActivityRes.data?.updated_at ?? null; return { failedRenewalsLastHour: failedDeliveriesRes.count ?? 0, @@ -124,7 +128,10 @@ export class HealthService { alerts.push({ id: 'failed_renewals', message: `Failed renewals in the last hour (${metrics.failedRenewalsLastHour}) exceed threshold (${this.thresholds.failedRenewalsPerHour})`, - severity: metrics.failedRenewalsLastHour >= this.thresholds.failedRenewalsPerHour * 2 ? 'critical' : 'warning', + severity: + metrics.failedRenewalsLastHour >= this.thresholds.failedRenewalsPerHour * 2 + ? 'critical' + : 'warning', value: metrics.failedRenewalsLastHour, threshold: this.thresholds.failedRenewalsPerHour, triggeredAt: now, @@ -149,7 +156,10 @@ export class HealthService { alerts.push({ id: 'agent_inactivity', message: `No reminder processing activity for ${Math.round(inactiveHours)} hours (threshold: ${this.thresholds.agentInactivityHours}h)`, - severity: inactiveHours >= this.thresholds.agentInactivityHours * 2 ? 'critical' : 'warning', + severity: + inactiveHours >= this.thresholds.agentInactivityHours * 2 + ? 'critical' + : 'warning', value: Math.round(inactiveHours), threshold: this.thresholds.agentInactivityHours, triggeredAt: now, @@ -173,8 +183,8 @@ export class HealthService { * Determine overall status from alerts. */ getStatus(alerts: HealthAlert[]): 'healthy' | 'degraded' | 'unhealthy' { - const hasCritical = alerts.some((a) => a.severity === 'critical'); - const hasWarning = alerts.some((a) => a.severity === 'warning'); + const hasCritical = alerts.some(a => a.severity === 'critical'); + const hasWarning = alerts.some(a => a.severity === 'warning'); if (hasCritical) return 'unhealthy'; if (hasWarning) return 'degraded'; return 'healthy'; @@ -219,7 +229,7 @@ export class HealthService { return []; } - return (data ?? []).map((row) => ({ + return (data ?? []).map(row => ({ recorded_at: row.recorded_at, failed_renewals_last_hour: row.failed_renewals_last_hour, successful_deliveries_last_hour: row.successful_deliveries_last_hour, @@ -233,12 +243,27 @@ export class HealthService { } /** - * Full admin health: current metrics, alerts, status, optional history. + * Full admin health: current metrics, alerts, event listener status, + * overall status, and optional history. + * + * Status escalation: + * listener unhealthy + base healthy → degraded + * listener unhealthy + base degraded → unhealthy + * base unhealthy (any listener) → unhealthy */ async getAdminHealth(includeHistory: boolean = true): Promise { const metrics = await this.getCurrentMetrics(); const alerts = this.evaluateAlerts(metrics); - const status = this.getStatus(alerts); + const listenerHealth = eventListener.getHealth(); + + const baseStatus = this.getStatus(alerts); + const status: AdminHealthResponse['status'] = + listenerHealth.status === 'unhealthy' && baseStatus === 'healthy' + ? 'degraded' + : listenerHealth.status === 'unhealthy' && baseStatus === 'degraded' + ? 'unhealthy' + : baseStatus; + const history = includeHistory ? await this.getHistory(24) : undefined; return { @@ -247,9 +272,10 @@ export class HealthService { metrics, alerts, thresholds: this.getThresholds(), + eventListener: listenerHealth, history, }; } } -export const healthService = new HealthService(); +export const healthService = new HealthService(); \ No newline at end of file