From 1e1fd3233360abf63a0a827917ecb55c166ff294 Mon Sep 17 00:00:00 2001
From: recscse
Date: Thu, 9 Apr 2026 09:00:04 +0530
Subject: [PATCH] logging changes

---
 app.py                     | 321 +++++++++++++++++++++----------------
 core/production_logging.py | 216 ++++++++++++++++---------
 utils/logging_utils.py     |   4 +-
 3 files changed, 327 insertions(+), 214 deletions(-)
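Note (not part of the diff): the central change is that logging is configured exactly once, before any other module touches `logging`, by calling get_trading_logger() from core/production_logging.py; verbosity is then driven by environment variables rather than code edits. A minimal sketch of the intended call pattern, assuming the module layout in this patch:

    import logging
    import os

    # Defaults introduced by this patch: WARNING level, noisy INFO patterns dropped.
    os.environ.setdefault("LOG_LEVEL", "WARNING")
    os.environ.setdefault("ENABLE_NOISY_LOGS", "false")

    from core.production_logging import get_trading_logger

    get_trading_logger()               # singleton: repeated calls reuse the same setup
    log = logging.getLogger(__name__)

    log.info("tick received")          # dropped at the default WARNING level
    log.warning("order rejected")      # always emitted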
diff --git a/app.py b/app.py
index 3dea51a..7c6342d 100644
--- a/app.py
+++ b/app.py
@@ -47,12 +47,15 @@ def _windows_exception_handler(loop, context):
 # Load environment variables FIRST
 load_dotenv()
 
-# Configure logging EARLY
-# logging.basicConfig(
-#     level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
-# )
+# Configure PRODUCTION logging EARLY
+# This silences noisy libraries (TF, Torch, etc.), uses a compact formatter in prod,
+# and applies ThrottledFilter to stop "Log Storms" from market ticks.
+from core.production_logging import get_trading_logger
+get_trading_logger()
+
 logger = logging.getLogger(__name__)
 
+
 # Custom Filter to suppress noisy Uvicorn access logs (Polling & OPTIONS)
 class EndpointFilter(logging.Filter):
     def filter(self, record: logging.LogRecord) -> bool:
@@ -61,7 +64,7 @@ def filter(self, record: logging.LogRecord) -> bool:
             if record.args and len(record.args) >= 3:
                 req_line = str(record.args[1])
                 status_code = record.args[2]
-                
+
                 # Filter out successful polling and OPTIONS requests
                 if status_code == 200:
                     if "GET /api/notifications" in req_line:
@@ -71,6 +74,8 @@ def filter(self, record: logging.LogRecord) -> bool:
             return True
         except Exception:
             return True
+
+
 try:
     from services.centralized_ws_manager import centralized_manager
     from router.market_ws import (
@@ -80,8 +85,8 @@ def filter(self, record: logging.LogRecord) -> bool:
 
     CENTRALIZED_WS_AVAILABLE = True
     CENTRALIZED_ROUTES_AVAILABLE = True
-    logger.info("✅ NEW: Centralized WebSocket manager imported successfully")
-    logger.info("✅ NEW: Centralized WebSocket routes imported successfully")
+    logger.debug("✅ NEW: Centralized WebSocket manager imported successfully")
+    logger.debug("✅ NEW: Centralized WebSocket routes imported successfully")
 except ImportError as e:
     CENTRALIZED_WS_AVAILABLE = False
     CENTRALIZED_ROUTES_AVAILABLE = False
@@ -96,7 +101,7 @@ def filter(self, record: logging.LogRecord) -> bool:
     if CENTRALIZED_WS_AVAILABLE:
         legacy_market_ws_router = centralized_market_ws_router
         LEGACY_MARKET_WS_AVAILABLE = True
-        logger.info(
+        logger.debug(
             "✅ Legacy market WebSocket routes available for backward compatibility"
         )
     else:
@@ -121,7 +126,7 @@ def filter(self, record: logging.LogRecord) -> bool:
     from router.market_analytics_router import router as market_analytics_router
 
     MARKET_ANALYTICS_ROUTER_AVAILABLE = True
-    logger.info("✅ Market analytics router imported successfully")
+    logger.debug("✅ Market analytics router imported successfully")
 except ImportError as e:
     logger.warning(f"⚠️ Market analytics router not available: {e}")
     from fastapi import APIRouter
@@ -154,10 +159,12 @@ def filter(self, record: logging.LogRecord) -> bool:
 
 try:
     from router.upstox_order_router import router as upstox_order_router
+
     UPSTOX_ORDER_ROUTER_AVAILABLE = True
-    logger.info("✅ Upstox Order Management router imported successfully")
+    logger.debug("✅ Upstox Order Management router imported successfully")
 except ImportError as e:
     from fastapi import APIRouter
+
     upstox_order_router = APIRouter()
     UPSTOX_ORDER_ROUTER_AVAILABLE = False
     logger.warning(f"⚠️ Upstox Order Management router not available: {e}")
@@ -181,6 +188,7 @@ def filter(self, record: logging.LogRecord) -> bool:
 from router.notification_router import router as notification_router
 from router.instrument_routes import router as instrument_router
+
 # from services.instrument_registry import instrument_registry
 from router.websocket_routes import router as websocket_router
 from router.debug_routes import router as debug_router
@@ -189,13 +197,20 @@ def filter(self, record: logging.LogRecord) -> bool:
 # from router.unified_websocket_routes import router as unified_ws_router
 from router.paper_trading_routes import router as paper_trading_router
 from router.option_routes import option_router
-from router.system_health_router import router as system_health_router
+try:
+    from router.system_health_router import router as system_health_router
+    SYSTEM_HEALTH_AVAILABLE = True
+except (ImportError, ModuleNotFoundError) as e:
+    from fastapi import APIRouter
+    system_health_router = APIRouter()
+    SYSTEM_HEALTH_AVAILABLE = False
+    logger.warning(f"⚠️ System Health router not available: {e}")
 
 try:
     from router.trading_execution_router import router as trading_execution_router
 
     TRADING_EXECUTION_AVAILABLE = True
-    logger.info("✅ Trading Execution routes imported successfully")
+    logger.debug("✅ Trading Execution routes imported successfully")
 except ImportError as e:
     from fastapi import APIRouter
@@ -218,7 +233,7 @@ def filter(self, record: logging.LogRecord) -> bool:
     )
 
     GAP_DETECTION_AVAILABLE = True
-    logger.info("✅ Gap detection service imported successfully")
+    logger.debug("✅ Gap detection service imported successfully")
 except ImportError as e:
     GAP_DETECTION_AVAILABLE = False
     logger.warning(f"⚠️ Gap detection service not available: {e}")
@@ -246,7 +261,7 @@ def _initialize_redis(self):
         redis_enabled = os.getenv("REDIS_ENABLED", "true").lower() == "true"
 
         if not redis_enabled:
-            logger.info("🔧 Redis disabled via REDIS_ENABLED environment variable")
+            logger.debug("🔧 Redis disabled via REDIS_ENABLED environment variable")
             return
 
         try:
@@ -264,7 +279,7 @@ def _initialize_redis(self):
             # Test connection
             self.redis_client.ping()
             self.is_connected = True
-            logger.info("✅ Redis connected successfully for trading system")
+            logger.debug("✅ Redis connected successfully for trading system")
 
         except ConnectionError:
             logger.warning(
@@ -461,41 +476,46 @@ async def lifespan(app: FastAPI):
     """Enhanced lifespan with NEW centralized WebSocket system integration"""
     global trading_engine, market_scheduler, instrument_service_instance
 
-    logger.info(
+    logger.debug(
         "🚀 Starting Enhanced Trading Application with NEW Centralized WebSocket System..."
     )
-    log_structured(event="APP_STARTUP_START", message="Starting Enhanced Trading Application")
+    log_structured(
+        event="APP_STARTUP_START", message="Starting Enhanced Trading Application"
+    )
 
     # Initialize execution handlers with event loop for thread-safe dispatch
     try:
         loop = asyncio.get_running_loop()
         from services.trading_execution.execution_handler import execution_handler
         from services.trading_execution.trade_prep import trade_prep_service
-        
+
         execution_handler.initialize(loop)
         trade_prep_service.initialize(loop)
-        logger.info("✅ Execution handlers initialized with event loop for thread-safe dispatch.")
+        logger.debug(
+            "✅ Execution handlers initialized with event loop for thread-safe dispatch."
+        )
     except Exception as e:
         logger.error(f"❌ Failed to initialize execution handlers with loop: {e}")
 
     try:
         # 1. DB initialization
         db = next(get_db())
-        logger.info("✅ DB session initialized.")
+        logger.debug("✅ DB session initialized.")
         db.close()
 
         # 2. Redis health check
         redis_status = trading_redis.health_check()
-        logger.info(f"🔧 Redis status: {redis_status['message']}")
+        logger.debug(f"🔧 Redis status: {redis_status['message']}")
+
         # 🚀 BACKGROUND INITIALIZATION: Move heavy tasks to background to prevent startup blocking
         async def initialize_services_background():
             """Initialize heavy services in background"""
             global instrument_service_instance
-            
+
             try:
                 # 3. Initialize instrument service FIRST
-                logger.info("🔧 Initializing Instrument Service in background...")
+                logger.debug("🔧 Initializing Instrument Service in background...")
                 from services.instrument_refresh_service import get_trading_service
 
                 instrument_service = get_trading_service()
@@ -503,7 +523,7 @@ async def initialize_services_background():
                 initialization_result = await instrument_service.initialize_service()
 
                 if initialization_result.status == "success":
-                    logger.info(
+                    logger.debug(
                         f"✅ Instrument service initialized with {initialization_result.mapped_stocks} stocks"
                     )
                     instrument_service_instance = instrument_service
@@ -511,27 +531,41 @@ async def initialize_services_background():
                     logger.error(
                         f"❌ Instrument service initialization failed: {initialization_result.error}"
                     )
-                    log_structured(event="APP_STARTUP_ERROR", level="ERROR", message=f"Instrument service init failed: {initialization_result.error}")
+                    log_structured(
+                        event="APP_STARTUP_ERROR",
+                        level="ERROR",
+                        message=f"Instrument service init failed: {initialization_result.error}",
+                    )
 
                 # 4. Initialize optimized market data service
                 try:
-                    logger.info("🚀 Initializing Optimized Market Data Service...")
-                    from services.optimized_market_data_service import optimized_market_service
+                    logger.debug("🚀 Initializing Optimized Market Data Service...")
+                    from services.optimized_market_data_service import (
+                        optimized_market_service,
+                    )
 
                     await optimized_market_service.initialize_instruments()
                     stats = optimized_market_service.get_stats()
-                    logger.info(
+                    logger.debug(
                         f"✅ Optimized Market Service initialized with {stats['total_instruments']} instruments, {stats['active_instruments']} active"
                     )
                 except Exception as e:
-                    logger.error(f"❌ Optimized market service initialization failed: {e}")
-                    log_structured(event="APP_STARTUP_ERROR", level="ERROR", message=f"Optimized market service init failed: {str(e)}")
+                    logger.error(
+                        f"❌ Optimized market service initialization failed: {e}"
+                    )
+                    log_structured(
+                        event="APP_STARTUP_ERROR",
+                        level="ERROR",
+                        message=f"Optimized market service init failed: {str(e)}",
+                    )
 
                 # 4.5. Initialize Real-Time Market Analytics Engine
                 try:
-                    logger.info("📊 Initializing Real-Time Market Analytics Engine...")
+                    logger.debug("📊 Initializing Real-Time Market Analytics Engine...")
                     from services.realtime_market_engine import initialize_market_engine
-                    from services.instrument_refresh_service import get_analytics_metadata
+                    from services.instrument_refresh_service import (
+                        get_analytics_metadata,
+                    )
 
                     # Get analytics metadata from instrument service
                     analytics_metadata = get_analytics_metadata()
@@ -545,11 +579,11 @@ async def initialize_services_background():
                         engine = get_market_engine()
                         stats = engine.get_stats()
 
-                        logger.info(
+                        logger.debug(
                             f"✅ Real-Time Analytics Engine initialized with {stats['total_instruments']} instruments "
                             f"across {stats['sectors']} sectors"
                         )
-                        logger.info(
+                        logger.debug(
                             f"📈 Analytics latency: {stats['analytics_latency_ms']:.2f}ms"
                         )
                     else:
@@ -558,22 +592,33 @@ async def initialize_services_background():
                         )
 
                 except Exception as e:
-                    logger.error(f"❌ Real-Time Analytics Engine initialization failed: {e}")
+                    logger.error(
+                        f"❌ Real-Time Analytics Engine initialization failed: {e}"
+                    )
                     import traceback
+
                     logger.error(f"❌ Traceback: {traceback.format_exc()}")
-                    log_structured(event="APP_STARTUP_ERROR", level="ERROR", message=f"Analytics engine init failed: {str(e)}")
-            
+                    log_structured(
+                        event="APP_STARTUP_ERROR",
+                        level="ERROR",
+                        message=f"Analytics engine init failed: {str(e)}",
+                    )
+
             except Exception as e:
                 logger.error(f"❌ Background service initialization failed: {e}")
-                log_structured(event="APP_STARTUP_CRITICAL_FAILURE", level="CRITICAL", message=f"Background init failed: {str(e)}")
+                log_structured(
+                    event="APP_STARTUP_CRITICAL_FAILURE",
+                    level="CRITICAL",
+                    message=f"Background init failed: {str(e)}",
+                )
 
         # Start the background initialization task
         asyncio.create_task(initialize_services_background())
-        logger.info("✅ Core services initialization started in background")
+        logger.debug("✅ Core services initialization started in background")
 
         # 4.6. 🚀 NEW: Initialize Enhanced Breakout Detection Engine
         try:
-            logger.info("🎯 Initializing Enhanced Breakout Detection Engine...")
+            logger.debug("🎯 Initializing Enhanced Breakout Detection Engine...")
             from services.enhanced_breakout_engine import (
                 initialize_breakout_engine,
                 connect_to_market_engine,
@@ -608,14 +653,14 @@ async def initialize_services_background():
             # Connect to realtime market engine for live data feed
             if connect_to_market_engine():
                 stats = get_breakout_stats()
-                logger.info(
+                logger.debug(
                     f"✅ Enhanced Breakout Engine initialized and connected to market data"
                 )
-                logger.info(
+                logger.debug(
                     f"📊 Breakout Detection: Min Volume={stats['min_volume']}, "
                     f"Min Price={stats['min_price']}, Momentum Threshold={stats['momentum_threshold']*100}%"
                 )
-                logger.info("🔥 Real-time breakout/breakdown detection is now active!")
+                logger.debug("🔥 Real-time breakout/breakdown detection is now active!")
             else:
                 logger.warning(
                     "⚠️ Breakout engine initialized but not connected to market data"
                 )
@@ -633,11 +678,11 @@ async def initialize_services_background():
         # 7c. Start Gap Detection Service
         if GAP_DETECTION_AVAILABLE:
-            logger.info("Starting Gap Detection Service...")
+            logger.debug("Starting Gap Detection Service...")
             try:
                 # Start as background task to avoid blocking startup
                 asyncio.create_task(start_gap_detection_scheduler())
-                logger.info(
+                logger.debug(
                     "Gap Detection Service started - scheduled for 9:08 AM IST daily"
                 )
             except Exception as e:
@@ -645,82 +690,84 @@ async def initialize_services_background():
         # 8. NEW: Initialize Centralized WebSocket System (non-blocking)
         if CENTRALIZED_WS_AVAILABLE:
-            logger.info("🔌 Initializing NEW Centralized WebSocket System...")
+            logger.debug("🔌 Initializing NEW Centralized WebSocket System...")
             try:
                 # CRITICAL FIX: DON'T await - just create background task
                 # start_connection() will handle initialization internally
-                logger.info(
+                logger.debug(
                     "🔌 Starting Centralized WebSocket connection in background..."
                 )
                 asyncio.create_task(centralized_manager.start_connection())
-                logger.info(
+                logger.debug(
                     "✅ Centralized WebSocket background task started - continuing with other services"
                 )
 
                 # 🚀 ZERO-DELAY real-time streaming will be activated on-demand
                 # when a client connects to the /api/v1/realtime/stream WebSocket
-                logger.info("🚀 ZERO-DELAY real-time streaming available (on-demand)")
+                logger.debug("🚀 ZERO-DELAY real-time streaming available (on-demand)")
 
                 # Log initial status (don't await health check to avoid blocking)
-                logger.info(
+                logger.debug(
                     "📊 Centralized WebSocket Status: Starting in background..."
                 )
-                logger.info("📊 WebSocket will connect automatically when ready")
+                logger.debug("📊 WebSocket will connect automatically when ready")
 
                 # 🚀 ENHANCED: Connect centralized manager to unified WebSocket manager with Real-Time Analytics
-                logger.info(
+                logger.debug(
                     "🔗 Connecting centralized WebSocket manager to enhanced unified system..."
                 )
 
             except Exception as e:
                 logger.error(f"❌ NEW: Centralized WebSocket system error: {e}")
                 logger.warning("⚠️ Application will continue without live market data")
-                log_structured(event="APP_STARTUP_ERROR", level="ERROR", message=f"Centralized WS system error: {str(e)}")
+                log_structured(
+                    event="APP_STARTUP_ERROR",
+                    level="ERROR",
+                    message=f"Centralized WS system error: {str(e)}",
+                )
         else:
             logger.warning(
                 "⚠️ NEW: Centralized WebSocket system not available - using legacy only"
             )
 
         # 7.1. Initialize Upstox Token Automation (IN BACKGROUND - NON-BLOCKING)
-        logger.info("🔄 Queueing Upstox Token Automation startup...")
+        logger.debug("🔄 Queueing Upstox Token Automation startup...")
         try:
 
             async def start_upstox_in_background():
                 """Start Upstox automation in background to avoid blocking startup"""
                 try:
                     # Reduced wait time to ensure it starts reliably and visibly
-                    logger.info("⏳ Waiting 5s for server startup before initializing Upstox automation...")
+                    logger.debug(
+                        "⏳ Waiting 5s for server startup before initializing Upstox automation..."
+                    )
                     await asyncio.sleep(5)
-                    
-                    logger.info("🚀 Triggering Upstox Automation Service start...")
+
+                    logger.debug("🚀 Triggering Upstox Automation Service start...")
                     from services.upstox_automation_service import (
                         start_upstox_automation,
                     )
 
                     upstox_automation = start_upstox_automation()
                     if upstox_automation:
-                        logger.info(
+                        logger.debug(
                             "✅ Upstox token automation started successfully - Scheduler Active"
                         )
                     else:
-                        logger.warning("⚠️ Upstox token automation failed to start (returned None)")
+                        logger.warning(
+                            "⚠️ Upstox token automation failed to start (returned None)"
+                        )
                 except Exception as e:
-                    logger.warning(
-                        f"⚠️ Upstox automation background task error: {e}"
-                    )
+                    logger.warning(f"⚠️ Upstox automation background task error: {e}")
 
             # Start in background - DON'T WAIT FOR IT!
             asyncio.create_task(start_upstox_in_background())
-            logger.info(
-                "✅ Upstox automation queued in background"
-            )
+            logger.debug("✅ Upstox automation queued in background")
         except Exception as e:
-            logger.warning(
-                f"⚠️ Upstox automation task creation failed: {e}"
-            )
+            logger.warning(f"⚠️ Upstox automation task creation failed: {e}")
 
         # 7.2. Initialize MarketScheduleService - CRITICAL for FNO and Instrument automation
-        logger.info("📅 Starting MarketScheduleService...")
+        logger.debug("📅 Starting MarketScheduleService...")
         try:
             from services.market_schedule_service import get_market_scheduler
@@ -729,7 +776,7 @@ async def start_upstox_in_background():
             market_scheduler_task = asyncio.create_task(
                 market_scheduler.start_daily_scheduler()
             )
-            logger.info(
+            logger.debug(
                 "✅ MarketScheduleService started - will handle daily FNO refresh, instrument updates, and market timing coordination"
             )
         except Exception as e:
@@ -741,12 +788,12 @@ async def start_upstox_in_background():
             )
 
         # 7.3. Initialize Notification Scheduler - NEW comprehensive notification system
-        logger.info("📨 Starting Notification Scheduler...")
+        logger.debug("📨 Starting Notification Scheduler...")
         try:
             from services.notification_scheduler import notification_scheduler
 
             notification_scheduler.start_scheduler()
-            logger.info(
+            logger.debug(
                 "✅ Notification Scheduler started - handling token expiry, daily summaries, and system alerts"
             )
         except Exception as e:
@@ -758,23 +805,23 @@ async def start_upstox_in_background():
             )
 
         # Now using enhanced services started earlier in the startup process
-        logger.info(
+        logger.debug(
             "🎯 Using enhanced gap and breakout detection services (numpy/pandas optimized)"
         )
 
         # 13. ✅ NEW: Start Auto Trading WebSocket Service
         # TODO Handled this later to start when it needed
         if True:
-            logger.info("🔴 Starting Auto Trading WebSocket Service...")
+            logger.debug("🔴 Starting Auto Trading WebSocket Service...")
             try:
-                logger.info("✅ Auto Trading WebSocket Service started")
+                logger.debug("✅ Auto Trading WebSocket Service started")
             except Exception as e:
                 logger.error(f"❌ Auto Trading WebSocket Service failed to start: {e}")
 
         # 15.6. 🚀 NEW: Start Auto-Trade Scheduler for automatic WebSocket management
         if TRADING_EXECUTION_AVAILABLE:
-            logger.info("⏰ Starting Auto-Trade Scheduler...")
+            logger.debug("⏰ Starting Auto-Trade Scheduler...")
             try:
                 from services.trading_execution.auto_trade_scheduler import (
                     auto_trade_scheduler,
@@ -788,11 +835,11 @@ async def start_upstox_in_background():
                         trading_mode=TradingMode.PAPER  # Default to paper trading for safety
                     )
                 )
-                logger.info("✅ Auto-Trade Scheduler started (multi-user mode)")
-                logger.info(
+                logger.debug("✅ Auto-Trade Scheduler started (multi-user mode)")
+                logger.debug(
                     "📌 Auto-trading will start at 9:15 AM when ANY user has stocks selected"
                 )
-                logger.info(
+                logger.debug(
                     "📌 Monitoring ALL users with active broker configs automatically"
                 )
 
             except Exception as e:
                 logger.error(f"❌ Auto-Trade Scheduler failed to start: {e}")
 
         # 16. 🚀 NEW: Initialize Intelligent Stock Selection Service
-        logger.info("🧠 Initializing Intelligent Stock Selection Service...")
+        logger.debug("🧠 Initializing Intelligent Stock Selection Service...")
         try:
             from services.intelligent_stock_selection_service import (
                 intelligent_stock_selector,
             )
 
             # CRITICAL FIX: Don't await - run in background to prevent blocking
             asyncio.create_task(intelligent_stock_selector.initialize_services())
-            logger.info(
+            logger.debug(
                 "✅ Intelligent Stock Selection Service initialization started in background"
             )
         except ImportError as e:
@@ -819,69 +866,76 @@ async def start_upstox_in_background():
             )
 
         # 17. 🚀 NEW: Initialize MCX WebSocket Service
-        logger.info("📊 Initializing MCX WebSocket Service...")
+        logger.debug("📊 Initializing MCX WebSocket Service...")
         try:
             from services.websocket.mcx.integration import initialize_mcx_service
 
             # CRITICAL FIX: Don't await - run in background to prevent blocking
             asyncio.create_task(initialize_mcx_service())
-            logger.info("✅ MCX WebSocket Service initialization started in background")
+            logger.debug("✅ MCX WebSocket Service initialization started in background")
         except ImportError as e:
             logger.warning(f"⚠️ MCX WebSocket Service not available: {e}")
         except Exception as e:
             logger.error(f"❌ MCX WebSocket Service failed to initialize: {e}")
 
-        logger.info("=" * 80)
+        logger.debug("=" * 80)
         logger.info("🟢 ALL SERVICES STARTED SUCCESSFULLY!")
-        logger.info("=" * 80)
-        logger.info("📊 Trading Application is ready to accept requests")
-        logger.info("🔌 WebSocket connections are running in background")
-        logger.info(
+        logger.debug("=" * 80)
+        logger.debug("📊 Trading Application is ready to accept requests")
+        logger.debug("🔌 WebSocket connections are running in background")
+        logger.debug(
             "⚠️ If WebSocket shows network errors, the app will still work without live data"
         )
-        logger.info("=" * 80)
+        logger.debug("=" * 80)
 
         # Signal that startup is complete and token refresh can now proceed
         if CENTRALIZED_WS_AVAILABLE and centralized_manager:
             try:
                 centralized_manager.mark_startup_complete()
-                logger.info("✅ Marked startup complete - token refresh enabled")
+                logger.debug("✅ Marked startup complete - token refresh enabled")
             except Exception as e:
                 logger.warning(f"⚠️ Error marking startup complete: {e}")
 
         # Start simple unified WebSocket broadcast task
         try:
-            logger.info("🚀 Starting Simple Unified WebSocket System...")
+            logger.debug("🚀 Starting Simple Unified WebSocket System...")
             from router.unified_websocket_routes import register_engine_listeners
 
             register_engine_listeners()
-            logger.info("✅ Simple Unified WebSocket broadcast started")
+            logger.debug("✅ Simple Unified WebSocket broadcast started")
         except Exception as e:
             logger.error(f"❌ Failed to start Unified WebSocket broadcast: {e}")
 
         logger.info("🎯 Application is fully operational and ready for trading!")
-        
-        log_structured(event="APP_STARTUP_COMPLETE", message="Application startup completed successfully")
+
+        log_structured(
+            event="APP_STARTUP_COMPLETE",
+            message="Application startup completed successfully",
+        )
 
         yield
 
     except Exception as e:
         logger.exception("🔥 Enhanced lifespan startup failed - but app will continue")
-        log_structured(event="APP_STARTUP_CRITICAL_FAILURE", level="CRITICAL", message=str(e))
+        log_structured(
+            event="APP_STARTUP_CRITICAL_FAILURE", level="CRITICAL", message=str(e)
+        )
         yield
     finally:
         # Enhanced cleanup
         logger.info("🛑 Starting enhanced shutdown...")
-        log_structured(event="APP_SHUTDOWN_START", message="Starting application shutdown")
+        log_structured(
+            event="APP_SHUTDOWN_START", message="Starting application shutdown"
+        )
 
         # Unified WebSocket connections cleanup (handled automatically by FastAPI)
-        logger.info("✅ Unified WebSocket connections will be cleaned up by FastAPI")
+        logger.debug("✅ Unified WebSocket connections will be cleaned up by FastAPI")
 
         # NEW: Stop centralized WebSocket system
         if CENTRALIZED_WS_AVAILABLE and centralized_manager:
             try:
                 await centralized_manager.stop()
-                logger.info("✅ NEW: Centralized WebSocket system stopped")
+                logger.debug("✅ NEW: Centralized WebSocket system stopped")
             except Exception as e:
                 logger.error(f"Error stopping centralized WebSocket: {e}")
@@ -890,7 +944,7 @@ async def start_upstox_in_background():
                 from services.websocket.mcx.integration import stop_mcx_service
 
                 await stop_mcx_service()
-                logger.info("✅ MCX WebSocket service stopped")
+                logger.debug("✅ MCX WebSocket service stopped")
             except ImportError:
                 pass  # MCX service not available
             except Exception as e:
@@ -921,7 +975,7 @@ async def start_upstox_in_background():
         except Exception as e:
             logger.error(f"Error stopping market scheduler: {e}")
 
-        logger.info(
+        logger.debug(
             "🎯 Enhanced gap and breakout detection services shutdown completed"
         )
@@ -930,12 +984,19 @@ async def start_upstox_in_background():
             from services.upstox_automation_service import stop_upstox_automation
 
             stop_upstox_automation()
-            logger.info("✅ Upstox token automation stopped")
+            logger.debug("✅ Upstox token automation stopped")
         except Exception as e:
             logger.error(f"Error stopping Upstox automation: {e}")
 
         logger.info("🛑 Enhanced lifespan shutdown complete.")
-        log_structured(event="APP_SHUTDOWN_COMPLETE", message="Application shutdown completed")
+        log_structured(
+            event="APP_SHUTDOWN_COMPLETE", message="Application shutdown completed"
+        )
+
+        # FINAL MEMORY CLEANUP
+        import gc
+        gc.collect()
+        logger.debug("🧹 Final garbage collection completed")
 
 
 # FIXED: Enhanced trading engine startup function
@@ -947,14 +1008,14 @@ async def start_enhanced_trading_engine():
 
     while retry_count < max_retries:
         try:
-            logger.info(
+            logger.debug(
                 f"🚀 Starting Enhanced Trading Engine (attempt {retry_count + 1}/{max_retries})"
             )
 
             # Start the trading engine
             if trading_engine:
                 await trading_engine.start_trading_engine()
-                logger.info("✅ Enhanced Trading Engine started successfully")
+                logger.debug("✅ Enhanced Trading Engine started successfully")
                 break
             else:
                 logger.error("Trading engine not initialized")
@@ -968,7 +1029,7 @@ async def start_enhanced_trading_engine():
 
             if retry_count < max_retries:
                 wait_time = 60 * retry_count
-                logger.info(f"⏳ Retrying in {wait_time} seconds...")
+                logger.debug(f"⏳ Retrying in {wait_time} seconds...")
                 await asyncio.sleep(wait_time)
             else:
                 logger.error(
@@ -1048,7 +1109,7 @@ async def start_enhanced_trading_engine():
     app.include_router(
         unified_websocket_router, tags=["🚀 Unified WebSocket - Real-Time"]
     )
-    logger.info("✅ Simple Unified WebSocket routes registered")
+    logger.debug("✅ Simple Unified WebSocket routes registered")
 except ImportError as e:
     logger.warning(f"⚠️ Unified WebSocket routes not available: {e}")
@@ -1064,7 +1125,7 @@ async def start_enhanced_trading_engine():
     from router.nifty_strategy_router import nifty_strategy_router
 
     app.include_router(nifty_strategy_router, tags=["NIFTY 09:40 Strategy"])
-    logger.info("✅ NIFTY 09:40 Strategy routes registered")
+    logger.debug("✅ NIFTY 09:40 Strategy routes registered")
 except ImportError as e:
     logger.warning(f"⚠️ NIFTY strategy routes not available: {e}")
@@ -1076,11 +1137,11 @@ async def start_enhanced_trading_engine():
     app.include_router(
         realtime_stream_router, tags=["🚀 ZERO-DELAY Real-time Streaming"]
     )
-    logger.info("✅ 🚀 ZERO-DELAY real-time streaming routes registered")
+    logger.debug("✅ 🚀 ZERO-DELAY real-time streaming routes registered")
 except ImportError as e:
     logger.warning(f"⚠️ Real-time streaming routes not available: {e}")
 
 # Breakout router removed - using enhanced_breakout_engine instead
-logger.info("✅ Using enhanced_breakout_engine for breakout functionality")
+logger.debug("✅ Using enhanced_breakout_engine for breakout functionality")
 
 # NEW: Add centralized WebSocket routes
 if CENTRALIZED_ROUTES_AVAILABLE:
@@ -1089,14 +1150,14 @@ async def start_enhanced_trading_engine():
         prefix="/api/v1",
         tags=["NEW: Centralized WebSocket"],
     )
-    logger.info("✅ NEW: Centralized WebSocket routes registered")
+    logger.debug("✅ NEW: Centralized WebSocket routes registered")
 else:
     logger.error("❌ NEW: Centralized WebSocket routes not available")
 
 # Legacy market WebSocket for backward compatibility
 if LEGACY_MARKET_WS_AVAILABLE:
     app.include_router(legacy_market_ws_router, tags=["Legacy Market WebSocket"])
-    logger.info(
+    logger.debug(
         "✅ Legacy market WebSocket routes available for backward compatibility"
     )
@@ -1111,7 +1172,7 @@ async def preflight_handler(full_path: str):
 # SOCKET.IO SETUP - Simplified Integration for Stability
 # ============================================================================
 # Temporarily use basic Socket.IO setup to avoid ASGI compatibility issues
-logger.info("🔧 Initializing Socket.IO with basic setup for stability")
+logger.debug("🔧 Initializing Socket.IO with basic setup for stability")
 sio = socketio.AsyncServer(
     async_mode="asgi", cors_allowed_origins=ALLOWED_ORIGINS, engineio_logger=False
 )
@@ -1278,21 +1339,6 @@ def sync_check_market_engine():
     }
 
 
-@app.get("/api/v1/trading/{symbol}")
-async def get_trading_data(symbol: str):
-    """Get comprehensive trading data for a symbol"""
-    try:
-        from services.instrument_registry import instrument_registry
-
-        return {
-            "success": True,
-            "updated_at": datetime.now().isoformat(),
-        }
-    except Exception as e:
-        logger.error(f"❌ Error getting trading data for {symbol}: {e}")
-        return {"success": False, "error": str(e)}
-
-
 @app.post("/api/v1/system/refresh-instruments")
 async def refresh_instruments():
     """Admin endpoint to refresh instrument data"""
@@ -1613,7 +1659,12 @@ async def restart_trading_engine():
 @app.exception_handler(Exception)
 async def global_exception_handler(request: Request, exc: Exception):
     logger.error(f"Global error on {request.url}: {str(exc)}")
-    log_structured(event="GLOBAL_EXCEPTION", level="ERROR", message=f"Unhandled exception on {request.url}", data={"error": str(exc), "url": str(request.url)})
+    log_structured(
+        event="GLOBAL_EXCEPTION",
+        level="ERROR",
+        message=f"Unhandled exception on {request.url}",
+        data={"error": str(exc), "url": str(request.url)},
+    )
 
     # Don't expose Redis connection errors
     if "redis" in str(exc).lower() or "connection" in str(exc).lower():
@@ -1700,20 +1751,20 @@ async def get_engine_status(sid, data):
     # Apply filter to suppress noisy access logs
     logging.getLogger("uvicorn.access").addFilter(EndpointFilter())
 
-    logger.info(
+    logger.debug(
         "🚀 Launching Enhanced Trading Platform with NEW Centralized WebSocket System..."
     )
 
     # Log system configuration
-    logger.info(f"🔧 Environment: {os.getenv('ENVIRONMENT', 'development')}")
-    logger.info(f"🔧 Redis Enabled: {os.getenv('REDIS_ENABLED', 'true')}")
-    logger.info(
+    logger.debug(f"🔧 Environment: {os.getenv('ENVIRONMENT', 'development')}")
+    logger.debug(f"🔧 Redis Enabled: {os.getenv('REDIS_ENABLED', 'true')}")
+    logger.debug(
         f"🔧 NEW Centralized WebSocket: {'Available' if CENTRALIZED_WS_AVAILABLE else 'Not Available'}"
     )
-    logger.info(
+    logger.debug(
         f"🔧 NEW Centralized Routes: {'Available' if CENTRALIZED_ROUTES_AVAILABLE else 'Not Available'}"
     )
-    logger.info(
+    logger.debug(
         f"🔧 Legacy WebSocket: {'Available' if LEGACY_MARKET_WS_AVAILABLE else 'Not Available'}"
     )
@@ -1721,7 +1772,7 @@ async def get_engine_status(sid, data):
         "app:sio_app",
         host="0.0.0.0",
         port=int(os.getenv("PORT", 8000)),
-        log_level="info",
+        log_level="warning",
         reload=os.getenv("DEBUG", "false").lower() == "true",
         workers=1,
     )
diff --git a/core/production_logging.py b/core/production_logging.py
index afcaa49..332d462 100644
--- a/core/production_logging.py
+++ b/core/production_logging.py
@@ -4,68 +4,91 @@
 """
 import logging
 import logging.handlers
+import time
+import os
+import sys
+import json
+import uuid
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, Optional, List
+from contextlib import contextmanager
+
+# Optional: Try to import concurrent-log-handler for safe multi-process logging
 try:
     from concurrent_log_handler import ConcurrentRotatingFileHandler
     HAS_CONCURRENT_LOG = True
 except ImportError:
     HAS_CONCURRENT_LOG = False
-import json
-import sys
-import traceback
-from datetime import datetime, timezone
-from pathlib import Path
-from typing import Any, Dict, Optional
-import os
-from contextlib import contextmanager
-import uuid
+
+
+class ThrottledFilter(logging.Filter):
+    """
+    Filter that silences noisy INFO logs by default in production.
+    Can be enabled via ENABLE_NOISY_LOGS=true environment variable.
+    """
+    def __init__(self, name="", interval=60, max_cache_size=1000):
+        super().__init__(name)
+        self.interval = interval
+        self.max_cache_size = max_cache_size
+        self.last_logged = {}
+        # Check if noisy logs are explicitly enabled
+        self.show_noisy = os.getenv('ENABLE_NOISY_LOGS', 'false').lower() == 'true'
+
+    def filter(self, record):
+        # Always allow WARNING, ERROR, CRITICAL
+        if record.levelno >= logging.WARNING:
+            return True
+
+        # If noisy logs are enabled, allow them (with optional throttling)
+        if self.show_noisy:
+            return True
+
+        # Logic to identify and SILENCE noisy logs
+        msg_key = record.getMessage()
+
+        # Patterns that should be hidden in production by default
+        is_noisy = any(pattern.lower() in msg_key.lower() for pattern in [
+            "Received data", "PnL Update", "Heartbeat", "Broadcast", "tick data",
+            "ltp update", "Processed", "Instrument", "Analytics", "Engine", "Socket",
+            "WebSocket", "Connecting", "Disconnecting", "Sentiment", "Heatmap"
+        ])
+
+        if is_noisy:
+            # Completely silence these in production unless ENABLE_NOISY_LOGS is true
+            return False
+
+        return True
+
+
+class ProductionFormatter(logging.Formatter):
+    """Compact string-based formatter for production to minimize cost/overhead"""
+    def format(self, record: logging.LogRecord) -> str:
+        # Extremely compact format for production logs to save costs:
+        # [L][HH:MM:SS] logger_name: message
+        level = record.levelname[0]  # Single-letter level
+        timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime('%H:%M:%S')
+        return f"[{level}][{timestamp}] {record.name}: {record.getMessage()}"
 
 
 class TradingFormatter(logging.Formatter):
-    """Custom formatter for trading application with structured logging"""
+    """Custom formatter for trading application with structured logging (Development)"""
 
     def format(self, record: logging.LogRecord) -> str:
-        # Create structured log entry
+        # Create optimized structured log entry
         log_entry = {
-            'timestamp': datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
-            'level': record.levelname,
-            'logger': record.name,
-            'message': record.getMessage(),
-            'module': record.module,
-            'function': record.funcName,
-            'line': record.lineno,
-            'thread_id': record.thread,
-            'process_id': record.process,
+            't': datetime.fromtimestamp(record.created, tz=timezone.utc).strftime('%H:%M:%S'),
+            'l': record.levelname[:1],
+            'n': record.name,
+            'm': record.getMessage(),
         }
 
-        # Add extra fields if present
-        if hasattr(record, 'user_id'):
-            log_entry['user_id'] = record.user_id
-        if hasattr(record, 'broker'):
-            log_entry['broker'] = record.broker
-        if hasattr(record, 'trade_id'):
-            log_entry['trade_id'] = record.trade_id
-        if hasattr(record, 'order_id'):
-            log_entry['order_id'] = record.order_id
-        if hasattr(record, 'symbol'):
-            log_entry['symbol'] = record.symbol
-        if hasattr(record, 'amount'):
-            log_entry['amount'] = record.amount
-        if hasattr(record, 'request_id'):
-            log_entry['request_id'] = record.request_id
-
-        # Add exception info if present
         if record.exc_info:
-            log_entry['exception'] = {
-                'type': record.exc_info[0].__name__,
-                'message': str(record.exc_info[1]),
-                'traceback': traceback.format_exception(*record.exc_info)
-            }
+            log_entry['ex'] = str(record.exc_info[1])
 
         return json.dumps(log_entry, ensure_ascii=False)
 
 
 class AuditLogger:
     """Dedicated audit logger for compliance and regulatory requirements"""
 
-    def __init__(self, log_dir: str = "logs/audit"):
+    def __init__(self, log_dir: str = "logs/audit", silent: bool = False):
         self._is_production = (
             os.getenv('ENVIRONMENT') == 'production' or
             os.getenv('RAILWAY_ENVIRONMENT')
         )
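Note (not part of the diff): despite its name, ThrottledFilter as written never consults the interval, max_cache_size, or last_logged state it initializes; matching INFO messages are dropped outright rather than rate-limited. If true throttling is wanted later, a per-message rate limit could look like this sketch (RateLimitedFilter is a hypothetical name, not part of this patch):

    import logging
    import time


    class RateLimitedFilter(logging.Filter):
        """Allow a given sub-WARNING message through at most once per `interval` seconds."""

        def __init__(self, interval: float = 60.0, max_cache_size: int = 1000):
            super().__init__()
            self.interval = interval
            self.max_cache_size = max_cache_size
            self.last_logged: dict[str, float] = {}

        def filter(self, record: logging.LogRecord) -> bool:
            if record.levelno >= logging.WARNING:
                return True  # never throttle warnings/errors
            key = record.getMessage()
            now = time.monotonic()
            if now - self.last_logged.get(key, 0.0) < self.interval:
                return False  # same message emitted too recently
            if len(self.last_logged) >= self.max_cache_size:
                self.last_logged.clear()  # crude bound on cache growth
            self.last_logged[key] = now
            return True

Also worth noting: the pattern list above is broad ("Instrument", "Engine", "Socket" will match many messages), which is useful to remember when an expected INFO log seems to vanish.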
@@ -75,15 +98,23 @@ def __init__(self, log_dir: str = "logs/audit"):
 
         # Create audit logger
         self.logger = logging.getLogger('audit')
+
+        # If silent mode, set to a level that won't log anything
+        if silent and self._is_production:
+            self.logger.setLevel(logging.CRITICAL + 1)
+            self.logger.addHandler(logging.NullHandler())
+            self.logger.propagate = False
+            return
+
         self.logger.setLevel(logging.INFO)
 
         # Remove default handlers to avoid duplication
         self.logger.handlers.clear()
 
         if self._is_production:
-            # In production, log audit events to stdout (structured JSON)
+            # In production, use compact string logging to save costs
             console_handler = logging.StreamHandler(sys.stdout)
-            console_handler.setFormatter(TradingFormatter())
+            console_handler.setFormatter(ProductionFormatter())
             self.logger.addHandler(console_handler)
         else:
             # File handler for audit logs (concurrent-safe rotation)
@@ -172,7 +203,7 @@ def log_api_access(self, user_id: str, endpoint: str, method: str,
 class TradingLogger:
     """Main production logging setup for trading application"""
 
-    def __init__(self, app_name: str = "TradingBot", log_level: str = "INFO"):
+    def __init__(self, app_name: str = "TradingBot", log_level: str = "WARNING"):
         self.app_name = app_name
         # Enhanced production detection
         self._is_production = (
@@ -180,6 +211,12 @@ def __init__(self, app_name: str = "TradingBot", log_level: str = "WARNING"):
             os.getenv('RAILWAY_ENVIRONMENT') is not None or
             os.getenv('RAILWAY_STATIC_URL') is not None
         )
+
+        # Check for absolute silence mode (to save costs in production)
+        self._silent_mode = log_level.upper() in ('SILENT', 'NONE', 'OFF')
+        if self._silent_mode:
+            log_level = "CRITICAL"
+
         self.log_dir = Path("logs")
 
         # Only create directories if NOT in production
@@ -190,30 +227,61 @@ def __init__(self, app_name: str = "TradingBot", log_level: str = "WARNING"):
             (self.log_dir / "errors").mkdir(exist_ok=True)
             (self.log_dir / "performance").mkdir(exist_ok=True)
 
+        # Silence noisy third-party loggers in production
+        if self._is_production:
+            for noisy_logger in [
+                'urllib3', 'apscheduler', 'matplotlib', 'playwright',
+                'uvicorn.access', 'engineio', 'socketio', 'tensorflow',
+                'h11', 'httpcore', 'httpx', 'asyncio', 'sqlalchemy',
+                'pydantic', 'fastapi', 'selenium', 'multiprocessing'
+            ]:
+                logging.getLogger(noisy_logger).setLevel(logging.CRITICAL if self._silent_mode else logging.WARNING)
+
+        # Force WARNING level in production if not explicitly set to something else
+        if self._is_production and os.getenv('LOG_LEVEL') is None and not self._silent_mode:
+            log_level = "WARNING"
+
+        # If in production and silent mode, monkey-patch print to stop the flood
+        if self._is_production and self._silent_mode:
+            # This completely stops all 'print()' calls from emitting anything to stdout
+            # without refactoring hundreds of files.
+            import builtins
+            builtins.print = lambda *args, **kwargs: None
+
         # Set up loggers
         self.setup_application_logger(log_level)
-        self.setup_trading_logger()
+        self.setup_trading_logger(log_level)
         self.setup_error_logger()
-        self.setup_performance_logger()
+        self.setup_performance_logger(log_level)
 
         # Initialize audit logger
-        self.audit = AuditLogger()
+        self.audit = AuditLogger(silent=self._silent_mode)
 
     def setup_application_logger(self, log_level: str):
         """Set up main application logger"""
         logger = logging.getLogger()
-        logger.setLevel(getattr(logging, log_level.upper()))
+        level = getattr(logging, log_level.upper())
+        logger.setLevel(level)
 
         # Remove default handlers
         for handler in logger.handlers[:]:
             logger.removeHandler(handler)
 
+        # If silent mode, we don't even add a console handler
+        if self._is_production and self._silent_mode:
+            logger.addHandler(logging.NullHandler())
+            return
+
         # Console handler - ALWAYS ENABLED FOR CLOUD PLATFORMS (Railway, Render, etc.)
         console_handler = logging.StreamHandler(sys.stdout)
 
-        # In production, use structured JSON logging even for console
+        # Apply ThrottledFilter to prevent cost spikes from ticks/pnl updates
+        # Still allows all WARNING and ERROR logs through immediately
+        console_handler.addFilter(ThrottledFilter(interval=60))
+
+        # In production, use compact string logging to save costs
         if os.getenv('ENVIRONMENT') == 'production' or os.getenv('RAILWAY_ENVIRONMENT'):
-            console_handler.setFormatter(TradingFormatter())
+            console_handler.setFormatter(ProductionFormatter())
         else:
             console_formatter = logging.Formatter(
                 '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
             )
@@ -222,30 +290,14 @@ def setup_application_logger(self, log_level: str):
 
         logger.addHandler(console_handler)
 
-        # File handler with rotation (non-production only)
-        if not self._is_production:
-            if HAS_CONCURRENT_LOG:
-                file_handler = ConcurrentRotatingFileHandler(
-                    filename=self.log_dir / "application" / "app.log",
-                    mode='a',
-                    maxBytes=10 * 1024 * 1024,  # 10MB
-                    backupCount=30,
-                    encoding='utf-8'
-                )
-            else:
-                file_handler = logging.handlers.RotatingFileHandler(
-                    filename=self.log_dir / "application" / "app.log",
-                    maxBytes=10 * 1024 * 1024,
-                    backupCount=30,
-                    encoding='utf-8'
-                )
-            file_handler.setFormatter(TradingFormatter())
-            logger.addHandler(file_handler)
-
-    def setup_trading_logger(self):
+    def setup_trading_logger(self, log_level: Optional[str] = None):
         """Set up dedicated trading operations logger"""
         trading_logger = logging.getLogger('trading')
-        trading_logger.setLevel(logging.INFO)
+
+        # Use provided level, or current instance level, or default to WARNING
+        effective_level = log_level or os.getenv('LOG_LEVEL', 'WARNING')
+        level = getattr(logging, effective_level.upper())
+        trading_logger.setLevel(level)
 
         # Trading operations log (non-production only)
         if not self._is_production:
@@ -310,10 +362,15 @@ def setup_error_logger(self):
 
             error_logger.propagate = False
 
-    def setup_performance_logger(self):
+    def setup_performance_logger(self, log_level: Optional[str] = None):
         """Set up performance monitoring logger"""
         perf_logger = logging.getLogger('performance')
-        perf_logger.setLevel(logging.INFO)
+
+        # Use provided level, or current instance level, or default to WARNING
+        effective_level = log_level or os.getenv('LOG_LEVEL', 'WARNING')
+        level = getattr(logging, effective_level.upper())
+        perf_logger.setLevel(level)
+
 
         # Performance log file (non-production only)
         if not self._is_production:
@@ -344,15 +401,18 @@ def get_trading_logger() -> TradingLogger:
     """Get or create the global trading logger instance"""
     global _trading_logger
     if _trading_logger is None:
-        log_level = os.getenv('LOG_LEVEL', 'INFO')
+        log_level = os.getenv('LOG_LEVEL', 'WARNING')
         _trading_logger = TradingLogger(log_level=log_level)
     return _trading_logger
 
+
 def get_audit_logger() -> AuditLogger:
     """Get or create the global audit logger instance"""
     global _audit_logger
     if _audit_logger is None:
-        _audit_logger = AuditLogger()
+        log_level = os.getenv('LOG_LEVEL', 'WARNING')
+        silent = log_level.upper() in ('SILENT', 'NONE', 'OFF')
+        _audit_logger = AuditLogger(silent=silent)
     return _audit_logger
 
 @contextmanager
diff --git a/utils/logging_utils.py b/utils/logging_utils.py
index 1e06187..122859c 100644
--- a/utils/logging_utils.py
+++ b/utils/logging_utils.py
@@ -4,7 +4,9 @@
 from typing import Any, Dict, Optional
 from core.production_logging import get_trading_logger, get_audit_logger
 
-logger = get_trading_logger().setup_trading_logger()
+# Initialize logging (singleton handles multiple calls)
+get_trading_logger()
+
 # Use the named logger 'trading' which is setup in production_logging
 trading_logger = logging.getLogger('trading')
 audit_logger = get_audit_logger()
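Note (not part of the diff): taken together, the three files leave log verbosity controlled by a small set of environment variables read in core/production_logging.py. A sketch of the resulting configuration surface, as this diff defines it:

    # LOG_LEVEL=WARNING          default after this patch; INFO chatter is dropped
    # LOG_LEVEL=INFO             restores the verbose startup logs
    # LOG_LEVEL=SILENT|NONE|OFF  effectively CRITICAL; in production, print() is no-op'd
    # ENABLE_NOISY_LOGS=true     lets tick/heartbeat-style INFO messages through
    # ENVIRONMENT=production     compact ProductionFormatter; third-party loggers quieted
    import logging
    import os

    os.environ["ENVIRONMENT"] = "production"
    os.environ["LOG_LEVEL"] = "SILENT"

    from core.production_logging import get_trading_logger

    get_trading_logger()
    print("suppressed")  # builtins.print is monkey-patched to a no-op in this mode
    logging.getLogger(__name__).critical(
        "also dropped: in silent mode the root logger only has a NullHandler"
    )

One design remark: the builtins.print monkey-patch and the module-wide pattern filter trade debuggability for log cost, but since both are gated on environment variables, they can be reverted per deployment without touching code.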