Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
"cryptography >=44.0.0,<45.0.0",
"semgrep == 1.113.0",
"email-validator >=2.2.0,<3.0.0",
"rich >=13.5.3,<15.0.0",
]
keywords = []
# https://pypi.org/classifiers/
Expand Down
183 changes: 125 additions & 58 deletions src/macaron/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from macaron.config.defaults import create_defaults, load_defaults
from macaron.config.global_config import global_config
from macaron.console import RichConsoleHandler, access_handler
from macaron.errors import ConfigurationError
from macaron.output_reporter.reporter import HTMLReporter, JSONReporter, PolicyReporter
from macaron.policy_engine.policy_engine import run_policy_engine, show_prelude
Expand Down Expand Up @@ -63,7 +64,8 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None
if analyzer_single_args.provenance_expectation is not None:
if not os.path.exists(analyzer_single_args.provenance_expectation):
logger.critical(
'The provenance expectation file "%s" does not exist.', analyzer_single_args.provenance_expectation
'The provenance expectation file "%s" does not exist.',
analyzer_single_args.provenance_expectation,
)
sys.exit(os.EX_OSFILE)
global_config.load_expectation_files(analyzer_single_args.provenance_expectation)
Expand All @@ -72,7 +74,8 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None
if analyzer_single_args.python_venv is not None:
if not os.path.exists(analyzer_single_args.python_venv):
logger.critical(
'The Python virtual environment path "%s" does not exist.', analyzer_single_args.python_venv
'The Python virtual environment path "%s" does not exist.',
analyzer_single_args.python_venv,
)
sys.exit(os.EX_OSFILE)
global_config.load_python_venv(analyzer_single_args.python_venv)
Expand All @@ -95,7 +98,10 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None
else:
user_provided_local_maven_repo = analyzer_single_args.local_maven_repo
if not os.path.isdir(user_provided_local_maven_repo):
logger.error("The user provided local Maven repo at %s is not valid.", user_provided_local_maven_repo)
logger.error(
"The user provided local Maven repo at %s is not valid.",
user_provided_local_maven_repo,
)
sys.exit(os.EX_USAGE)

global_config.local_maven_repo = user_provided_local_maven_repo
Expand All @@ -111,7 +117,8 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None
lstrip_blocks=True,
)
html_reporter = HTMLReporter(
env=custom_jinja_env, target_template=os.path.basename(analyzer_single_args.template_path)
env=custom_jinja_env,
target_template=os.path.basename(analyzer_single_args.template_path),
)
if not html_reporter.template:
logger.error("Exiting because the custom template cannot be found.")
Expand Down Expand Up @@ -207,8 +214,10 @@ def verify_policy(verify_policy_args: argparse.Namespace) -> int:

result = run_policy_engine(verify_policy_args.database, policy_content)
vsa = generate_vsa(policy_content=policy_content, policy_result=result)
rich_handler = access_handler.get_handler()
if vsa is not None:
vsa_filepath = os.path.join(global_config.output_path, "vsa.intoto.jsonl")
rich_handler.update_vsa(os.path.relpath(vsa_filepath, os.getcwd()))
logger.info(
"Generating the Verification Summary Attestation (VSA) to %s.",
os.path.relpath(vsa_filepath, os.getcwd()),
Expand All @@ -222,8 +231,12 @@ def verify_policy(verify_policy_args: argparse.Namespace) -> int:
file.write(json.dumps(vsa))
except OSError as err:
logger.error(
"Could not generate the VSA to %s. Error: %s", os.path.relpath(vsa_filepath, os.getcwd()), err
"Could not generate the VSA to %s. Error: %s",
os.path.relpath(vsa_filepath, os.getcwd()),
err,
)
else:
rich_handler.update_vsa("No VSA generated.")

policy_reporter = PolicyReporter()
policy_reporter.generate(global_config.output_path, result)
Expand Down Expand Up @@ -290,16 +303,23 @@ def find_source(find_args: argparse.Namespace) -> int:

def perform_action(action_args: argparse.Namespace) -> None:
"""Perform the indicated action of Macaron."""
rich_handler = access_handler.get_handler()
match action_args.action:
case "dump-defaults":
if not action_args.disable_rich_output:
rich_handler.start("dump-defaults")
# Create the defaults.ini file in the output dir and exit.
create_defaults(action_args.output_dir, os.getcwd())
sys.exit(os.EX_OK)

case "verify-policy":
if not action_args.disable_rich_output:
rich_handler.start("verify-policy")
sys.exit(verify_policy(action_args))

case "analyze":
if not action_args.disable_rich_output:
rich_handler.start("analyze")
if not global_config.gh_token:
logger.error("GitHub access token not set.")
sys.exit(os.EX_USAGE)
Expand All @@ -317,6 +337,8 @@ def perform_action(action_args: argparse.Namespace) -> None:
analyze_slsa_levels_single(action_args)

case "find-source":
if not action_args.disable_rich_output:
rich_handler.start("find-source")
try:
for git_service in GIT_SERVICES:
git_service.load_defaults()
Expand All @@ -329,6 +351,8 @@ def perform_action(action_args: argparse.Namespace) -> None:
find_source(action_args)

case "gen-build-spec":
if not action_args.disable_rich_output:
rich_handler.start("gen-build-spec")
sys.exit(gen_build_spec(action_args))

case _:
Expand Down Expand Up @@ -393,6 +417,13 @@ def main(argv: list[str] | None = None) -> None:
action="store_true",
)

main_parser.add_argument(
"--disable-rich-output",
default=False,
help="Disable Rich UI output",
action="store_true",
)

main_parser.add_argument(
"-o",
"--output-dir",
Expand Down Expand Up @@ -531,7 +562,10 @@ def main(argv: list[str] | None = None) -> None:
)

# Dump the default values.
sub_parser.add_parser(name="dump-defaults", description="Dumps the defaults.ini file to the output directory.")
sub_parser.add_parser(
name="dump-defaults",
description="Dumps the defaults.ini file to the output directory.",
)

# Verify the Datalog policy.
vp_parser = sub_parser.add_parser(name="verify-policy")
Expand Down Expand Up @@ -593,65 +627,98 @@ def main(argv: list[str] | None = None) -> None:
main_parser.print_help()
sys.exit(os.EX_USAGE)

if args.verbose:
log_level = logging.DEBUG
log_format = "%(asctime)s [%(name)s:%(funcName)s:%(lineno)d] [%(levelname)s] %(message)s"
else:
log_level = logging.INFO
log_format = "%(asctime)s [%(levelname)s] %(message)s"

# Set global logging config. We need the stream handler for the initial
# output directory checking log messages.
st_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(format=log_format, handlers=[st_handler], force=True, level=log_level)
st_handler: logging.StreamHandler = logging.StreamHandler(sys.stdout)
rich_handler: RichConsoleHandler = access_handler.set_handler(args.verbose)
if args.disable_rich_output:
if args.verbose:
log_level = logging.DEBUG
log_format = "%(asctime)s [%(name)s:%(funcName)s:%(lineno)d] [%(levelname)s] %(message)s"
else:
log_level = logging.INFO
log_format = "%(asctime)s [%(levelname)s] %(message)s"
st_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(format=log_format, handlers=[st_handler], force=True, level=log_level)
else:
if args.verbose:
log_level = logging.DEBUG
log_format = "%(asctime)s [%(name)s:%(funcName)s:%(lineno)d] %(message)s"
else:
log_level = logging.INFO
log_format = "%(asctime)s %(message)s"
rich_handler = access_handler.set_handler(args.verbose)
logging.basicConfig(format=log_format, handlers=[rich_handler], force=True, level=log_level)

# Set the output directory.
if not args.output_dir:
logger.error("The output path cannot be empty. Exiting ...")
sys.exit(os.EX_USAGE)
try:
# Set the output directory.
if not args.output_dir:
logger.error("The output path cannot be empty. Exiting ...")
sys.exit(os.EX_USAGE)

if os.path.isfile(args.output_dir):
logger.error("The output directory already exists. Exiting ...")
sys.exit(os.EX_USAGE)
if os.path.isfile(args.output_dir):
logger.error("The output directory already exists. Exiting ...")
sys.exit(os.EX_USAGE)

if os.path.isdir(args.output_dir):
logger.info("Setting the output directory to %s", os.path.relpath(args.output_dir, os.getcwd()))
else:
logger.info("No directory at %s. Creating one ...", os.path.relpath(args.output_dir, os.getcwd()))
os.makedirs(args.output_dir)

# Add file handler to the root logger. Remove stream handler from the
# root logger to prevent dependencies printing logs to stdout.
debug_log_path = os.path.join(args.output_dir, "debug.log")
log_file_handler = logging.FileHandler(debug_log_path, "w")
log_file_handler.setFormatter(logging.Formatter(log_format))
logging.getLogger().removeHandler(st_handler)
logging.getLogger().addHandler(log_file_handler)

# Add StreamHandler to the Macaron logger only.
mcn_logger = logging.getLogger("macaron")
mcn_logger.addHandler(st_handler)

logger.info("The logs will be stored in debug.log")

# Set Macaron's global configuration.
# The path to provenance expectation files will be updated if
# set through analyze sub-command.
global_config.load(
macaron_path=macaron.MACARON_PATH,
output_path=args.output_dir,
build_log_path=os.path.join(args.output_dir, "build_log"),
debug_level=log_level,
local_repos_path=args.local_repos_path,
resources_path=os.path.join(macaron.MACARON_PATH, "resources"),
)
if os.path.isdir(args.output_dir):
logger.info(
"Setting the output directory to %s",
os.path.relpath(args.output_dir, os.getcwd()),
)
else:
logger.info(
"No directory at %s. Creating one ...",
os.path.relpath(args.output_dir, os.getcwd()),
)
os.makedirs(args.output_dir)

# Add file handler to the root logger. Remove stream handler from the
# root logger to prevent dependencies printing logs to stdout.
debug_log_path = os.path.join(args.output_dir, "debug.log")
log_file_handler = logging.FileHandler(debug_log_path, "w")
log_file_handler.setFormatter(logging.Formatter(log_format))
if args.disable_rich_output:
logging.getLogger().removeHandler(st_handler)
else:
logging.getLogger().removeHandler(rich_handler)
logging.getLogger().addHandler(log_file_handler)

# Add StreamHandler to the Macaron logger only.
mcn_logger = logging.getLogger("macaron")
if args.disable_rich_output:
mcn_logger.addHandler(st_handler)
else:
mcn_logger.addHandler(rich_handler)

logger.info("The logs will be stored in debug.log")

# Set Macaron's global configuration.
# The path to provenance expectation files will be updated if
# set through analyze sub-command.
global_config.load(
macaron_path=macaron.MACARON_PATH,
output_path=args.output_dir,
build_log_path=os.path.join(args.output_dir, "build_log"),
debug_level=log_level,
local_repos_path=args.local_repos_path,
resources_path=os.path.join(macaron.MACARON_PATH, "resources"),
)

# Load the default values from defaults.ini files.
if not load_defaults(args.defaults_path):
logger.error("Exiting because the defaults configuration could not be loaded.")
sys.exit(os.EX_NOINPUT)
# Load the default values from defaults.ini files.
if not load_defaults(args.defaults_path):
logger.error("Exiting because the defaults configuration could not be loaded.")
sys.exit(os.EX_NOINPUT)

perform_action(args)
perform_action(args)
except KeyboardInterrupt:
if not args.disable_rich_output:
rich_handler.error("Macaron failed: Interrupted by user")
sys.exit(os.EX_SOFTWARE)
finally:
if args.disable_rich_output:
st_handler.close()
else:
rich_handler.close()


def _get_token_from_dict_or_env(token: str, token_dict: dict[str, str]) -> str:
Expand Down
3 changes: 3 additions & 0 deletions src/macaron/build_spec_generator/build_spec_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from macaron.build_spec_generator.build_command_patcher import PatchCommandBuildTool, PatchValueType
from macaron.build_spec_generator.reproducible_central.reproducible_central import gen_reproducible_central_build_spec
from macaron.console import access_handler
from macaron.path_utils.purl_based_path import get_purl_based_dir

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -131,6 +132,8 @@ def gen_build_spec_for_purl(
build_spec_format.value,
os.path.relpath(build_spec_filepath, os.getcwd()),
)
rich_handler = access_handler.get_handler()
rich_handler.update_gen_build_spec("Build Spec Path:", os.path.relpath(build_spec_filepath, os.getcwd()))
try:
with open(build_spec_filepath, mode="w", encoding="utf-8") as file:
file.write(build_spec_content)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
lookup_build_tools_check,
lookup_latest_component,
)
from macaron.console import access_handler
from macaron.errors import QueryMacaronDatabaseError
from macaron.slsa_analyzer.checks.build_tool_check import BuildToolFacts

Expand Down Expand Up @@ -253,6 +254,11 @@ def get_rc_build_tool_name(
BuildToolFacts.__tablename__,
[(fact.build_tool_name, fact.language) for fact in build_tool_facts],
)
rich_handler = access_handler.get_handler()
rich_handler.update_gen_build_spec(
"Build Tools:",
"\n".join([f"{fact.build_tool_name} ({fact.language})" for fact in build_tool_facts]),
)

return _get_rc_build_tool_name_from_build_facts(build_tool_facts)

Expand Down Expand Up @@ -351,6 +357,11 @@ def gen_reproducible_central_build_spec(
version = purl.version
if group is None or version is None:
logger.error("Missing group and/or version for purl %s.", purl.to_string())
rich_handler = access_handler.get_handler()
rich_handler.update_gen_build_spec("Repository PURL:", "[red]FAILED[/]")
rich_handler.update_gen_build_spec("Repository URL:", "[red]FAILED[/]")
rich_handler.update_gen_build_spec("Commit Hash:", "[red]FAILED[/]")
rich_handler.update_gen_build_spec("Build Tools:", "[red]FAILED[/]")
return None

try:
Expand Down Expand Up @@ -386,6 +397,10 @@ def gen_reproducible_central_build_spec(
latest_component_repository.remote_path,
latest_component_repository.commit_sha,
)
rich_handler = access_handler.get_handler()
rich_handler.update_gen_build_spec("Repository PURL:", purl.to_string())
rich_handler.update_gen_build_spec("Repository URL:", latest_component_repository.remote_path)
rich_handler.update_gen_build_spec("Commit Hash:", latest_component_repository.commit_sha)

# Getting the RC build tool name from the build tool check facts.
rc_build_tool_name = get_rc_build_tool_name(
Expand Down
7 changes: 6 additions & 1 deletion src/macaron/config/defaults.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module provides functions to manage default values."""
Expand All @@ -9,6 +9,8 @@
import pathlib
import shutil

from macaron.console import access_handler

logger: logging.Logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -162,14 +164,17 @@ def create_defaults(output_path: str, cwd_path: str) -> bool:
# Since we have only one defaults.ini file and ConfigParser.write does not
# preserve the comments, copy the file directly.
dest_path = os.path.join(output_path, "defaults.ini")
rich_handler = access_handler.get_handler()
try:
shutil.copy2(src_path, dest_path)
logger.info(
"Dumped the default values in %s.",
os.path.relpath(os.path.join(output_path, "defaults.ini"), cwd_path),
)
rich_handler.update_dump_defaults(os.path.relpath(dest_path, cwd_path))
return True
# We catch OSError to support errors on different platforms.
except OSError as error:
logger.error("Failed to create %s: %s.", os.path.relpath(dest_path, cwd_path), error)
rich_handler.update_dump_defaults("[bold red]Failed[/]")
return False
Loading
Loading