From b27a7c5f55f5307b74dd04e25898b14cca003625 Mon Sep 17 00:00:00 2001 From: Demolus13 Date: Tue, 26 Aug 2025 15:54:07 +0530 Subject: [PATCH 1/8] refactor: improve console logging for commands Signed-off-by: Demolus13 --- src/macaron/__main__.py | 178 ++++++--- src/macaron/console.py | 339 ++++++++++++++++++ src/macaron/output_reporter/reporter.py | 22 +- src/macaron/output_reporter/results.py | 13 +- src/macaron/policy_engine/policy_engine.py | 17 +- src/macaron/repo_finder/repo_finder.py | 26 +- src/macaron/slsa_analyzer/analyzer.py | 66 +++- .../slsa_analyzer/checks/base_check.py | 17 +- src/macaron/slsa_analyzer/registry.py | 34 +- 9 files changed, 625 insertions(+), 87 deletions(-) create mode 100644 src/macaron/console.py diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index d1180d9bb..b748afed2 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -20,6 +20,7 @@ ) from macaron.config.defaults import create_defaults, load_defaults from macaron.config.global_config import global_config +from macaron.console import access_handler from macaron.errors import ConfigurationError from macaron.output_reporter.reporter import HTMLReporter, JSONReporter, PolicyReporter from macaron.policy_engine.policy_engine import run_policy_engine, show_prelude @@ -63,7 +64,8 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None if analyzer_single_args.provenance_expectation is not None: if not os.path.exists(analyzer_single_args.provenance_expectation): logger.critical( - 'The provenance expectation file "%s" does not exist.', analyzer_single_args.provenance_expectation + 'The provenance expectation file "%s" does not exist.', + analyzer_single_args.provenance_expectation, ) sys.exit(os.EX_OSFILE) global_config.load_expectation_files(analyzer_single_args.provenance_expectation) @@ -72,7 +74,8 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None if analyzer_single_args.python_venv is not None: if not os.path.exists(analyzer_single_args.python_venv): logger.critical( - 'The Python virtual environment path "%s" does not exist.', analyzer_single_args.python_venv + 'The Python virtual environment path "%s" does not exist.', + analyzer_single_args.python_venv, ) sys.exit(os.EX_OSFILE) global_config.load_python_venv(analyzer_single_args.python_venv) @@ -95,7 +98,10 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None else: user_provided_local_maven_repo = analyzer_single_args.local_maven_repo if not os.path.isdir(user_provided_local_maven_repo): - logger.error("The user provided local Maven repo at %s is not valid.", user_provided_local_maven_repo) + logger.error( + "The user provided local Maven repo at %s is not valid.", + user_provided_local_maven_repo, + ) sys.exit(os.EX_USAGE) global_config.local_maven_repo = user_provided_local_maven_repo @@ -111,7 +117,8 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None lstrip_blocks=True, ) html_reporter = HTMLReporter( - env=custom_jinja_env, target_template=os.path.basename(analyzer_single_args.template_path) + env=custom_jinja_env, + target_template=os.path.basename(analyzer_single_args.template_path), ) if not html_reporter.template: logger.error("Exiting because the custom template cannot be found.") @@ -207,8 +214,10 @@ def verify_policy(verify_policy_args: argparse.Namespace) -> int: result = run_policy_engine(verify_policy_args.database, policy_content) vsa = generate_vsa(policy_content=policy_content, 
policy_result=result) + rich_handler = access_handler.get_handler() if vsa is not None: vsa_filepath = os.path.join(global_config.output_path, "vsa.intoto.jsonl") + rich_handler.update_vsa(vsa_filepath) logger.info( "Generating the Verification Summary Attestation (VSA) to %s.", os.path.relpath(vsa_filepath, os.getcwd()), @@ -222,8 +231,12 @@ def verify_policy(verify_policy_args: argparse.Namespace) -> int: file.write(json.dumps(vsa)) except OSError as err: logger.error( - "Could not generate the VSA to %s. Error: %s", os.path.relpath(vsa_filepath, os.getcwd()), err + "Could not generate the VSA to %s. Error: %s", + os.path.relpath(vsa_filepath, os.getcwd()), + err, ) + else: + rich_handler.update_vsa("No VSA generated.") policy_reporter = PolicyReporter() policy_reporter.generate(global_config.output_path, result) @@ -290,16 +303,23 @@ def find_source(find_args: argparse.Namespace) -> int: def perform_action(action_args: argparse.Namespace) -> None: """Perform the indicated action of Macaron.""" + rich_handler = access_handler.get_handler() match action_args.action: case "dump-defaults": + if not action_args.disable_rich_output: + rich_handler.start("dump-defaults") # Create the defaults.ini file in the output dir and exit. create_defaults(action_args.output_dir, os.getcwd()) sys.exit(os.EX_OK) case "verify-policy": + if not action_args.disable_rich_output: + rich_handler.start("verify-policy") sys.exit(verify_policy(action_args)) case "analyze": + if not action_args.disable_rich_output: + rich_handler.start("analyze") if not global_config.gh_token: logger.error("GitHub access token not set.") sys.exit(os.EX_USAGE) @@ -317,6 +337,8 @@ def perform_action(action_args: argparse.Namespace) -> None: analyze_slsa_levels_single(action_args) case "find-source": + if not action_args.disable_rich_output: + rich_handler.start("find-source") try: for git_service in GIT_SERVICES: git_service.load_defaults() @@ -393,6 +415,14 @@ def main(argv: list[str] | None = None) -> None: action="store_true", ) + main_parser.add_argument( + "-dro", + "--disable-rich-output", + default=False, + help="Disable Rich UI output", + action="store_true", + ) + main_parser.add_argument( "-o", "--output-dir", @@ -531,7 +561,10 @@ def main(argv: list[str] | None = None) -> None: ) # Dump the default values. - sub_parser.add_parser(name="dump-defaults", description="Dumps the defaults.ini file to the output directory.") + sub_parser.add_parser( + name="dump-defaults", + description="Dumps the defaults.ini file to the output directory.", + ) # Verify the Datalog policy. vp_parser = sub_parser.add_parser(name="verify-policy") @@ -593,65 +626,94 @@ def main(argv: list[str] | None = None) -> None: main_parser.print_help() sys.exit(os.EX_USAGE) - if args.verbose: - log_level = logging.DEBUG - log_format = "%(asctime)s [%(name)s:%(funcName)s:%(lineno)d] [%(levelname)s] %(message)s" - else: - log_level = logging.INFO - log_format = "%(asctime)s [%(levelname)s] %(message)s" - # Set global logging config. We need the stream handler for the initial # output directory checking log messages. 
- st_handler = logging.StreamHandler(sys.stdout) - logging.basicConfig(format=log_format, handlers=[st_handler], force=True, level=log_level) + st_handler: logging.StreamHandler = logging.StreamHandler(sys.stdout) + rich_handler: logging.Handler = logging.Handler() + if args.disable_rich_output: + if args.verbose: + log_level = logging.DEBUG + log_format = "%(asctime)s [%(name)s:%(funcName)s:%(lineno)d] [%(levelname)s] %(message)s" + else: + log_level = logging.INFO + log_format = "%(asctime)s [%(levelname)s] %(message)s" + st_handler = logging.StreamHandler(sys.stdout) + logging.basicConfig(format=log_format, handlers=[st_handler], force=True, level=log_level) + else: + if args.verbose: + log_level = logging.DEBUG + log_format = "%(asctime)s [%(name)s:%(funcName)s:%(lineno)d] %(message)s" + else: + log_level = logging.INFO + log_format = "%(asctime)s %(message)s" + rich_handler = access_handler.set_handler(args.verbose) + logging.basicConfig(format=log_format, handlers=[rich_handler], force=True, level=log_level) - # Set the output directory. - if not args.output_dir: - logger.error("The output path cannot be empty. Exiting ...") - sys.exit(os.EX_USAGE) + try: + # Set the output directory. + if not args.output_dir: + logger.error("The output path cannot be empty. Exiting ...") + sys.exit(os.EX_USAGE) - if os.path.isfile(args.output_dir): - logger.error("The output directory already exists. Exiting ...") - sys.exit(os.EX_USAGE) + if os.path.isfile(args.output_dir): + logger.error("The output directory already exists. Exiting ...") + sys.exit(os.EX_USAGE) - if os.path.isdir(args.output_dir): - logger.info("Setting the output directory to %s", os.path.relpath(args.output_dir, os.getcwd())) - else: - logger.info("No directory at %s. Creating one ...", os.path.relpath(args.output_dir, os.getcwd())) - os.makedirs(args.output_dir) - - # Add file handler to the root logger. Remove stream handler from the - # root logger to prevent dependencies printing logs to stdout. - debug_log_path = os.path.join(args.output_dir, "debug.log") - log_file_handler = logging.FileHandler(debug_log_path, "w") - log_file_handler.setFormatter(logging.Formatter(log_format)) - logging.getLogger().removeHandler(st_handler) - logging.getLogger().addHandler(log_file_handler) - - # Add StreamHandler to the Macaron logger only. - mcn_logger = logging.getLogger("macaron") - mcn_logger.addHandler(st_handler) - - logger.info("The logs will be stored in debug.log") - - # Set Macaron's global configuration. - # The path to provenance expectation files will be updated if - # set through analyze sub-command. - global_config.load( - macaron_path=macaron.MACARON_PATH, - output_path=args.output_dir, - build_log_path=os.path.join(args.output_dir, "build_log"), - debug_level=log_level, - local_repos_path=args.local_repos_path, - resources_path=os.path.join(macaron.MACARON_PATH, "resources"), - ) + if os.path.isdir(args.output_dir): + logger.info( + "Setting the output directory to %s", + os.path.relpath(args.output_dir, os.getcwd()), + ) + else: + logger.info( + "No directory at %s. Creating one ...", + os.path.relpath(args.output_dir, os.getcwd()), + ) + os.makedirs(args.output_dir) + + # Add file handler to the root logger. Remove stream handler from the + # root logger to prevent dependencies printing logs to stdout. 
+ debug_log_path = os.path.join(args.output_dir, "debug.log") + log_file_handler = logging.FileHandler(debug_log_path, "w") + log_file_handler.setFormatter(logging.Formatter(log_format)) + if args.disable_rich_output: + logging.getLogger().removeHandler(st_handler) + else: + logging.getLogger().removeHandler(rich_handler) + logging.getLogger().addHandler(log_file_handler) + + # Add StreamHandler to the Macaron logger only. + mcn_logger = logging.getLogger("macaron") + if args.disable_rich_output: + mcn_logger.addHandler(st_handler) + else: + mcn_logger.addHandler(rich_handler) + + logger.info("The logs will be stored in debug.log") + + # Set Macaron's global configuration. + # The path to provenance expectation files will be updated if + # set through analyze sub-command. + global_config.load( + macaron_path=macaron.MACARON_PATH, + output_path=args.output_dir, + build_log_path=os.path.join(args.output_dir, "build_log"), + debug_level=log_level, + local_repos_path=args.local_repos_path, + resources_path=os.path.join(macaron.MACARON_PATH, "resources"), + ) - # Load the default values from defaults.ini files. - if not load_defaults(args.defaults_path): - logger.error("Exiting because the defaults configuration could not be loaded.") - sys.exit(os.EX_NOINPUT) + # Load the default values from defaults.ini files. + if not load_defaults(args.defaults_path): + logger.error("Exiting because the defaults configuration could not be loaded.") + sys.exit(os.EX_NOINPUT) - perform_action(args) + perform_action(args) + finally: + if args.disable_rich_output: + st_handler.close() + else: + rich_handler.close() def _get_token_from_dict_or_env(token: str, token_dict: dict[str, str]) -> str: diff --git a/src/macaron/console.py b/src/macaron/console.py new file mode 100644 index 000000000..cb31362ab --- /dev/null +++ b/src/macaron/console.py @@ -0,0 +1,339 @@ +# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +"""This module implements a rich console handler for logging.""" + +import logging +import time +from typing import Any + +from rich.console import Group, RenderableType +from rich.live import Live +from rich.logging import RichHandler +from rich.panel import Panel +from rich.progress import BarColumn, MofNCompleteColumn, Progress, TaskID, TextColumn +from rich.rule import Rule +from rich.status import Status +from rich.table import Table + +from macaron.slsa_analyzer.checks.check_result import CheckResultType + + +class Check: + """Class to represent a check with its status and target.""" + + status = "PENDING" + target = "" + + +class RichConsoleHandler(RichHandler): + """A rich console handler for logging with rich formatting and live updates.""" + + def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.setLevel(logging.DEBUG) + self.command = "" + self.logs: list[str] = [] + self.description_table = Table(show_header=False, box=None) + self.description_table_content: dict[str, str | Status] = { + "Full Name:": Status("[green]Processing[/]"), + "Local Cloned Path:": Status("[green]Processing[/]"), + "Remote Path:": Status("[green]Processing[/]"), + "Branch:": Status("[green]Processing[/]"), + "Commit Hash:": Status("[green]Processing[/]"), + "Commit Date:": Status("[green]Processing[/]"), + "Excluded Checks:": Status("[green]Processing[/]"), + "Final Checks:": Status("[green]Processing[/]"), + "CI Services:": Status("[green]Processing[/]"), + "Build Tools:": Status("[green]Processing[/]"), + } + self.progress = Progress( + TextColumn(" RUNNING ANALYSIS"), + BarColumn(bar_width=None, complete_style="green"), + MofNCompleteColumn(), + ) + self.task_id: TaskID + self.progress_table = Table(show_header=False, box=None) + self.checks: dict[str, Check] = {} + self.failed_checks_table = Table(show_header=False, box=None) + self.summary_table = Table(show_header=False, box=None) + self.report_table = Table(show_header=False, box=None) + self.reports = { + "HTML Report": "Not Generated", + "Dependencies Report": "Not Generated", + "JSON Report": "Not Generated", + } + self.components_violates_table = Table(show_header=False, box=None) + self.components_satisfy_table = Table(show_header=False, box=None) + self.policy_summary_table = Table(show_header=False, box=None) + self.policy_summary: dict[str, str | Status] = { + "Passed Policies": "None", + "Failed Policies": "None", + "Policy Report": Status("[green]Generating[/]"), + } + self.verification_summary_attestation: str | None = None + self.verbose = verbose + self.verbose_panel = Panel( + "\n".join(self.logs), + title="Verbose Mode", + title_align="left", + border_style="blue", + ) + self.live = Live(get_renderable=self.make_layout, refresh_per_second=10) + + def emit(self, record: logging.LogRecord) -> None: + """Emit a log record with rich formatting.""" + log_time = time.strftime("%H:%M:%S") + msg = self.format(record) + + if record.levelno >= logging.ERROR: + self.logs.append(f"[red][ERROR][/red] {log_time} {msg}") + elif record.levelno >= logging.WARNING: + self.logs.append(f"[yellow][WARNING][/yellow] {log_time} {msg}") + else: + self.logs.append(f"[blue][INFO][/blue] {log_time} {msg}") + + self.verbose_panel.renderable = "\n".join(self.logs) + + def add_description_table_content(self, key: str, value: str | Status) -> None: + """Add or update a key-value pair in the description table.""" + self.description_table_content[key] = value + description_table = 
Table(show_header=False, box=None) + description_table.add_column("Details", justify="left") + description_table.add_column("Value", justify="left") + for field, content in self.description_table_content.items(): + description_table.add_row(field, content) + + self.description_table = description_table + + def no_of_checks(self, value: int) -> None: + """Initialize the progress bar with the total number of checks.""" + self.task_id = self.progress.add_task("analyzing", total=value) + + def update_checks(self, check_id: str, target: str, status: str = "RUNNING") -> None: + """Update the status and target of a specific check.""" + if check_id not in self.checks: + self.checks[check_id] = Check() + self.checks[check_id].status = status + self.checks[check_id].target = target + + progress_table = Table(show_header=False, box=None) + progress_table.add_column("Status", justify="left") + progress_table.add_column("Check", justify="left") + progress_table.add_column("Target", justify="left") + + for check_name, check in self.checks.items(): + if check.status == "RUNNING": + progress_table.add_row(Status("[bold green]RUNNING[/]"), check_name, check.target) + self.progress_table = progress_table + + if self.task_id is not None and status != "RUNNING": + self.progress.update(self.task_id, advance=1) + + def update_checks_summary(self, checks_summary: dict, total_checks: int) -> None: + """Update the summary tables based on the checks summary.""" + failed_checks_table = Table(show_header=False, box=None) + failed_checks_table.add_column("Status", justify="left") + failed_checks_table.add_column("Check ID", justify="left") + failed_checks_table.add_column("Description", justify="left") + + failed_checks = checks_summary[CheckResultType.FAILED] + for check in failed_checks: + failed_checks_table.add_row( + "[bold red]FAILED[/]", + check.check.check_id, + check.check.check_description, + ) + + self.failed_checks_table = failed_checks_table + + summary_table = Table(show_header=False, box=None) + summary_table.add_column("Check Result Type", justify="left") + summary_table.add_column("Count", justify="left") + summary_table.add_row("Total Checks", str(total_checks), style="white") + + for check_result_type, checks in checks_summary.items(): + if check_result_type == CheckResultType.PASSED: + summary_table.add_row("PASSED", str(len(checks)), style="green") + if check_result_type == CheckResultType.FAILED: + summary_table.add_row("FAILED", str(len(checks)), style="red") + if check_result_type == CheckResultType.SKIPPED: + summary_table.add_row("SKIPPED", str(len(checks)), style="yellow") + if check_result_type == CheckResultType.DISABLED: + summary_table.add_row("DISABLED", str(len(checks)), style="bright_blue") + if check_result_type == CheckResultType.UNKNOWN: + summary_table.add_row("UNKNOWN", str(len(checks)), style="white") + + self.summary_table = summary_table + + def update_report_table(self, report_type: str, report_path: str) -> None: + """Update the report table with the given report type and path.""" + self.reports[report_type] = report_path + report_table = Table(show_header=False, box=None) + report_table.add_column("Report Type", justify="left") + report_table.add_column("Report Path", justify="left") + + for report_detail, report_value in self.reports.items(): + report_table.add_row(report_detail, report_value, style="blue") + + self.report_table = report_table + + def generate_policy_summary_table(self) -> None: + """Generate the policy summary table.""" + policy_summary_table = 
Table(show_header=False, box=None) + policy_summary_table.add_column("Detail", justify="left") + policy_summary_table.add_column("Value", justify="left") + + policy_summary_table.add_row( + "[bold green]Passed Policies[/]", + self.policy_summary["Passed Policies"], + ) + policy_summary_table.add_row( + "[bold red]Failed Policies[/]", + self.policy_summary["Failed Policies"], + ) + policy_summary_table.add_row("[bold blue]Policy Report[/]", self.policy_summary["Policy Report"]) + + self.policy_summary_table = policy_summary_table + + def update_policy_report(self, report_path: str) -> None: + """Update the policy report path in the policy summary.""" + self.policy_summary["Policy Report"] = report_path + self.generate_policy_summary_table() + + def update_vsa(self, vsa_path: str) -> None: + """Update the verification summary attestation path.""" + self.verification_summary_attestation = vsa_path + + def update_policy_engine(self, results: dict) -> None: + """Update the policy engine results.""" + components_violates_table = Table(show_header=False, box=None) + components_violates_table.add_column("Assign No.", justify="left") + components_violates_table.add_column("Component", justify="left") + components_violates_table.add_column("Policy", justify="left") + + for values in results["component_violates_policy"]: + components_violates_table.add_row(values[0], values[1], values[2]) + + self.components_violates_table = components_violates_table + + components_satisfy_table = Table(show_header=False, box=None) + components_satisfy_table.add_column("Assign No.", justify="left") + components_satisfy_table.add_column("Component", justify="left") + components_satisfy_table.add_column("Policy", justify="left") + + for values in results["component_satisfies_policy"]: + components_satisfy_table.add_row(values[0], values[1], values[2]) + + self.components_satisfy_table = components_satisfy_table + + self.policy_summary["Passed Policies"] = ( + "\n".join(policy[0] for policy in results["passed_policies"]) if results["passed_policies"] else "None" + ) + self.policy_summary["Failed Policies"] = ( + "\n".join(policy[0] for policy in results["failed_policies"]) if results["failed_policies"] else "None" + ) + + self.generate_policy_summary_table() + + def make_layout(self) -> Group: + """Create the overall layout for the console output.""" + layout: list[RenderableType] = [] + if self.command == "analyze": + layout = layout + [Rule(" DESCRIPTION", align="left")] + if self.description_table.row_count > 0: + layout = layout + ["", self.description_table] + if self.progress_table.row_count > 0: + layout = layout + ["", self.progress, "", self.progress_table] + if self.failed_checks_table.row_count > 0: + layout = layout + [ + "", + Rule(" SUMMARY", align="left"), + "", + self.failed_checks_table, + ] + if self.summary_table.row_count > 0: + layout = layout + ["", self.summary_table] + if self.report_table.row_count > 0: + layout = layout + [ + self.report_table, + ] + elif self.summary_table.row_count > 0: + layout = layout + [ + "", + Rule(" SUMMARY", align="left"), + "", + self.summary_table, + ] + if self.report_table.row_count > 0: + layout = layout + [ + self.report_table, + ] + elif self.command == "verify-policy": + if self.policy_summary_table.row_count > 0: + if self.components_violates_table.row_count > 0: + layout = layout + [ + "[bold red] Components Violates Policy[/]", + self.components_violates_table, + ] + else: + layout = layout + [ + "[bold red] Components Violates Policy[/] [white not 
italic]None[/]", + ] + if self.components_satisfy_table.row_count > 0: + layout = layout + [ + "", + "[bold green] Components Satisfy Policy[/]", + self.components_satisfy_table, + ] + else: + layout = layout + [ + "", + "[bold green] Components Satisfy Policy[/] [white not italic]None[/]", + ] + layout = layout + ["", self.policy_summary_table] + if self.verification_summary_attestation: + vsa_table = Table(show_header=False, box=None) + vsa_table.add_column("Detail", justify="left") + vsa_table.add_column("Value", justify="left") + + vsa_table.add_row( + "[bold blue]Verification Summary Attestation[/]", + self.verification_summary_attestation, + ) + + layout = layout + [vsa_table] + + if self.verbose: + layout = layout + ["", self.verbose_panel] + return Group(*layout) + + def start(self, command: str) -> None: + """Start the live console display.""" + self.command = command + if not self.live.is_started: + self.live.start() + + def close(self) -> None: + """Stop the live console display.""" + self.live.stop() + + +class AccessHandler: + """A class to manage access to the RichConsoleHandler instance.""" + + def __init__(self) -> None: + self.verbose = False + self.rich_handler = RichConsoleHandler() + + def set_handler(self, verbose: bool) -> RichConsoleHandler: + """Set the verbosity and create a new RichConsoleHandler instance.""" + self.rich_handler = RichConsoleHandler(verbose) + return self.rich_handler + + def get_handler(self) -> RichConsoleHandler: + """Get the current RichConsoleHandler instance.""" + return self.rich_handler + + +access_handler = AccessHandler() diff --git a/src/macaron/output_reporter/reporter.py b/src/macaron/output_reporter/reporter.py index 78464e13d..c676fcec9 100644 --- a/src/macaron/output_reporter/reporter.py +++ b/src/macaron/output_reporter/reporter.py @@ -19,6 +19,7 @@ ) import macaron.output_reporter.jinja2_extensions as jinja2_extensions # pylint: disable=consider-using-from-import +from macaron.console import access_handler from macaron.output_reporter.results import Report from macaron.output_reporter.scm import SCMStatus @@ -42,6 +43,7 @@ def __init__(self, mode: str = "w", encoding: str = "utf-8"): """ self.mode = mode self.encoding = encoding + self.rich_handler = access_handler.get_handler() def write_file(self, file_path: str, data: str) -> bool: """Write the data into a file. @@ -64,7 +66,11 @@ def write_file(self, file_path: str, data: str) -> bool: file.write(data) return True except OSError as error: - logger.error("Cannot write to %s. Error: %s", os.path.relpath(file_path, os.getcwd()), error) + logger.error( + "Cannot write to %s. Error: %s", + os.path.relpath(file_path, os.getcwd()), + error, + ) return False @abc.abstractmethod @@ -115,18 +121,21 @@ def generate(self, target_dir: str, report: Report | dict) -> None: report: Report | dict The report to be generated. 
""" + self.rich_handler = access_handler.get_handler() if not isinstance(report, Report): return try: dep_file_name = os.path.join(target_dir, "dependencies.json") serialized_configs = list(report.get_serialized_configs()) self.write_file(dep_file_name, json.dumps(serialized_configs, indent=self.indent)) + self.rich_handler.update_report_table("Dependencies Report", os.path.relpath(dep_file_name, os.getcwd())) for record in report.get_records(): if record.context and record.status == SCMStatus.AVAILABLE: file_name = os.path.join(target_dir, f"{record.context.component.report_file_name}.json") json_data = json.dumps(record.get_dict(), indent=self.indent) self.write_file(file_name, json_data) + self.rich_handler.update_report_table("JSON Report", os.path.relpath(file_name, os.getcwd())) except TypeError as error: logger.critical("Cannot serialize output report to JSON: %s", error) @@ -207,6 +216,7 @@ def generate(self, target_dir: str, report: Report | dict) -> None: report: Report | dict The report to be generated. """ + self.rich_handler = access_handler.get_handler() if not self.template or not isinstance(report, Report): return @@ -218,6 +228,7 @@ def generate(self, target_dir: str, report: Report | dict) -> None: # in the original data. html = self.template.render(deepcopy(record.get_dict())) self.write_file(file_name, html) + self.rich_handler.update_report_table("HTML Report", os.path.relpath(file_name, os.getcwd())) except TemplateSyntaxError as error: location = f"line {error.lineno}" name = error.filename or error.name @@ -261,9 +272,16 @@ def generate(self, target_dir: str, report: Report | dict) -> None: report: Report | dict The report to be generated. """ + self.rich_handler = access_handler.get_handler() if not isinstance(report, dict): return try: - self.write_file(os.path.join(target_dir, "policy_report.json"), json.dumps(report, indent=self.indent)) + self.write_file( + os.path.join(target_dir, "policy_report.json"), + json.dumps(report, indent=self.indent), + ) + self.rich_handler.update_policy_report( + os.path.relpath(os.path.join(target_dir, "policy_report.json"), os.getcwd()) + ) except (TypeError, ValueError, OSError) as error: logger.critical("Cannot serialize the policy report to JSON: %s", error) diff --git a/src/macaron/output_reporter/results.py b/src/macaron/output_reporter/results.py index 5bf8c8806..dddd636a3 100644 --- a/src/macaron/output_reporter/results.py +++ b/src/macaron/output_reporter/results.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains classes that represent the result of the Macaron analysis.""" @@ -9,6 +9,7 @@ from typing import Generic, TypedDict, TypeVar from macaron.config.target_config import Configuration +from macaron.console import access_handler from macaron.output_reporter.scm import SCMStatus from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.checks.check_result import CheckResultType @@ -199,6 +200,7 @@ def __init__(self, root_record: Record) -> None: self.record_mapping: dict[str, Record] = {} if root_record.context: self.record_mapping[root_record.record_id] = root_record + self.rich_handler = access_handler.get_handler() def get_records(self) -> Iterable[Record]: """Get the generator for all records in the report. 
@@ -297,6 +299,7 @@ def __str__(self) -> str: """Return the string representation of the Report instance.""" ctx_list = list(self.get_ctxs()) main_ctx: AnalyzeContext = ctx_list.pop(0) + self.rich_handler = access_handler.get_handler() output = "".join( [ @@ -306,6 +309,7 @@ def __str__(self) -> str: "\nSLSA REQUIREMENT RESULTS:\n", ] ) + self.rich_handler.update_checks_summary(main_ctx.get_check_summary(), len(main_ctx.check_results)) slsa_req_mesg: dict[SLSALevels, list[str]] = {level: [] for level in SLSALevels if level != SLSALevels.LEVEL0} for req_name, req_status in main_ctx.ctx_data.items(): @@ -320,7 +324,12 @@ def __str__(self) -> str: dep_req = dep.ctx_data.get(ReqName(req.name)) if dep_req and not dep_req.is_pass: fail_count += 1 - message = "".join([message, f" (and {fail_count}/{len(ctx_list)} dependencies FAILED)"]) + message = "".join( + [ + message, + f" (and {fail_count}/{len(ctx_list)} dependencies FAILED)", + ] + ) slsa_req_mesg[req.min_level_required].append(message) diff --git a/src/macaron/policy_engine/policy_engine.py b/src/macaron/policy_engine/policy_engine.py index 1b9bec29c..e815d48f4 100644 --- a/src/macaron/policy_engine/policy_engine.py +++ b/src/macaron/policy_engine/policy_engine.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module handles invoking the souffle policy engine on a database.""" @@ -10,6 +10,7 @@ from sqlalchemy import MetaData, create_engine, select from macaron import __version__ as mcn_version +from macaron.console import access_handler from macaron.database.table_definitions import Analysis from macaron.policy_engine.souffle import SouffleError, SouffleWrapper from macaron.policy_engine.souffle_code_generator import ( @@ -58,7 +59,11 @@ def get_generated(database_path: os.PathLike | str) -> SouffleProgram: return prelude -def copy_prelude(database_path: os.PathLike | str, sfl: SouffleWrapper, prelude: SouffleProgram | None = None) -> None: +def copy_prelude( + database_path: os.PathLike | str, + sfl: SouffleWrapper, + prelude: SouffleProgram | None = None, +) -> None: """ Generate and copy the prelude into the souffle instance's include directory. 
@@ -132,7 +137,10 @@ def _check_version(database_path: str) -> None: ).scalar() if versions is not None: logger.error("Database generated with unsupported versions (%s).", versions) - logger.error("Only databases generated by Macaron version %s are supported.", mcn_version) + logger.error( + "Only databases generated by Macaron version %s are supported.", + mcn_version, + ) sys.exit(os.EX_DATAERR) @@ -176,4 +184,7 @@ def run_policy_engine(database_path: str, policy_content: str) -> dict: logger.info("Policy results:\n%s", "\n".join(output)) + rich_handler = access_handler.get_handler() + rich_handler.update_policy_engine(res) + return res diff --git a/src/macaron/repo_finder/repo_finder.py b/src/macaron/repo_finder/repo_finder.py index 9017a4ae0..0a89a2a6b 100644 --- a/src/macaron/repo_finder/repo_finder.py +++ b/src/macaron/repo_finder/repo_finder.py @@ -42,6 +42,7 @@ from macaron.config.defaults import defaults from macaron.config.global_config import global_config +from macaron.console import access_handler from macaron.errors import CloneError, RepoCheckOutError from macaron.repo_finder import repo_finder_pypi, to_domain_from_known_purl_types from macaron.repo_finder.commit_finder import find_commit, match_tags @@ -142,7 +143,9 @@ def find_repo( def find_repo_alternative( - purl: PackageURL, outcome: RepoFinderInfo, package_registries_info: list[PackageRegistryInfo] | None = None + purl: PackageURL, + outcome: RepoFinderInfo, + package_registries_info: list[PackageRegistryInfo] | None = None, ) -> tuple[str, RepoFinderInfo]: """Use PURL type specific methods to find the repository when the standard methods have failed. @@ -279,7 +282,12 @@ def find_source(purl_string: str, input_repo: str | None, latest_version_fallbac repo_dir = os.path.join(global_config.output_path, GIT_REPOS_DIR) logging.getLogger("macaron.slsa_analyzer.git_url").disabled = True # The prepare_repo function will also check the latest version of the artifact if required. - git_obj, _ = prepare_repo(repo_dir, found_repo, purl=purl, latest_version_fallback=not checked_latest_purl) + git_obj, _ = prepare_repo( + repo_dir, + found_repo, + purl=purl, + latest_version_fallback=not checked_latest_purl, + ) if git_obj: digest = git_obj.get_head().hash @@ -321,7 +329,12 @@ def find_source(purl_string: str, input_repo: str | None, latest_version_fallbac logger.info("Found commit for PURL: %s", digest) - if not generate_report(purl_string, digest, found_repo, os.path.join(global_config.output_path, "reports")): + if not generate_report( + purl_string, + digest, + found_repo, + os.path.join(global_config.output_path, "reports"), + ): return False return True @@ -381,7 +394,9 @@ def get_latest_repo_if_different(latest_version_purl: PackageURL, original_repo: if check_repo_urls_are_equivalent(original_repo, latest_repo): logger.error( - "Repository from latest PURL is equivalent to original repository: %s ~= %s", latest_repo, original_repo + "Repository from latest PURL is equivalent to original repository: %s ~= %s", + latest_repo, + original_repo, ) return "" @@ -427,6 +442,7 @@ def prepare_repo( tuple[Git | None, CommitFinderInfo] The pydriller.Git object of the repository or None if error, and the outcome of the Commit Finder. """ + rich_handler = access_handler.get_handler() # TODO: separate the logic for handling remote and local repos instead of putting them into this method. 
logger.info( "Preparing the repository for the analysis (path=%s, branch=%s, digest=%s)", @@ -434,6 +450,7 @@ def prepare_repo( branch_name, digest, ) + rich_handler.add_description_table_content("Remote Path:", repo_path) is_remote = is_remote_repo(repo_path) commit_finder_outcome = CommitFinderInfo.NOT_USED @@ -451,6 +468,7 @@ def prepare_repo( logger.info("Cloning the repository.") try: git_service.clone_repo(resolved_local_path, resolved_remote_path) + rich_handler.add_description_table_content("Local Cloned Path:", repo_unique_path) except CloneError as error: logger.error("Cannot clone %s: %s", resolved_remote_path, str(error)) return None, commit_finder_outcome diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index e013f8411..3718ed955 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -25,7 +25,12 @@ ) from macaron.config.global_config import global_config from macaron.config.target_config import Configuration -from macaron.database.database_manager import DatabaseManager, get_db_manager, get_db_session +from macaron.console import access_handler +from macaron.database.database_manager import ( + DatabaseManager, + get_db_manager, + get_db_session, +) from macaron.database.table_definitions import ( Analysis, Component, @@ -107,11 +112,18 @@ def __init__(self, output_path: str, build_log_path: str) -> None: logger.error("Cannot start the analysis. Exiting ...") sys.exit(1) + excluded_checks = [check for check in registry.get_all_checks_mapping() if check not in registry.checks_to_run] logger.info( "The following checks are excluded based on the user configuration: %s", - [check for check in registry.get_all_checks_mapping() if check not in registry.checks_to_run], + excluded_checks, + ) + self.rich_handler = access_handler.get_handler() + self.rich_handler.add_description_table_content( + "Excluded Checks:", + ", ".join(excluded_checks) if excluded_checks else "None", ) logger.info("The following checks will be run: %s", registry.checks_to_run) + self.rich_handler.add_description_table_content("Final Checks:", "\n".join(registry.checks_to_run)) self.output_path = output_path @@ -217,7 +229,10 @@ def run( ) else: # Can't reach here. - logger.critical("Expecting deps depth to be '0', '1' or '-1', got %s", deps_depth) + logger.critical( + "Expecting deps depth to be '0', '1' or '-1', got %s", + deps_depth, + ) return os.EX_USAGE # Merge the automatically resolved dependencies with the manual configuration. 
@@ -297,7 +312,9 @@ def generate_reports(self, report: Report) -> None: return output_target_path = os.path.join( - global_config.output_path, "reports", report.root_record.context.component.report_dir_name + global_config.output_path, + "reports", + report.root_record.context.component.report_dir_name, ) os.makedirs(output_target_path, exist_ok=True) @@ -484,6 +501,7 @@ def run_single( logger.info("Analyzing %s", repo_id) logger.info("With PURL: %s", component.purl) logger.info("=====================================") + self.rich_handler.add_description_table_content("Full Name:", component.purl) analyze_ctx = self.create_analyze_ctx(component) analyze_ctx.dynamic_data["expectation"] = self.expectations.get_expectation_for_target( @@ -555,7 +573,10 @@ def run_single( slsa_version = extract_predicate_version(provenance_payload) slsa_level = determine_provenance_slsa_level( - analyze_ctx, provenance_payload, provenance_is_verified, provenance_l3_verified + analyze_ctx, + provenance_payload, + provenance_is_verified, + provenance_l3_verified, ) analyze_ctx.dynamic_data["provenance_info"] = Provenance( @@ -673,6 +694,10 @@ def add_repository(self, branch_name: str | None, git_obj: Git) -> Repository | commit_date_str, ) + self.rich_handler.add_description_table_content("Branch:", res_branch if res_branch else "None") + self.rich_handler.add_description_table_content("Commit Hash:", commit_sha) + self.rich_handler.add_description_table_content("Commit Date:", commit_date_str) + return repository class AnalysisTarget(NamedTuple): @@ -750,7 +775,8 @@ def add_component( is not None ): raise DuplicateCmpError( - f"{analysis_target.repo_path} is already analyzed.", context=existing_record.context + f"{analysis_target.repo_path} is already analyzed.", + context=existing_record.context, ) repository = self.add_repository(analysis_target.branch, git_obj) @@ -973,10 +999,15 @@ def _determine_git_service(self, analyze_ctx: AnalyzeContext) -> BaseGitService: git_service = get_git_service(remote_path) if isinstance(git_service, NoneGitService): - logger.info("Unable to find repository or unsupported git service for %s", analyze_ctx.component.purl) + logger.info( + "Unable to find repository or unsupported git service for %s", + analyze_ctx.component.purl, + ) else: logger.info( - "Detected git service %s for %s.", git_service.name, analyze_ctx.component.repository.complete_name + "Detected git service %s for %s.", + git_service.name, + analyze_ctx.component.repository.complete_name, ) analyze_ctx.dynamic_data["git_service"] = git_service @@ -988,7 +1019,9 @@ def _determine_build_tools(self, analyze_ctx: AnalyzeContext, git_service: BaseG build_tool.load_defaults() if build_tool.purl_type == analyze_ctx.component.type: logger.debug( - "Found %s build tool based on the %s PackageURL.", build_tool.name, analyze_ctx.component.purl + "Found %s build tool based on the %s PackageURL.", + build_tool.name, + analyze_ctx.component.purl, ) analyze_ctx.dynamic_data["build_spec"]["purl_tools"].append(build_tool) @@ -1016,6 +1049,11 @@ def _determine_build_tools(self, analyze_ctx: AnalyzeContext, git_service: BaseG ) else: logger.info("Unable to discover build tools because repository is None.") + else: + self.rich_handler.add_description_table_content( + "Build Tools:", + "\n".join([build_tool.name for build_tool in analyze_ctx.dynamic_data["build_spec"]["tools"]]), + ) def _determine_ci_services(self, analyze_ctx: AnalyzeContext, git_service: BaseGitService) -> None: """Determine the CI services used by the software 
component.""" @@ -1052,6 +1090,12 @@ def _determine_ci_services(self, analyze_ctx: AnalyzeContext, git_service: BaseG ) ) + if analyze_ctx.dynamic_data["ci_services"]: + self.rich_handler.add_description_table_content( + "CI Services:", + "\n".join([ci_service["service"].name for ci_service in analyze_ctx.dynamic_data["ci_services"]]), + ) + def _populate_package_registry_info(self) -> list[PackageRegistryInfo]: """Add all possible package registries to the analysis context.""" package_registries = [] @@ -1070,7 +1114,9 @@ def _populate_package_registry_info(self) -> list[PackageRegistryInfo]: return package_registries def _determine_package_registries( - self, analyze_ctx: AnalyzeContext, package_registries_info: list[PackageRegistryInfo] + self, + analyze_ctx: AnalyzeContext, + package_registries_info: list[PackageRegistryInfo], ) -> None: """Determine the package registries used by the software component based on its build tools.""" build_tools = ( diff --git a/src/macaron/slsa_analyzer/checks/base_check.py b/src/macaron/slsa_analyzer/checks/base_check.py index b1912018e..b300501a0 100644 --- a/src/macaron/slsa_analyzer/checks/base_check.py +++ b/src/macaron/slsa_analyzer/checks/base_check.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the BaseCheck class to be inherited by other concrete Checks.""" @@ -6,6 +6,7 @@ import logging from abc import abstractmethod +from macaron.console import access_handler from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.checks.check_result import ( CheckInfo, @@ -49,7 +50,9 @@ def __init__( The status for this check when it's skipped based on another check's result. """ self._check_info = CheckInfo( - check_id=check_id, check_description=description, eval_reqs=eval_reqs if eval_reqs else [] + check_id=check_id, + check_description=description, + eval_reqs=eval_reqs if eval_reqs else [], ) if not depends_on: @@ -58,6 +61,7 @@ def __init__( self._depends_on = depends_on self._result_on_skip = result_on_skip + self.rich_handler = access_handler.get_handler() @property def check_info(self) -> CheckInfo: @@ -92,10 +96,13 @@ def run(self, target: AnalyzeContext, skipped_info: SkippedInfo | None = None) - CheckResult The result of the check. """ + self.rich_handler = access_handler.get_handler() logger.info("----------------------------------") logger.info("BEGIN CHECK: %s", self.check_info.check_id) logger.info("----------------------------------") + self.rich_handler.update_checks(self.check_info.check_id, target.component.purl) + check_result_data: CheckResultData if skipped_info: @@ -129,6 +136,12 @@ def run(self, target: AnalyzeContext, skipped_info: SkippedInfo | None = None) - justification_str, ) + self.rich_handler.update_checks( + self.check_info.check_id, + target.component.purl, + check_result_data.result_type.value, + ) + return CheckResult(check=self.check_info, result=check_result_data) @abstractmethod diff --git a/src/macaron/slsa_analyzer/registry.py b/src/macaron/slsa_analyzer/registry.py index 3d8b6000f..868f88a4c 100644 --- a/src/macaron/slsa_analyzer/registry.py +++ b/src/macaron/slsa_analyzer/registry.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. 
+# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the Registry class for loading checks.""" @@ -14,6 +14,7 @@ from typing import Any, TypeVar from macaron.config.defaults import defaults +from macaron.console import access_handler from macaron.errors import CheckRegistryError from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.checks.base_check import BaseCheck @@ -51,6 +52,7 @@ def __init__(self) -> None: self.check_tree: CheckTree = {} self.execution_order: list[str] = [] + self.rich_handler = access_handler.get_handler() def register(self, check: BaseCheck) -> None: """Register the check. @@ -76,7 +78,10 @@ def register(self, check: BaseCheck) -> None: else: for parent_relationship in check.depends_on: if not self._add_relationship_entry(check.check_info.check_id, parent_relationship): - logger.error("Cannot load relationships of check %s.", check.check_info.check_id) + logger.error( + "Cannot load relationships of check %s.", + check.check_info.check_id, + ) sys.exit(1) self._all_checks_mapping[check.check_info.check_id] = check @@ -169,7 +174,10 @@ def _validate_check(check: Any) -> bool: if check_file_abs_path: if not (hasattr(check, "result_on_skip") and isinstance(check.result_on_skip, CheckResultType)): - logger.error("The status_on_skipped in the Check at %s is invalid.", str(check.check_info.check_id)) + logger.error( + "The status_on_skipped in the Check at %s is invalid.", + str(check.check_info.check_id), + ) return False if not Registry._validate_check_id_format(check.check_info.check_id): @@ -461,11 +469,18 @@ def scan(self, target: AnalyzeContext) -> dict[str, CheckResult]: results: dict[str, CheckResult] = {} skipped_checks: list[SkippedInfo] = [] + self.rich_handler = access_handler.get_handler() + self.rich_handler.no_of_checks(len(registry.checks_to_run)) + for check_id in self.execution_order: check = all_checks.get(check_id) if not check: - logger.error("Check %s is not defined yet. Please add the implementation for %s.", check_id, check_id) + logger.error( + "Check %s is not defined yet. Please add the implementation for %s.", + check_id, + check_id, + ) results[check_id] = CheckResult( check=CheckInfo( check_id=check_id, @@ -495,7 +510,11 @@ def scan(self, target: AnalyzeContext) -> dict[str, CheckResult]: try: results[check_id] = check.run(target, skipped_info) except Exception as exc: # pylint: disable=broad-exception-caught - logger.error("Exception in check %s: %s. Run in verbose mode to get more information.", check_id, exc) + logger.error( + "Exception in check %s: %s. Run in verbose mode to get more information.", + check_id, + exc, + ) logger.debug(traceback.format_exc()) logger.info("Check %s has failed.", check_id) return results @@ -598,7 +617,10 @@ def _should_skip_check(check: BaseCheck, results: dict[str, CheckResult]) -> Ski f"Check {check.check_info.check_id} is set to {check.result_on_skip.value} " f"because {parent_id} {got_status.value}." 
) - skipped_info = SkippedInfo(check_id=check.check_info.check_id, suppress_comment=suppress_comment) + skipped_info = SkippedInfo( + check_id=check.check_info.check_id, + suppress_comment=suppress_comment, + ) return skipped_info return None From 8f01b518b9d7d0b6b06a2ea43db26b8e27d035af Mon Sep 17 00:00:00 2001 From: Demolus13 Date: Wed, 10 Sep 2025 18:22:20 +0530 Subject: [PATCH 2/8] refactor: improve console logging for find-source and dump-defaults Signed-off-by: Demolus13 --- src/macaron/__main__.py | 3 +- src/macaron/config/defaults.py | 7 +++- src/macaron/console.py | 46 +++++++++++++++++++++----- src/macaron/repo_finder/repo_finder.py | 5 +++ src/macaron/repo_finder/repo_utils.py | 4 +++ 5 files changed, 54 insertions(+), 11 deletions(-) diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index b748afed2..b6db1c9c2 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -217,7 +217,7 @@ def verify_policy(verify_policy_args: argparse.Namespace) -> int: rich_handler = access_handler.get_handler() if vsa is not None: vsa_filepath = os.path.join(global_config.output_path, "vsa.intoto.jsonl") - rich_handler.update_vsa(vsa_filepath) + rich_handler.update_vsa(os.path.relpath(vsa_filepath, os.getcwd())) logger.info( "Generating the Verification Summary Attestation (VSA) to %s.", os.path.relpath(vsa_filepath, os.getcwd()), @@ -416,7 +416,6 @@ def main(argv: list[str] | None = None) -> None: ) main_parser.add_argument( - "-dro", "--disable-rich-output", default=False, help="Disable Rich UI output", diff --git a/src/macaron/config/defaults.py b/src/macaron/config/defaults.py index 0ac469604..a5b487c0b 100644 --- a/src/macaron/config/defaults.py +++ b/src/macaron/config/defaults.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module provides functions to manage default values.""" @@ -9,6 +9,8 @@ import pathlib import shutil +from macaron.console import access_handler + logger: logging.Logger = logging.getLogger(__name__) @@ -162,14 +164,17 @@ def create_defaults(output_path: str, cwd_path: str) -> bool: # Since we have only one defaults.ini file and ConfigParser.write does not # preserve the comments, copy the file directly. dest_path = os.path.join(output_path, "defaults.ini") + rich_handler = access_handler.get_handler() try: shutil.copy2(src_path, dest_path) logger.info( "Dumped the default values in %s.", os.path.relpath(os.path.join(output_path, "defaults.ini"), cwd_path), ) + rich_handler.update_dump_defaults(os.path.relpath(dest_path, cwd_path)) return True # We catch OSError to support errors on different platforms. 
except OSError as error: logger.error("Failed to create %s: %s.", os.path.relpath(dest_path, cwd_path), error) + rich_handler.update_dump_defaults("[bold red]Failed[/]") return False diff --git a/src/macaron/console.py b/src/macaron/console.py index cb31362ab..0fa41810e 100644 --- a/src/macaron/console.py +++ b/src/macaron/console.py @@ -16,8 +16,6 @@ from rich.status import Status from rich.table import Table -from macaron.slsa_analyzer.checks.check_result import CheckResultType - class Check: """Class to represent a check with its status and target.""" @@ -72,6 +70,15 @@ def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None: "Policy Report": Status("[green]Generating[/]"), } self.verification_summary_attestation: str | None = None + self.find_source_table = Table(show_header=False, box=None) + self.find_source_content: dict[str, str | Status] = { + "Repository PURL:": Status("[green]Processing[/]"), + "Commit Hash:": Status("[green]Processing[/]"), + "JSON Report:": "Not Generated", + } + for key, value in self.find_source_content.items(): + self.find_source_table.add_row(key, value) + self.dump_defaults: str | Status = Status("[green]Generating[/]") self.verbose = verbose self.verbose_panel = Panel( "\n".join(self.logs), @@ -137,7 +144,7 @@ def update_checks_summary(self, checks_summary: dict, total_checks: int) -> None failed_checks_table.add_column("Check ID", justify="left") failed_checks_table.add_column("Description", justify="left") - failed_checks = checks_summary[CheckResultType.FAILED] + failed_checks = checks_summary["FAILED"] for check in failed_checks: failed_checks_table.add_row( "[bold red]FAILED[/]", @@ -153,15 +160,15 @@ def update_checks_summary(self, checks_summary: dict, total_checks: int) -> None summary_table.add_row("Total Checks", str(total_checks), style="white") for check_result_type, checks in checks_summary.items(): - if check_result_type == CheckResultType.PASSED: + if check_result_type == "PASSED": summary_table.add_row("PASSED", str(len(checks)), style="green") - if check_result_type == CheckResultType.FAILED: + if check_result_type == "FAILED": summary_table.add_row("FAILED", str(len(checks)), style="red") - if check_result_type == CheckResultType.SKIPPED: + if check_result_type == "SKIPPED": summary_table.add_row("SKIPPED", str(len(checks)), style="yellow") - if check_result_type == CheckResultType.DISABLED: + if check_result_type == "DISABLED": summary_table.add_row("DISABLED", str(len(checks)), style="bright_blue") - if check_result_type == CheckResultType.UNKNOWN: + if check_result_type == "UNKNOWN": summary_table.add_row("UNKNOWN", str(len(checks)), style="white") self.summary_table = summary_table @@ -236,6 +243,20 @@ def update_policy_engine(self, results: dict) -> None: self.generate_policy_summary_table() + def update_find_source_table(self, key: str, value: str | Status) -> None: + """Add or update a key-value pair in the find source table.""" + self.find_source_content[key] = value + find_source_table = Table(show_header=False, box=None) + find_source_table.add_column("Details", justify="left") + find_source_table.add_column("Value", justify="left") + for field, content in self.find_source_content.items(): + find_source_table.add_row(field, content) + self.find_source_table = find_source_table + + def update_dump_defaults(self, value: str | Status) -> None: + """Update the dump defaults status.""" + self.dump_defaults = value + def make_layout(self) -> Group: """Create the overall layout for the console output.""" layout: 
list[RenderableType] = [] @@ -303,6 +324,15 @@ def make_layout(self) -> Group: ) layout = layout + [vsa_table] + elif self.command == "find-source": + if self.find_source_table.row_count > 0: + layout = layout + [self.find_source_table] + elif self.command == "dump-defaults": + dump_defaults_table = Table(show_header=False, box=None) + dump_defaults_table.add_column("Detail", justify="left") + dump_defaults_table.add_column("Value", justify="left") + dump_defaults_table.add_row("Dump Defaults", self.dump_defaults) + layout = layout + [dump_defaults_table] if self.verbose: layout = layout + ["", self.verbose_panel] diff --git a/src/macaron/repo_finder/repo_finder.py b/src/macaron/repo_finder/repo_finder.py index 0a89a2a6b..6f876a05c 100644 --- a/src/macaron/repo_finder/repo_finder.py +++ b/src/macaron/repo_finder/repo_finder.py @@ -324,10 +324,15 @@ def find_source(purl_string: str, input_repo: str | None, latest_version_fallbac return find_source(str(purl), latest_repo, False) + rich_handler = access_handler.get_handler() if not input_repo: logger.info("Found repository for PURL: %s", found_repo) + rich_handler.update_find_source_table("Repository PURL:", found_repo) + else: + rich_handler.update_find_source_table("Repository PURL:", input_repo) logger.info("Found commit for PURL: %s", digest) + rich_handler.update_find_source_table("Commit Hash:", digest) if not generate_report( purl_string, diff --git a/src/macaron/repo_finder/repo_utils.py b/src/macaron/repo_finder/repo_utils.py index f246b98a0..507442498 100644 --- a/src/macaron/repo_finder/repo_utils.py +++ b/src/macaron/repo_finder/repo_utils.py @@ -13,6 +13,7 @@ from pydriller import Git from macaron.config.global_config import global_config +from macaron.console import access_handler from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR, decode_git_tags, parse_git_tags @@ -88,6 +89,9 @@ def generate_report(purl: str, commit: str, repo: str, target_dir: str) -> bool: logger.info("Report written to: %s", os.path.relpath(fullpath, os.getcwd())) + rich_handler = access_handler.get_handler() + rich_handler.update_find_source_table("JSON Report:", os.path.relpath(fullpath, os.getcwd())) + return True From 2af1bc35273e78b434c3bd38bc31dfe0b6d5eaac Mon Sep 17 00:00:00 2001 From: Demolus13 Date: Mon, 22 Sep 2025 11:13:34 +0530 Subject: [PATCH 3/8] refactor: improve console logging for gen-build-spec command Signed-off-by: Demolus13 --- src/macaron/__main__.py | 2 ++ .../build_spec_generator.py | 3 +++ .../reproducible_central.py | 15 +++++++++++ src/macaron/console.py | 27 ++++++++++++++++--- 4 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index b6db1c9c2..bbc207022 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -351,6 +351,8 @@ def perform_action(action_args: argparse.Namespace) -> None: find_source(action_args) case "gen-build-spec": + if not action_args.disable_rich_output: + rich_handler.start("gen-build-spec") sys.exit(gen_build_spec(action_args)) case _: diff --git a/src/macaron/build_spec_generator/build_spec_generator.py b/src/macaron/build_spec_generator/build_spec_generator.py index 4262f7e6a..dd1217cd0 100644 --- a/src/macaron/build_spec_generator/build_spec_generator.py +++ b/src/macaron/build_spec_generator/build_spec_generator.py @@ -14,6 +14,7 @@ from 
macaron.build_spec_generator.build_command_patcher import PatchCommandBuildTool, PatchValueType from macaron.build_spec_generator.reproducible_central.reproducible_central import gen_reproducible_central_build_spec +from macaron.console import access_handler from macaron.path_utils.purl_based_path import get_purl_based_dir logger: logging.Logger = logging.getLogger(__name__) @@ -131,6 +132,8 @@ def gen_build_spec_for_purl( build_spec_format.value, os.path.relpath(build_spec_filepath, os.getcwd()), ) + rich_handler = access_handler.get_handler() + rich_handler.update_gen_build_spec("Build Spec Path:", os.path.relpath(build_spec_filepath, os.getcwd())) try: with open(build_spec_filepath, mode="w", encoding="utf-8") as file: file.write(build_spec_content) diff --git a/src/macaron/build_spec_generator/reproducible_central/reproducible_central.py b/src/macaron/build_spec_generator/reproducible_central/reproducible_central.py index 326eea794..32f7eef31 100644 --- a/src/macaron/build_spec_generator/reproducible_central/reproducible_central.py +++ b/src/macaron/build_spec_generator/reproducible_central/reproducible_central.py @@ -23,6 +23,7 @@ lookup_build_tools_check, lookup_latest_component, ) +from macaron.console import access_handler from macaron.errors import QueryMacaronDatabaseError from macaron.slsa_analyzer.checks.build_tool_check import BuildToolFacts @@ -253,6 +254,11 @@ def get_rc_build_tool_name( BuildToolFacts.__tablename__, [(fact.build_tool_name, fact.language) for fact in build_tool_facts], ) + rich_handler = access_handler.get_handler() + rich_handler.update_gen_build_spec( + "Build Tools:", + "\n".join([f"{fact.build_tool_name} ({fact.language})" for fact in build_tool_facts]), + ) return _get_rc_build_tool_name_from_build_facts(build_tool_facts) @@ -351,6 +357,11 @@ def gen_reproducible_central_build_spec( version = purl.version if group is None or version is None: logger.error("Missing group and/or version for purl %s.", purl.to_string()) + rich_handler = access_handler.get_handler() + rich_handler.update_gen_build_spec("Repository PURL:", "[red]FAILED[/]") + rich_handler.update_gen_build_spec("Repository URL:", "[red]FAILED[/]") + rich_handler.update_gen_build_spec("Commit Hash:", "[red]FAILED[/]") + rich_handler.update_gen_build_spec("Build Tools:", "[red]FAILED[/]") return None try: @@ -386,6 +397,10 @@ def gen_reproducible_central_build_spec( latest_component_repository.remote_path, latest_component_repository.commit_sha, ) + rich_handler = access_handler.get_handler() + rich_handler.update_gen_build_spec("Repository PURL:", purl.to_string()) + rich_handler.update_gen_build_spec("Repository URL:", latest_component_repository.remote_path) + rich_handler.update_gen_build_spec("Commit Hash:", latest_component_repository.commit_sha) # Getting the RC build tool name from the build tool check facts. 
rc_build_tool_name = get_rc_build_tool_name( diff --git a/src/macaron/console.py b/src/macaron/console.py index 0fa41810e..66f4c6471 100644 --- a/src/macaron/console.py +++ b/src/macaron/console.py @@ -79,6 +79,16 @@ def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None: for key, value in self.find_source_content.items(): self.find_source_table.add_row(key, value) self.dump_defaults: str | Status = Status("[green]Generating[/]") + self.gen_build_spec: dict[str, str | Status] = { + "Repository PURL:": Status("[green]Processing[/]"), + "Repository URL:": Status("[green]Processing[/]"), + "Commit Hash:": Status("[green]Processing[/]"), + "Build Tools:": Status("[green]Processing[/]"), + "Build Spec Path:": "Not Generated", + } + self.gen_build_spec_table = Table(show_header=False, box=None) + for key, value in self.gen_build_spec.items(): + self.gen_build_spec_table.add_row(key, value) self.verbose = verbose self.verbose_panel = Panel( "\n".join(self.logs), @@ -257,6 +267,16 @@ def update_dump_defaults(self, value: str | Status) -> None: """Update the dump defaults status.""" self.dump_defaults = value + def update_gen_build_spec(self, key: str, value: str | Status) -> None: + """Add or update a key-value pair in the generate build spec table.""" + self.gen_build_spec[key] = value + gen_build_spec_table = Table(show_header=False, box=None) + gen_build_spec_table.add_column("Details", justify="left") + gen_build_spec_table.add_column("Value", justify="left") + for field, content in self.gen_build_spec.items(): + gen_build_spec_table.add_row(field, content) + self.gen_build_spec_table = gen_build_spec_table + def make_layout(self) -> Group: """Create the overall layout for the console output.""" layout: list[RenderableType] = [] @@ -333,7 +353,9 @@ def make_layout(self) -> Group: dump_defaults_table.add_column("Value", justify="left") dump_defaults_table.add_row("Dump Defaults", self.dump_defaults) layout = layout + [dump_defaults_table] - + elif self.command == "gen-build-spec": + if self.gen_build_spec_table.row_count > 0: + layout = layout + [self.gen_build_spec_table] if self.verbose: layout = layout + ["", self.verbose_panel] return Group(*layout) @@ -353,12 +375,11 @@ class AccessHandler: """A class to manage access to the RichConsoleHandler instance.""" def __init__(self) -> None: - self.verbose = False self.rich_handler = RichConsoleHandler() def set_handler(self, verbose: bool) -> RichConsoleHandler: """Set the verbosity and create a new RichConsoleHandler instance.""" - self.rich_handler = RichConsoleHandler(verbose) + self.rich_handler = RichConsoleHandler(verbose=verbose) return self.rich_handler def get_handler(self) -> RichConsoleHandler: From 2a65d77f35767876ed5159bc00c586994cb8859b Mon Sep 17 00:00:00 2001 From: Demolus13 Date: Mon, 29 Sep 2025 16:43:14 +0530 Subject: [PATCH 4/8] feat: add appropriate handling for info not found Signed-off-by: Demolus13 --- src/macaron/slsa_analyzer/analyzer.py | 24 +++++++++++++++++-- .../checks/detect_malicious_metadata_check.py | 3 +-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index 3718ed955..de9a93cdb 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -695,8 +695,12 @@ def add_repository(self, branch_name: str | None, git_obj: Git) -> Repository | ) self.rich_handler.add_description_table_content("Branch:", res_branch if res_branch else "None") - 
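
One detail worth noting in the AccessHandler.set_handler change above: verbose is keyword-only after *args in RichConsoleHandler.__init__, so the old positional call RichConsoleHandler(verbose) would have pushed the flag into *args and forwarded it to the base RichHandler (whose leading positional parameter is the log level) while verbose itself stayed False. A minimal sketch of that pitfall (illustrative only, simplified from the handler shown above):

from typing import Any

class Handler:
    def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None:
        self.forwarded = args   # positional arguments pass through to the base class
        self.verbose = verbose  # only a keyword argument can set this flag

h_wrong = Handler(True)         # True lands in *args; verbose stays False
h_right = Handler(verbose=True) # the keyword form actually enables verbose output
assert h_wrong.verbose is False and h_right.verbose is True
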
self.rich_handler.add_description_table_content("Commit Hash:", commit_sha) - self.rich_handler.add_description_table_content("Commit Date:", commit_date_str) + self.rich_handler.add_description_table_content( + "Commit Hash:", commit_sha if commit_sha else "[red]Not Found[/]" + ) + self.rich_handler.add_description_table_content( + "Commit Date:", commit_date_str if commit_date_str else "[red]Not Found[/]" + ) return repository @@ -785,6 +789,9 @@ def add_component( # software component. If this happens, we don't raise error and treat the software component as if it # does not have any ``Repository`` attached to it. repository = None + self.rich_handler.add_description_table_content("Branch:", "[red]Not Found[/]") + self.rich_handler.add_description_table_content("Commit Hash:", "[red]Not Found[/]") + self.rich_handler.add_description_table_content("Commit Date:", "[red]Not Found[/]") if not analysis_target.parsed_purl: # If the PURL is not available. This will only mean that the user don't provide PURL but only provide the @@ -1049,6 +1056,10 @@ def _determine_build_tools(self, analyze_ctx: AnalyzeContext, git_service: BaseG ) else: logger.info("Unable to discover build tools because repository is None.") + self.rich_handler.add_description_table_content( + "Build Tools:", + "[red]Not Found[/]", + ) else: self.rich_handler.add_description_table_content( "Build Tools:", @@ -1058,6 +1069,10 @@ def _determine_build_tools(self, analyze_ctx: AnalyzeContext, git_service: BaseG def _determine_ci_services(self, analyze_ctx: AnalyzeContext, git_service: BaseGitService) -> None: """Determine the CI services used by the software component.""" if isinstance(git_service, NoneGitService): + self.rich_handler.add_description_table_content( + "CI Services:", + "[red]Not Found[/]", + ) return # Determine the CI services. @@ -1095,6 +1110,11 @@ def _determine_ci_services(self, analyze_ctx: AnalyzeContext, git_service: BaseG "CI Services:", "\n".join([ci_service["service"].name for ci_service in analyze_ctx.dynamic_data["ci_services"]]), ) + else: + self.rich_handler.add_description_table_content( + "CI Services:", + "[red]Not Found[/]", + ) def _populate_package_registry_info(self) -> list[PackageRegistryInfo]: """Add all possible package registries to the analysis context.""" diff --git a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py index fb363dfb0..65cbf2961 100644 --- a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py +++ b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py @@ -82,8 +82,7 @@ class DetectMaliciousMetadataCheck(BaseCheck): def __init__(self) -> None: """Initialize a check instance.""" check_id = "mcn_detect_malicious_metadata_1" - description = """Check if the package is malicious. 
- """ + description = """Check if the package is malicious.""" super().__init__(check_id=check_id, description=description, eval_reqs=[]) def _should_skip( From 840789092e041c7e08a1047ff55102535d8e7e8f Mon Sep 17 00:00:00 2001 From: Demolus13 Date: Mon, 29 Sep 2025 21:33:15 +0530 Subject: [PATCH 5/8] feat: add rich python library for formatted terminal output Signed-off-by: Demolus13 --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 1b7a1fcc6..d0c6c728c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ "cryptography >=44.0.0,<45.0.0", "semgrep == 1.113.0", "email-validator >=2.2.0,<3.0.0", + "rich ~= 13.5", ] keywords = [] # https://pypi.org/classifiers/ From cccf92607ac883eef772723719f4bbf9f3e15766 Mon Sep 17 00:00:00 2001 From: Demolus13 Date: Wed, 1 Oct 2025 14:35:10 +0530 Subject: [PATCH 6/8] docs: update all docstrings in console Signed-off-by: Demolus13 --- src/macaron/console.py | 276 +++++++++++++++--- .../slsa_analyzer/checks/base_check.py | 3 +- .../checks/infer_artifact_pipeline_check.py | 7 +- 3 files changed, 244 insertions(+), 42 deletions(-) diff --git a/src/macaron/console.py b/src/macaron/console.py index 66f4c6471..d27c03f7e 100644 --- a/src/macaron/console.py +++ b/src/macaron/console.py @@ -17,17 +17,26 @@ from rich.table import Table -class Check: - """Class to represent a check with its status and target.""" - - status = "PENDING" - target = "" - - class RichConsoleHandler(RichHandler): """A rich console handler for logging with rich formatting and live updates.""" def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None: + """ + Initialize the RichConsoleHandler. + + Parameters + ---------- + verbose : bool, optional + if True, enables verbose logging, by default False + args + Variable length argument list. + kwargs + Arbitrary keyword arguments. + + Returns + ------- + None + """ super().__init__(*args, **kwargs) self.setLevel(logging.DEBUG) self.command = "" @@ -52,7 +61,7 @@ def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None: ) self.task_id: TaskID self.progress_table = Table(show_header=False, box=None) - self.checks: dict[str, Check] = {} + self.checks: dict[str, str] = {} self.failed_checks_table = Table(show_header=False, box=None) self.summary_table = Table(show_header=False, box=None) self.report_table = Table(show_header=False, box=None) @@ -99,7 +108,18 @@ def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None: self.live = Live(get_renderable=self.make_layout, refresh_per_second=10) def emit(self, record: logging.LogRecord) -> None: - """Emit a log record with rich formatting.""" + """ + Emit a log record with rich formatting. + + Parameters + ---------- + record : logging.LogRecord + The log record to be emitted. + + Returns + ------- + None + """ log_time = time.strftime("%H:%M:%S") msg = self.format(record) @@ -113,7 +133,20 @@ def emit(self, record: logging.LogRecord) -> None: self.verbose_panel.renderable = "\n".join(self.logs) def add_description_table_content(self, key: str, value: str | Status) -> None: - """Add or update a key-value pair in the description table.""" + """ + Add or update a key-value pair in the description table. + + Parameters + ---------- + key : str + The key to be added or updated. + value : str or Status + The value associated with the key. 
+ + Returns + ------- + None + """ self.description_table_content[key] = value description_table = Table(show_header=False, box=None) description_table.add_column("Details", justify="left") @@ -124,31 +157,64 @@ def add_description_table_content(self, key: str, value: str | Status) -> None: self.description_table = description_table def no_of_checks(self, value: int) -> None: - """Initialize the progress bar with the total number of checks.""" + """ + Initialize the progress bar with the total number of checks. + + Parameters + ---------- + value : int + The total number of checks to be performed. + + Returns + ------- + None + """ self.task_id = self.progress.add_task("analyzing", total=value) - def update_checks(self, check_id: str, target: str, status: str = "RUNNING") -> None: - """Update the status and target of a specific check.""" - if check_id not in self.checks: - self.checks[check_id] = Check() - self.checks[check_id].status = status - self.checks[check_id].target = target + def update_checks(self, check_id: str, status: str = "RUNNING") -> None: + """ + Update the status of a specific check and refresh the progress table. + + Parameters + ---------- + check_id : str + The identifier of the check to be updated. + status : str, optional + The new status of the check, by default "RUNNING" + + Returns + ------- + None + """ + self.checks[check_id] = status progress_table = Table(show_header=False, box=None) progress_table.add_column("Status", justify="left") progress_table.add_column("Check", justify="left") - progress_table.add_column("Target", justify="left") - for check_name, check in self.checks.items(): - if check.status == "RUNNING": - progress_table.add_row(Status("[bold green]RUNNING[/]"), check_name, check.target) + for check_name, check_status in self.checks.items(): + if check_status == "RUNNING": + progress_table.add_row(Status("[bold green]RUNNING[/]"), check_name) self.progress_table = progress_table if self.task_id is not None and status != "RUNNING": self.progress.update(self.task_id, advance=1) def update_checks_summary(self, checks_summary: dict, total_checks: int) -> None: - """Update the summary tables based on the checks summary.""" + """ + Update the summary tables with the results of the checks. + + Parameters + ---------- + checks_summary : dict + Dictionary containing lists of checks categorized by their results. + total_checks : int + The total number of checks. + + Returns + ------- + None + """ failed_checks_table = Table(show_header=False, box=None) failed_checks_table.add_column("Status", justify="left") failed_checks_table.add_column("Check ID", justify="left") @@ -184,7 +250,20 @@ def update_checks_summary(self, checks_summary: dict, total_checks: int) -> None self.summary_table = summary_table def update_report_table(self, report_type: str, report_path: str) -> None: - """Update the report table with the given report type and path.""" + """ + Update the report table with the path of a generated report. + + Parameters + ---------- + report_type : str + The type of the report (e.g., "HTML Report", "JSON Report"). + report_path : str + The relative path to the generated report. 
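
The no_of_checks and update_checks pair above sizes a single progress task to the number of registered checks and advances it once whenever a check leaves the RUNNING state. A self-contained sketch of that accounting with rich.progress (illustrative; the column layout here is an assumption, not the project's exact configuration):

import time

from rich.progress import BarColumn, Progress, TextColumn

columns = (TextColumn("{task.description}"), BarColumn(), TextColumn("{task.completed}/{task.total}"))
with Progress(*columns) as progress:
    task_id = progress.add_task("analyzing", total=5)  # one slot per registered check
    for _ in range(5):
        time.sleep(0.1)                       # stand-in for actually running a check
        progress.update(task_id, advance=1)   # called when a check finishes
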
+ + Returns + ------- + None + """ self.reports[report_type] = report_path report_table = Table(show_header=False, box=None) report_table.add_column("Report Type", justify="left") @@ -196,7 +275,13 @@ def update_report_table(self, report_type: str, report_path: str) -> None: self.report_table = report_table def generate_policy_summary_table(self) -> None: - """Generate the policy summary table.""" + """ + Generate the policy summary table based on the current policy summary data. + + Returns + ------- + None + """ policy_summary_table = Table(show_header=False, box=None) policy_summary_table.add_column("Detail", justify="left") policy_summary_table.add_column("Value", justify="left") @@ -214,16 +299,50 @@ def generate_policy_summary_table(self) -> None: self.policy_summary_table = policy_summary_table def update_policy_report(self, report_path: str) -> None: - """Update the policy report path in the policy summary.""" + """ + Update the policy report path in the policy summary. + + Parameters + ---------- + report_path : str + The relative path to the policy report. + + Returns + ------- + None + """ self.policy_summary["Policy Report"] = report_path self.generate_policy_summary_table() def update_vsa(self, vsa_path: str) -> None: - """Update the verification summary attestation path.""" + """ + Update the verification summary attestation path. + + Parameters + ---------- + vsa_path : str + The relative path to the verification summary attestation. + + Returns + ------- + None + """ self.verification_summary_attestation = vsa_path def update_policy_engine(self, results: dict) -> None: - """Update the policy engine results.""" + """ + Update the policy engine results including components that violate or satisfy policies. + + Parameters + ---------- + results : dict + Dictionary containing policy engine results including components that violate or satisfy policies, + and lists of passed and failed policies. + + Returns + ------- + None + """ components_violates_table = Table(show_header=False, box=None) components_violates_table.add_column("Assign No.", justify="left") components_violates_table.add_column("Component", justify="left") @@ -254,7 +373,20 @@ def update_policy_engine(self, results: dict) -> None: self.generate_policy_summary_table() def update_find_source_table(self, key: str, value: str | Status) -> None: - """Add or update a key-value pair in the find source table.""" + """ + Add or update a key-value pair in the find source table. + + Parameters + ---------- + key : str + The key to be added or updated. + value : str or Status + The value associated with the key. + + Returns + ------- + None + """ self.find_source_content[key] = value find_source_table = Table(show_header=False, box=None) find_source_table.add_column("Details", justify="left") @@ -264,11 +396,35 @@ def update_find_source_table(self, key: str, value: str | Status) -> None: self.find_source_table = find_source_table def update_dump_defaults(self, value: str | Status) -> None: - """Update the dump defaults status.""" + """ + Update the dump defaults value. + + Parameters + ---------- + value : str or Status + The value to be set for dump defaults. + + Returns + ------- + None + """ self.dump_defaults = value def update_gen_build_spec(self, key: str, value: str | Status) -> None: - """Add or update a key-value pair in the generate build spec table.""" + """ + Add or update a key-value pair in the generate build spec table. + + Parameters + ---------- + key : str + The key to be added or updated. 
+ value : str or Status + The value associated with the key. + + Returns + ------- + None + """ self.gen_build_spec[key] = value gen_build_spec_table = Table(show_header=False, box=None) gen_build_spec_table.add_column("Details", justify="left") @@ -278,7 +434,14 @@ def update_gen_build_spec(self, key: str, value: str | Status) -> None: self.gen_build_spec_table = gen_build_spec_table def make_layout(self) -> Group: - """Create the overall layout for the console output.""" + """ + Create the layout for the live console display. + + Returns + ------- + Group + A rich Group object containing the layout for the live console display. + """ layout: list[RenderableType] = [] if self.command == "analyze": layout = layout + [Rule(" DESCRIPTION", align="left")] @@ -361,13 +524,30 @@ def make_layout(self) -> Group: return Group(*layout) def start(self, command: str) -> None: - """Start the live console display.""" + """ + Start the live console display. + + Parameters + ---------- + command : str + The command being executed (e.g., "analyze", "verify-policy"). + + Returns + ------- + None + """ self.command = command if not self.live.is_started: self.live.start() def close(self) -> None: - """Stop the live console display.""" + """ + Stop the live console display. + + Returns + ------- + None + """ self.live.stop() @@ -375,15 +555,41 @@ class AccessHandler: """A class to manage access to the RichConsoleHandler instance.""" def __init__(self) -> None: + """ + Initialize the AccessHandler with a default RichConsoleHandler instance. + + Returns + ------- + None + """ self.rich_handler = RichConsoleHandler() def set_handler(self, verbose: bool) -> RichConsoleHandler: - """Set the verbosity and create a new RichConsoleHandler instance.""" + """ + Set a new RichConsoleHandler instance with the specified verbosity. + + Parameters + ---------- + verbose : bool + if True, enables verbose logging + + Returns + ------- + RichConsoleHandler + The new RichConsoleHandler instance. + """ self.rich_handler = RichConsoleHandler(verbose=verbose) return self.rich_handler def get_handler(self) -> RichConsoleHandler: - """Get the current RichConsoleHandler instance.""" + """ + Get the current RichConsoleHandler instance. + + Returns + ------- + RichConsoleHandler + The current RichConsoleHandler instance. 
+ """ return self.rich_handler diff --git a/src/macaron/slsa_analyzer/checks/base_check.py b/src/macaron/slsa_analyzer/checks/base_check.py index b300501a0..53f857828 100644 --- a/src/macaron/slsa_analyzer/checks/base_check.py +++ b/src/macaron/slsa_analyzer/checks/base_check.py @@ -101,7 +101,7 @@ def run(self, target: AnalyzeContext, skipped_info: SkippedInfo | None = None) - logger.info("BEGIN CHECK: %s", self.check_info.check_id) logger.info("----------------------------------") - self.rich_handler.update_checks(self.check_info.check_id, target.component.purl) + self.rich_handler.update_checks(self.check_info.check_id) check_result_data: CheckResultData @@ -138,7 +138,6 @@ def run(self, target: AnalyzeContext, skipped_info: SkippedInfo | None = None) - self.rich_handler.update_checks( self.check_info.check_id, - target.component.purl, check_result_data.result_type.value, ) diff --git a/src/macaron/slsa_analyzer/checks/infer_artifact_pipeline_check.py b/src/macaron/slsa_analyzer/checks/infer_artifact_pipeline_check.py index c02fa8380..a33ba4586 100644 --- a/src/macaron/slsa_analyzer/checks/infer_artifact_pipeline_check.py +++ b/src/macaron/slsa_analyzer/checks/infer_artifact_pipeline_check.py @@ -89,11 +89,8 @@ class ArtifactPipelineCheck(BaseCheck): def __init__(self) -> None: """Initialize the InferArtifactPipeline instance.""" check_id = "mcn_find_artifact_pipeline_1" - description = """ - Detects pipelines from which an artifact is published. - - When a verifiable provenance is found for an artifact, we use it to obtain the pipeline trigger. - """ + description = """Detects pipelines from which an artifact is published. +When a verifiable provenance is found for an artifact, we use it to obtain the pipeline trigger.""" depends_on: list[tuple[str, CheckResultType]] = [("mcn_build_as_code_1", CheckResultType.PASSED)] eval_reqs: list[ReqName] = [] super().__init__( From 96798cfb89391c46783466d539d50d7562de4f71 Mon Sep 17 00:00:00 2001 From: Demolus13 Date: Wed, 1 Oct 2025 14:41:09 +0530 Subject: [PATCH 7/8] feat: add rich python library for formatted terminal output Signed-off-by: Demolus13 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d0c6c728c..221405a88 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ "cryptography >=44.0.0,<45.0.0", "semgrep == 1.113.0", "email-validator >=2.2.0,<3.0.0", - "rich ~= 13.5", + "rich >=13.5.3,<15.0.0", ] keywords = [] # https://pypi.org/classifiers/ From 4743d6d039e8ac4d13dbbdbc2510fcce026d34d4 Mon Sep 17 00:00:00 2001 From: Demolus13 Date: Fri, 3 Oct 2025 12:28:02 +0530 Subject: [PATCH 8/8] feat: add error handling and show local repo path if provided Signed-off-by: Demolus13 --- src/macaron/__main__.py | 8 ++++++-- src/macaron/console.py | 24 ++++++++++++++++++++++++ src/macaron/repo_finder/repo_finder.py | 1 + 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index bbc207022..14232ba64 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -20,7 +20,7 @@ ) from macaron.config.defaults import create_defaults, load_defaults from macaron.config.global_config import global_config -from macaron.console import access_handler +from macaron.console import RichConsoleHandler, access_handler from macaron.errors import ConfigurationError from macaron.output_reporter.reporter import HTMLReporter, JSONReporter, PolicyReporter from macaron.policy_engine.policy_engine import 
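
The pyproject.toml change in PATCH 7/8 above widens the rich constraint because "~= 13.5" is a PEP 440 compatible-release specifier, equivalent to ">=13.5, <14", so it rules out any 14.x release; the replacement ">=13.5.3,<15.0.0" raises the floor slightly and allows the 14.x line. A quick illustrative check using the packaging library (not part of the patch):

from packaging.specifiers import SpecifierSet

old, new = SpecifierSet("~=13.5"), SpecifierSet(">=13.5.3,<15.0.0")
assert old.contains("13.9.4") and not old.contains("14.0.0")  # ~=13.5 means >=13.5, <14
assert new.contains("14.0.0") and not new.contains("15.0.0")
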
run_policy_engine, show_prelude @@ -630,7 +630,7 @@ def main(argv: list[str] | None = None) -> None: # Set global logging config. We need the stream handler for the initial # output directory checking log messages. st_handler: logging.StreamHandler = logging.StreamHandler(sys.stdout) - rich_handler: logging.Handler = logging.Handler() + rich_handler: RichConsoleHandler = access_handler.set_handler(args.verbose) if args.disable_rich_output: if args.verbose: log_level = logging.DEBUG @@ -710,6 +710,10 @@ def main(argv: list[str] | None = None) -> None: sys.exit(os.EX_NOINPUT) perform_action(args) + except KeyboardInterrupt: + if not args.disable_rich_output: + rich_handler.error("Macaron failed: Interrupted by user") + sys.exit(os.EX_SOFTWARE) finally: if args.disable_rich_output: st_handler.close() diff --git a/src/macaron/console.py b/src/macaron/console.py index d27c03f7e..5286562fb 100644 --- a/src/macaron/console.py +++ b/src/macaron/console.py @@ -105,6 +105,7 @@ def __init__(self, *args: Any, verbose: bool = False, **kwargs: Any) -> None: title_align="left", border_style="blue", ) + self.error_message: str = "" self.live = Live(get_renderable=self.make_layout, refresh_per_second=10) def emit(self, record: logging.LogRecord) -> None: @@ -521,8 +522,31 @@ def make_layout(self) -> Group: layout = layout + [self.gen_build_spec_table] if self.verbose: layout = layout + ["", self.verbose_panel] + if self.error_message: + error_panel = Panel( + self.error_message, + title="Error", + title_align="left", + border_style="red", + ) + layout = layout + ["", error_panel] return Group(*layout) + def error(self, message: str) -> None: + """ + Handle error logging. + + Parameters + ---------- + message : str + The error message to be logged. + + Returns + ------- + None + """ + self.error_message = message + def start(self, command: str) -> None: """ Start the live console display. diff --git a/src/macaron/repo_finder/repo_finder.py b/src/macaron/repo_finder/repo_finder.py index 6f876a05c..f9f1789b0 100644 --- a/src/macaron/repo_finder/repo_finder.py +++ b/src/macaron/repo_finder/repo_finder.py @@ -480,6 +480,7 @@ def prepare_repo( else: logger.info("Checking if the path to repo %s is a local path.", repo_path) resolved_local_path = resolve_local_path(get_local_repos_path(), repo_path) + rich_handler.add_description_table_content("Local Cloned Path:", resolved_local_path) if resolved_local_path: try:
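
PATCH 8/8 above routes a KeyboardInterrupt through rich_handler.error(...), and make_layout then appends a red-bordered panel carrying that message to the live display. A minimal sketch of that final rendering step, mirroring the Panel arguments shown in the hunk (illustrative, not project code):

from rich.console import Console, Group
from rich.panel import Panel

error_message = "Macaron failed: Interrupted by user"
layout = ["", Panel(error_message, title="Error", title_align="left", border_style="red")]
Console().print(Group(*layout))
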