diff --git a/QEfficient/utils/logging_utils.py b/QEfficient/utils/logging_utils.py index d2086d830..4906a2e38 100644 --- a/QEfficient/utils/logging_utils.py +++ b/QEfficient/utils/logging_utils.py @@ -5,54 +5,228 @@ # # ----------------------------------------------------------------------------- +import json import logging +import os +import queue +import threading +from datetime import datetime +from logging.handlers import RotatingFileHandler +from typing import Optional -class QEffFormatter(logging.Formatter): - """ - Formatter class used to set colors for printing different logging levels of messages on console. +class JSONNamespaceFormatter(logging.Formatter): """ + Custom formatter to output log records in JSON format with metadata. + + Methods: + format(record): Formats a log record into a JSON string. - cyan: str = "\x1b[38;5;14m" - yellow: str = "\x1b[33;20m" - red: str = "\x1b[31;20m" - bold_red: str = "\x1b[31;1m" - reset: str = "\x1b[0m" - common_format: str = "%(levelname)s - %(name)s - %(message)s" # type: ignore - format_with_line_info = "%(levelname)s - %(name)s - %(message)s (%(filename)s:%(lineno)d)" # type: ignore - - FORMATS = { - logging.DEBUG: cyan + format_with_line_info + reset, - logging.INFO: cyan + common_format + reset, - logging.WARNING: yellow + common_format + reset, - logging.ERROR: red + format_with_line_info + reset, - logging.CRITICAL: bold_red + format_with_line_info + reset, - } + Parameters: + record (logging.LogRecord): The log record to format. + + Returns: + str: JSON-formatted log string. + """ def format(self, record): + log_record = { + "date": datetime.fromtimestamp(record.created).strftime("%Y-%m-%d"), + "time": datetime.fromtimestamp(record.created).strftime("%H:%M:%S"), + "level": record.levelname, + "namespace": getattr(record, "namespace", "default"), + "file": record.filename, + "line": record.lineno, + "message": record.getMessage(), + } + return json.dumps(log_record) + + +class QEFFLoggerThread(threading.Thread): + """ + Background thread to handle logging asynchronously using a queue. + + Attributes: + logger (logging.Logger): Logger instance to handle log records. + log_queue (queue.Queue): Queue from which log records are consumed. + running (bool): Flag to control thread execution. + """ + + def __init__(self, logger, log_queue): """ - Overriding the base class method to Choose format based on log level. + Initialize the logging thread. + + Parameters: + logger (logging.Logger): Logger instance. + log_queue (queue.Queue): Queue for log records. """ - log_fmt = self.FORMATS.get(record.levelno) - formatter = logging.Formatter(log_fmt) - return formatter.format(record) + super().__init__(daemon=True) + self.logger = logger + self.log_queue = log_queue + self.running = True + def run(self): + """ + Continuously process log records from the queue and pass them to the logger. + """ + while self.running: + try: + record = self.log_queue.get(timeout=1) + self.logger.handle(record) + except queue.Empty: + continue -def create_logger() -> logging.Logger: + def stop(self): + """ + Stop the logging thread gracefully. + """ + self.running = False + + +class QEFFLogger: """ - Creates a logger object with Colored QEffFormatter. + Singleton logger class for structured logging with namespace support. + + Class Attributes: + _instance (Optional[logging.Logger]): Singleton logger instance. + _logfile (Optional[str]): Path to the log file. + _log_queue (queue.Queue): Queue for asynchronous logging. + _logger_thread (Optional[QEFFLoggerThread]): Background logging thread. """ - logger = logging.getLogger("QEfficient") - # create console handler and set level to debug - ch = logging.StreamHandler() - ch.setLevel(logging.INFO) - # define formatter - ch.setFormatter(QEffFormatter()) + _instance: Optional[logging.Logger] = None + _logfile: Optional[str] = None + _log_queue: queue.Queue = queue.Queue() + _logger_thread: Optional[QEFFLoggerThread] = None + + def __init__(self, loglevel: Optional[str] = "INFO", log_path: Optional[str] = None): + """ + Initialize the logger instance with specified log level and path. + + Parameters: + loglevel (str): Logging level (e.g., "INFO", "DEBUG"). + log_path (str): Optional path to the log file. + """ + if QEFFLogger._instance is None: + self.loglevel = loglevel + self.log_path = log_path + self.logger = self._initialize_logger() + QEFFLogger._instance = self.logger + QEFFLogger._logger_thread = QEFFLoggerThread(self.logger, QEFFLogger._log_queue) + QEFFLogger._logger_thread.start() - logger.addHandler(ch) - return logger + def _initialize_logger(self) -> logging.Logger: + """ + Set up the logger with rotating file handler and JSON formatter. + + Returns: + logging.Logger: Configured logger instance. + """ + if self.log_path is None: + log_dir = os.path.expanduser("~/.cache/qefficient_logs") + os.makedirs(log_dir, exist_ok=True) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self.log_path = os.path.join(log_dir, f"QEFF_{timestamp}.log") + QEFFLogger._logfile = self.log_path + + numeric_level = getattr(logging, self.loglevel.upper(), None) + if not isinstance(numeric_level, int): + raise ValueError(f"Invalid log level: {self.loglevel}") + + logger = logging.getLogger("QEFF_LOGGER") + logger.setLevel(numeric_level) + + if not logger.handlers: + handler = RotatingFileHandler(self.log_path, maxBytes=5 * 1024 * 1024, backupCount=10) + handler.setFormatter(JSONNamespaceFormatter()) + logger.addHandler(handler) + + return logger + + @classmethod + def get_logger( + cls, namespace: str, loglevel: Optional[str] = "INFO", log_path: Optional[str] = None + ) -> logging.Logger: + """ + Retrieve a logger adapter with a specific namespace. + + Parameters: + namespace (str): Logical grouping for the log. + loglevel (str): Logging level. + log_path (str): Optional path to the log file. + + Returns: + logging.Logger: Logger adapter with namespace. + """ + if cls._instance is None: + cls(loglevel, log_path) + return logging.LoggerAdapter(cls._instance, {"namespace": namespace}) + + @classmethod + def log(cls, level: str, namespace: str, msg: str, fn: str = "", lno: int = 0, func: str = ""): + """ + Log a message with specified level and metadata. + + Parameters: + level (str): Logging level (e.g., "INFO", "ERROR"). + namespace (str): Logical grouping for the log. + msg (str): Log message. + fn (str): Filename where the log is generated. + lno (int): Line number in the file. + func (str): Function name. + """ + if cls._instance is None: + raise RuntimeError("Logger has not been initialized. Call get_logger() first.") + + level_num = getattr(logging, level.upper(), None) + if not isinstance(level_num, int): + raise ValueError(f"Invalid log level: {level}") + + record = cls._instance.makeRecord( + name="QEFF_LOGGER", + level=level_num, + fn=fn, + lno=lno, + msg=msg, + args=(), + exc_info=None, + func=func, + extra={"namespace": namespace}, + ) + cls._log_queue.put(record) + + @classmethod + def set_loglevel(cls, loglevel: Optional[str] = "INFO"): + """ + Update the log level of the logger. + + Parameters: + loglevel (str): New log level to set. + """ + if cls._instance is None: + raise RuntimeError("Logger has not been initialized yet. Call get_logger() first.") + + numeric_level = getattr(logging, loglevel.upper(), None) + if not isinstance(numeric_level, int): + raise ValueError(f"Invalid log level: {loglevel}") + + cls._instance.setLevel(numeric_level) + + @classmethod + def close_logger(cls): + """ + Gracefully shut down the logger and its thread. + """ + if cls._logger_thread: + cls._logger_thread.stop() + cls._logger_thread.join() + cls._logger_thread = None -# Define the logger object that can be used for logging purposes throughout the module. -logger = create_logger() + if cls._instance: + handlers = cls._instance.handlers[:] + for handler in handlers: + handler.close() + cls._instance.removeHandler(handler) + cls._instance = None + cls._logfile = None diff --git a/tests/utils/test_logger.py b/tests/utils/test_logger.py new file mode 100644 index 000000000..e6bd51c5c --- /dev/null +++ b/tests/utils/test_logger.py @@ -0,0 +1,48 @@ +# ----------------------------------------------------------------------------- +# +# Copyright (c) Qualcomm Technologies, Inc. and/or its subsidiaries. +# SPDX-License-Identifier: BSD-3-Clause +# +# ----------------------------------------------------------------------------- + +import threading +import time + +from QEfficient.utils.logging_utils import QEFFLogger + +# ------------------------------- +# Define namespace once +# ------------------------------- +NAMESPACE = "model" + +# ------------------------------- +# Initialize logger +# ------------------------------- +logger = QEFFLogger.get_logger(NAMESPACE, "DEBUG") + + +# ------------------------------- +# Worker function for threads +# ------------------------------- +def log_worker(thread_id): + for i in range(5): + logger.info(f"Thread-{thread_id} logging message {i}") + time.sleep(0.1) + + +# ------------------------------- +# Create and start threads +# ------------------------------- +threads = [] +for t_id in range(3): + t = threading.Thread(target=log_worker, args=(t_id,)) + threads.append(t) + t.start() + +for t in threads: + t.join() + +# ------------------------------- +# Graceful shutdown +# ------------------------------- +QEFFLogger.close_logger()