From 2779aec52e26337c54f58c800b51bc304fd00f27 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 18 Jun 2025 14:03:49 +0200 Subject: [PATCH 01/14] Init project --- capiorun/.gitignore | 2 ++ capiorun/capiorun | 3 +++ 2 files changed, 5 insertions(+) create mode 100644 capiorun/.gitignore create mode 100644 capiorun/capiorun diff --git a/capiorun/.gitignore b/capiorun/.gitignore new file mode 100644 index 000000000..483de3d71 --- /dev/null +++ b/capiorun/.gitignore @@ -0,0 +1,2 @@ +.venv +.idea diff --git a/capiorun/capiorun b/capiorun/capiorun new file mode 100644 index 000000000..b77a6e075 --- /dev/null +++ b/capiorun/capiorun @@ -0,0 +1,3 @@ +#!/usr/bin/python3 + + From 3a826246ecd05d79073068c8e775c04399506887 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 18 Jun 2025 17:32:20 +0200 Subject: [PATCH 02/14] First code --- capiorun/.gitignore | 4 ++ capiorun/capiorun | 82 +++++++++++++++++++++++++++- capiorun/generators/BashGenerator.py | 59 ++++++++++++++++++++ capiorun/generators/__init__.py | 0 capiorun/parser.py | 54 ++++++++++++++++++ capiorun/readme.md | 18 ++++++ capiorun/requirements.txt | 2 + 7 files changed, 218 insertions(+), 1 deletion(-) mode change 100644 => 100755 capiorun/capiorun create mode 100644 capiorun/generators/BashGenerator.py create mode 100644 capiorun/generators/__init__.py create mode 100644 capiorun/parser.py create mode 100644 capiorun/readme.md create mode 100644 capiorun/requirements.txt diff --git a/capiorun/.gitignore b/capiorun/.gitignore index 483de3d71..c6a2850e6 100644 --- a/capiorun/.gitignore +++ b/capiorun/.gitignore @@ -1,2 +1,6 @@ .venv .idea +*.json +*.sh +__pycache__ +pippo \ No newline at end of file diff --git a/capiorun/capiorun b/capiorun/capiorun old mode 100644 new mode 100755 index b77a6e075..754923096 --- a/capiorun/capiorun +++ b/capiorun/capiorun @@ -1,3 +1,83 @@ -#!/usr/bin/python3 +#!python3 +import subprocess +from parser import parse_json +import argparse +import generators.BashGenerator as bashGenerator +from loguru import logger + +logger.remove() +logger.add( + sink=lambda msg: print(msg, end=''), # or use sys.stdout + format="{time:DD/MM/YYYY HH:mm:ss} | {name} | {level: <8} | {message}", + colorize=True +) + +CAPIO_DIR = "" + +parser = argparse.ArgumentParser(prog='capiorun', description='Automate the execution of workflows running with CAPIO', + epilog='Developed by Marco Edoardo Santimaria') + +parser.add_argument("-w", "--workflow", help="JSON file containing the workflow description") +parser.add_argument("--server", default="capio_server", help="CAPIO Server executable path") +parser.add_argument("--server-args", default="", help="CAPIO Server arguments (excluding CAPIO-CL configuration flags)") +parser.add_argument("--capio-cl", default="--no-config", help="CAPIO-CL configuration file path") +parser.add_argument("--libcapio", default="libcapio_posix.so", help="CAPIO intercept library path") +parser.add_argument("--target", "-t", help="Kind of submission files to generate", default="bash") +args = parser.parse_args() + +if not args.workflow: + logger.critical("No workflow provided") + exit(-11) + +workflow = parse_json(args.workflow) +CAPIO_DIR = workflow['capio-dir'] +logger.info(f"CAPIO_DIR={workflow['capio-dir']}") + +steps = workflow['steps'] +logger.info(f"Loaded {len(steps)} steps for the workflow {workflow['workflow-name']}") + +jobs_to_submit = {} +for name, step_options in workflow['steps'].items(): + logger.info(f"Generating step executor for step {name}") + + location = step_options["location"] + + if location not in jobs_to_submit: + jobs_to_submit[location] = [] + + jobs_to_submit[location].append({ + "env": f"CAPIO_DIR={CAPIO_DIR} \\\n" + f"CAPIO_APP_NAME={name} \\\n" + f"CAPIO_WORKFLOW_NAME={workflow['workflow-name']} ", + "runnable": f"{step_options['exec']} {step_options['args']} &", + "stepname": name, + }) + +generator = None + +if "bash" in args.target: + logger.info(f"Using BASH generator") + generator = bashGenerator.BashGenerator(workflow) +elif "slurm" in args.target: + logger.info(f"Using SLURM generator") + +else: + logger.critical("Error: requested generator does not exist") + +if generator is None: + logger.critical("Error: requested generator is not instantiated") + exit(-1) + +master_script_path = generator.generate( + jobs_to_submit, + capio_dir=CAPIO_DIR, + server_path=args.server, + server_args=args.server_args, + server_config_path=args.capio_cl, + intercept_path=args.libcapio +) +logger.success(f"Generated submission scripts. Starting workflow") + +subprocess.run(["bash", master_script_path]) diff --git a/capiorun/generators/BashGenerator.py b/capiorun/generators/BashGenerator.py new file mode 100644 index 000000000..48fb0023c --- /dev/null +++ b/capiorun/generators/BashGenerator.py @@ -0,0 +1,59 @@ +from loguru import logger + +logger.remove() +logger.add( + sink=lambda msg: print(msg, end=''), # or use sys.stdout + format="{time:DD/MM/YYYY HH:mm:ss} | {name} | {level: <8} | {message}", + colorize=True +) + + +class BashGenerator: + def __init__(self, workflow): + self.workflow = workflow + logger.info(f"Initializing BashGenerator with workflow {self.workflow['workflow-name']}") + + def _generate_master_script(self, generated_scripts): + waitpids = [] + filename = "/tmp/submit-all.sh" + with open(filename, "w") as f: + f.write("#!/bin/bash\n\n") + for i, script in enumerate(generated_scripts): + f.write(f"bash {script} & PID{i}=$!\n") + waitpids.append(f"$PID{i}") + + f.write(f"\n\nwait {' '.join(waitpids)}\n") + return filename + + def generate(self, + jobs_to_submit, + capio_dir, + server_path, + server_args, + server_config_path, + intercept_path + ): + generated_scripts = [] + + for sub_location, sub_steps in jobs_to_submit.items(): + filename = f"/tmp/{self.workflow['workflow-name']}-{sub_location}.sh" + logger.info(f"Generating bash script {filename}") + generated_scripts.append(filename) + with open(filename, "w") as f: + f.write("#!/bin/bash\n\n") + f.write( + f"CAPIO_DIR={capio_dir} \\\n" + f"{server_path} {server_args} {server_config_path} & SERVERPID=$!\n\n") + waitpids = [] + for step in sub_steps: + logger.info(f"Generating command for step {step['stepname']}") + f.write( + f"{step['env']} \\\nLD_PRELOAD={intercept_path} \\\n{step['runnable']} PID{step['stepname']}=$!\n\n") + waitpids.append(f"$PID{step['stepname']}") + f.write(f"\nwait {' '.join(waitpids)}\n") + f.write("killall $SERVERPID\n") + + master_script_path = self._generate_master_script(generated_scripts) + logger.info("Generated master script") + + return master_script_path diff --git a/capiorun/generators/__init__.py b/capiorun/generators/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/capiorun/parser.py b/capiorun/parser.py new file mode 100644 index 000000000..2a8467f0a --- /dev/null +++ b/capiorun/parser.py @@ -0,0 +1,54 @@ + +from loguru import logger +import json +from pathlib import Path + +from jsonschema import validate, ValidationError, SchemaError + +logger.remove() +logger.add( + sink=lambda msg: print(msg, end=''), # or use sys.stdout + format="{time:DD/MM/YYYY HH:mm:ss} | {name} | {level: <8} | {message}", + colorize=True +) + + +def validate_json(json_file_path, schema_file_path): + try: + # Load the JSON data + with open(json_file_path, 'r') as json_file: + data = json.load(json_file) + + # Load the JSON schema + with open(schema_file_path, 'r') as schema_file: + schema = json.load(schema_file) + + # Validate the data against the schema + validate(instance=data, schema=schema) + return True + + except ValidationError as ve: + logger.critical(f"Validation error: {ve.message}") + except SchemaError as se: + logger.critical(f"Schema error: {se.message}") + except json.JSONDecodeError as je: + logger.critical(f"JSON decode error: {je}") + except Exception as e: + logger.critical(f"An unexpected error occurred: {e}") + + return False + + +def parse_json(workflow): + + logger.info(f"Parsing workflow {workflow}") + + workflow = json.load(open(workflow)) + current_file_path = Path(__file__).resolve() + schema_file_path = current_file_path.parent.parent / "schema/capio-cl.json" + + if not validate_json(workflow, schema_file_path): + logger.critical("Invalid JSON") + #exit(-1) + + return workflow diff --git a/capiorun/readme.md b/capiorun/readme.md new file mode 100644 index 000000000..aeb62cbfc --- /dev/null +++ b/capiorun/readme.md @@ -0,0 +1,18 @@ +# CAPIORUN: autmated submission of CAPIO workflows +To execute workflows, capiorun requires a json file containing the description +of the workflow steps to execute. + +```json +{ + "capiocl-file" : "capio_cl config path name", + "capio-dir" : "dir", + "steps" : { + "workflow_step_name": { + "exec": "bin path", + "args": "args", + "location": "logic location. used to tell which steps should go in the same submission script" + }, + ... + } +} +``` \ No newline at end of file diff --git a/capiorun/requirements.txt b/capiorun/requirements.txt new file mode 100644 index 000000000..bb0d08a0f --- /dev/null +++ b/capiorun/requirements.txt @@ -0,0 +1,2 @@ +loguru==0.7.3 +jsonschema==4.24.0 \ No newline at end of file From 4b04aeaf1528a8169a8509c8c7958921186ea5bf Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 18 Jun 2025 17:40:20 +0200 Subject: [PATCH 03/14] Improved code organization --- capiorun/Importer.py | 36 +++++++++++++++++ capiorun/capiorun | 96 +++++++++++++++++++------------------------- 2 files changed, 77 insertions(+), 55 deletions(-) create mode 100644 capiorun/Importer.py diff --git a/capiorun/Importer.py b/capiorun/Importer.py new file mode 100644 index 000000000..f53d5ceb9 --- /dev/null +++ b/capiorun/Importer.py @@ -0,0 +1,36 @@ +from loguru import logger + +logger.remove() +logger.add( + sink=lambda msg: print(msg, end=''), # or use sys.stdout + format="{time:DD/MM/YYYY HH:mm:ss} | {name} | {level: <8} | {message}", + colorize=True +) + + +class Importer: + def __init__(self, steps, capio_dir, workflow_name): + self.steps = steps + self.capio_dir = capio_dir + self.workflow_name = workflow_name + logger.info(f"Loaded {len(self.steps)} steps for the workflow {self.workflow_name}") + + def import_steps(self): + jobs_to_submit = {} + for name, step_options in self.steps.items(): + logger.info(f"Generating step executor for step {name}") + + location = step_options["location"] + + if location not in jobs_to_submit: + jobs_to_submit[location] = [] + + jobs_to_submit[location].append({ + "env": f"CAPIO_DIR={self.capio_dir} \\\n" + f"CAPIO_APP_NAME={name} \\\n" + f"CAPIO_WORKFLOW_NAME={self.workflow_name} ", + "runnable": f"{step_options['exec']} {step_options['args']} &", + "stepname": name, + }) + + return jobs_to_submit diff --git a/capiorun/capiorun b/capiorun/capiorun index 754923096..7cc86a1a6 100755 --- a/capiorun/capiorun +++ b/capiorun/capiorun @@ -1,6 +1,7 @@ #!python3 import subprocess +from Importer import Importer from parser import parse_json import argparse import generators.BashGenerator as bashGenerator @@ -25,59 +26,44 @@ parser.add_argument("--server-args", default="", help="CAPIO Server arguments (e parser.add_argument("--capio-cl", default="--no-config", help="CAPIO-CL configuration file path") parser.add_argument("--libcapio", default="libcapio_posix.so", help="CAPIO intercept library path") parser.add_argument("--target", "-t", help="Kind of submission files to generate", default="bash") -args = parser.parse_args() - -if not args.workflow: - logger.critical("No workflow provided") - exit(-11) - -workflow = parse_json(args.workflow) -CAPIO_DIR = workflow['capio-dir'] -logger.info(f"CAPIO_DIR={workflow['capio-dir']}") - -steps = workflow['steps'] -logger.info(f"Loaded {len(steps)} steps for the workflow {workflow['workflow-name']}") - -jobs_to_submit = {} -for name, step_options in workflow['steps'].items(): - logger.info(f"Generating step executor for step {name}") - - location = step_options["location"] - - if location not in jobs_to_submit: - jobs_to_submit[location] = [] - - jobs_to_submit[location].append({ - "env": f"CAPIO_DIR={CAPIO_DIR} \\\n" - f"CAPIO_APP_NAME={name} \\\n" - f"CAPIO_WORKFLOW_NAME={workflow['workflow-name']} ", - "runnable": f"{step_options['exec']} {step_options['args']} &", - "stepname": name, - }) - -generator = None - -if "bash" in args.target: - logger.info(f"Using BASH generator") - generator = bashGenerator.BashGenerator(workflow) -elif "slurm" in args.target: - logger.info(f"Using SLURM generator") - -else: - logger.critical("Error: requested generator does not exist") - -if generator is None: - logger.critical("Error: requested generator is not instantiated") - exit(-1) - -master_script_path = generator.generate( - jobs_to_submit, - capio_dir=CAPIO_DIR, - server_path=args.server, - server_args=args.server_args, - server_config_path=args.capio_cl, - intercept_path=args.libcapio -) -logger.success(f"Generated submission scripts. Starting workflow") -subprocess.run(["bash", master_script_path]) +if __name__ == "__main__": + + args = parser.parse_args() + if not args.workflow: + logger.critical("No workflow provided") + exit(-11) + + workflow = parse_json(args.workflow) + CAPIO_DIR = workflow['capio-dir'] + logger.info(f"CAPIO_DIR={workflow['capio-dir']}") + + importer = Importer(steps = workflow['steps'], capio_dir=CAPIO_DIR, workflow_name=workflow['workflow-name']) + + jobs_to_submit = importer.import_steps() + + generator = None + + if "bash" in args.target: + logger.info(f"Using BASH generator") + generator = bashGenerator.BashGenerator(workflow) + elif "slurm" in args.target: + logger.info(f"Using SLURM generator") + else: + logger.critical("Error: requested generator does not exist") + + if generator is None: + logger.critical("Error: requested generator is not instantiated") + exit(-1) + + master_script_path = generator.generate( + jobs_to_submit, + capio_dir=CAPIO_DIR, + server_path=args.server, + server_args=args.server_args, + server_config_path=args.capio_cl, + intercept_path=args.libcapio + ) + logger.success(f"Generated submission scripts. Starting workflow") + + subprocess.run(["bash", master_script_path]) From bfed467b9a1335b7837a68163b67f12c8d2d4268 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 19 Jun 2025 09:21:30 +0200 Subject: [PATCH 04/14] fix --- capiorun/capiorun | 18 +++++++++++------- capiorun/readme.md | 1 + 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/capiorun/capiorun b/capiorun/capiorun index 7cc86a1a6..816c6a307 100755 --- a/capiorun/capiorun +++ b/capiorun/capiorun @@ -15,17 +15,19 @@ logger.add( colorize=True ) -CAPIO_DIR = "" - parser = argparse.ArgumentParser(prog='capiorun', description='Automate the execution of workflows running with CAPIO', epilog='Developed by Marco Edoardo Santimaria') parser.add_argument("-w", "--workflow", help="JSON file containing the workflow description") parser.add_argument("--server", default="capio_server", help="CAPIO Server executable path") parser.add_argument("--server-args", default="", help="CAPIO Server arguments (excluding CAPIO-CL configuration flags)") -parser.add_argument("--capio-cl", default="--no-config", help="CAPIO-CL configuration file path") +parser.add_argument("--capio-cl", default="", help="CAPIO-CL configuration file path") parser.add_argument("--libcapio", default="libcapio_posix.so", help="CAPIO intercept library path") parser.add_argument("--target", "-t", help="Kind of submission files to generate", default="bash") +parser.add_argument("--no-execute", + action="store_false", + default=True, + help="Generate only workflow scripts, but do not start it") if __name__ == "__main__": @@ -38,7 +40,7 @@ if __name__ == "__main__": CAPIO_DIR = workflow['capio-dir'] logger.info(f"CAPIO_DIR={workflow['capio-dir']}") - importer = Importer(steps = workflow['steps'], capio_dir=CAPIO_DIR, workflow_name=workflow['workflow-name']) + importer = Importer(steps=workflow['steps'], capio_dir=CAPIO_DIR, workflow_name=workflow['workflow-name']) jobs_to_submit = importer.import_steps() @@ -61,9 +63,11 @@ if __name__ == "__main__": capio_dir=CAPIO_DIR, server_path=args.server, server_args=args.server_args, - server_config_path=args.capio_cl, + server_config_path=("-c " + args.capio_cl if args.capio_cl else "--no-config"), intercept_path=args.libcapio ) - logger.success(f"Generated submission scripts. Starting workflow") + logger.success(f"Generated submission scripts") - subprocess.run(["bash", master_script_path]) + if args.no_execute: + logger.info("Starting workflow execution") + subprocess.run(["bash", master_script_path]) diff --git a/capiorun/readme.md b/capiorun/readme.md index aeb62cbfc..a21a85859 100644 --- a/capiorun/readme.md +++ b/capiorun/readme.md @@ -6,6 +6,7 @@ of the workflow steps to execute. { "capiocl-file" : "capio_cl config path name", "capio-dir" : "dir", + "workflow-name" : "name of the workflow", "steps" : { "workflow_step_name": { "exec": "bin path", From dc64c3eda057fcd49d6997d62d8bc78d912ca945 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 19 Jun 2025 09:25:53 +0200 Subject: [PATCH 05/14] fix --- capiorun/generators/BashGenerator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capiorun/generators/BashGenerator.py b/capiorun/generators/BashGenerator.py index 48fb0023c..5bd9341de 100644 --- a/capiorun/generators/BashGenerator.py +++ b/capiorun/generators/BashGenerator.py @@ -51,7 +51,7 @@ def generate(self, f"{step['env']} \\\nLD_PRELOAD={intercept_path} \\\n{step['runnable']} PID{step['stepname']}=$!\n\n") waitpids.append(f"$PID{step['stepname']}") f.write(f"\nwait {' '.join(waitpids)}\n") - f.write("killall $SERVERPID\n") + f.write("kill $SERVERPID\n") master_script_path = self._generate_master_script(generated_scripts) logger.info("Generated master script") From 69e3c43a847cc2f5aecaef710694beb8229b9620 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 19 Jun 2025 09:42:26 +0200 Subject: [PATCH 06/14] fix --- capiorun/Importer.py | 2 +- capiorun/capiorun | 5 +---- capiorun/generators/BashGenerator.py | 13 +++++++++++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/capiorun/Importer.py b/capiorun/Importer.py index f53d5ceb9..2773e1f89 100644 --- a/capiorun/Importer.py +++ b/capiorun/Importer.py @@ -29,7 +29,7 @@ def import_steps(self): "env": f"CAPIO_DIR={self.capio_dir} \\\n" f"CAPIO_APP_NAME={name} \\\n" f"CAPIO_WORKFLOW_NAME={self.workflow_name} ", - "runnable": f"{step_options['exec']} {step_options['args']} &", + "runnable": f"{step_options['command']} &", "stepname": name, }) diff --git a/capiorun/capiorun b/capiorun/capiorun index 816c6a307..f82ac7972 100755 --- a/capiorun/capiorun +++ b/capiorun/capiorun @@ -1,6 +1,4 @@ #!python3 -import subprocess - from Importer import Importer from parser import parse_json import argparse @@ -69,5 +67,4 @@ if __name__ == "__main__": logger.success(f"Generated submission scripts") if args.no_execute: - logger.info("Starting workflow execution") - subprocess.run(["bash", master_script_path]) + generator.run() diff --git a/capiorun/generators/BashGenerator.py b/capiorun/generators/BashGenerator.py index 5bd9341de..64f60bfff 100644 --- a/capiorun/generators/BashGenerator.py +++ b/capiorun/generators/BashGenerator.py @@ -1,4 +1,5 @@ from loguru import logger +import subprocess logger.remove() logger.add( @@ -11,6 +12,7 @@ class BashGenerator: def __init__(self, workflow): self.workflow = workflow + self.master_script_path = None logger.info(f"Initializing BashGenerator with workflow {self.workflow['workflow-name']}") def _generate_master_script(self, generated_scripts): @@ -53,7 +55,14 @@ def generate(self, f.write(f"\nwait {' '.join(waitpids)}\n") f.write("kill $SERVERPID\n") - master_script_path = self._generate_master_script(generated_scripts) + self.master_script_path = self._generate_master_script(generated_scripts) logger.info("Generated master script") - return master_script_path + + + def run(self): + logger.info("Starting workflow execution") + if self.master_script_path is None: + logger.critical("No master script found") + exit(1) + subprocess.run(["bash", self.master_script_path]) From f8d946c4a842d80d26838029e3038d7c2bb1bee7 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 19 Jun 2025 09:52:14 +0200 Subject: [PATCH 07/14] fix --- capiorun/capiorun | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/capiorun/capiorun b/capiorun/capiorun index f82ac7972..4c43fb68b 100755 --- a/capiorun/capiorun +++ b/capiorun/capiorun @@ -16,10 +16,11 @@ logger.add( parser = argparse.ArgumentParser(prog='capiorun', description='Automate the execution of workflows running with CAPIO', epilog='Developed by Marco Edoardo Santimaria') -parser.add_argument("-w", "--workflow", help="JSON file containing the workflow description") +parser.add_argument("-w", "--workflow", default="capiorun.json", help="JSON file containing the workflow description") +parser.add_argument("--capio-cl", default="capiorun-cl.json", help="CAPIO-CL configuration file path") +parser.add_argument("--no-capio-cl", default=False, action="store_true", help="CAPIO-CL configuration file path") parser.add_argument("--server", default="capio_server", help="CAPIO Server executable path") parser.add_argument("--server-args", default="", help="CAPIO Server arguments (excluding CAPIO-CL configuration flags)") -parser.add_argument("--capio-cl", default="", help="CAPIO-CL configuration file path") parser.add_argument("--libcapio", default="libcapio_posix.so", help="CAPIO intercept library path") parser.add_argument("--target", "-t", help="Kind of submission files to generate", default="bash") parser.add_argument("--no-execute", @@ -61,7 +62,7 @@ if __name__ == "__main__": capio_dir=CAPIO_DIR, server_path=args.server, server_args=args.server_args, - server_config_path=("-c " + args.capio_cl if args.capio_cl else "--no-config"), + server_config_path=("-c " + args.capio_cl), intercept_path=args.libcapio ) logger.success(f"Generated submission scripts") From 75122f92f4510c63c61a12c44358b7e5358f2000 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 19 Jun 2025 14:28:18 +0200 Subject: [PATCH 08/14] Fixed capiorun --- capiorun/Importer.py | 36 --------- capiorun/capiorun | 115 +++++++++++++++------------ capiorun/generators/BashGenerator.py | 68 ---------------- capiorun/generators/__init__.py | 0 capiorun/parser.py | 54 ------------- capiorun/readme.md | 26 ++---- capiorun/requirements.txt | 3 +- 7 files changed, 74 insertions(+), 228 deletions(-) delete mode 100644 capiorun/Importer.py delete mode 100644 capiorun/generators/BashGenerator.py delete mode 100644 capiorun/generators/__init__.py delete mode 100644 capiorun/parser.py diff --git a/capiorun/Importer.py b/capiorun/Importer.py deleted file mode 100644 index 2773e1f89..000000000 --- a/capiorun/Importer.py +++ /dev/null @@ -1,36 +0,0 @@ -from loguru import logger - -logger.remove() -logger.add( - sink=lambda msg: print(msg, end=''), # or use sys.stdout - format="{time:DD/MM/YYYY HH:mm:ss} | {name} | {level: <8} | {message}", - colorize=True -) - - -class Importer: - def __init__(self, steps, capio_dir, workflow_name): - self.steps = steps - self.capio_dir = capio_dir - self.workflow_name = workflow_name - logger.info(f"Loaded {len(self.steps)} steps for the workflow {self.workflow_name}") - - def import_steps(self): - jobs_to_submit = {} - for name, step_options in self.steps.items(): - logger.info(f"Generating step executor for step {name}") - - location = step_options["location"] - - if location not in jobs_to_submit: - jobs_to_submit[location] = [] - - jobs_to_submit[location].append({ - "env": f"CAPIO_DIR={self.capio_dir} \\\n" - f"CAPIO_APP_NAME={name} \\\n" - f"CAPIO_WORKFLOW_NAME={self.workflow_name} ", - "runnable": f"{step_options['command']} &", - "stepname": name, - }) - - return jobs_to_submit diff --git a/capiorun/capiorun b/capiorun/capiorun index 4c43fb68b..64b77f9ee 100755 --- a/capiorun/capiorun +++ b/capiorun/capiorun @@ -1,71 +1,86 @@ -#!python3 -from Importer import Importer -from parser import parse_json -import argparse -import generators.BashGenerator as bashGenerator +#!/usr/bin/env python3 + +import os, subprocess, argparse, time, sys +import signal from loguru import logger logger.remove() logger.add( sink=lambda msg: print(msg, end=''), # or use sys.stdout - format="{time:DD/MM/YYYY HH:mm:ss} | {name} | {level: <8} | {message}", + format="{time:DD/MM/YYYY HH:mm:ss} | capiorun | {level: <8} | {message}", colorize=True ) -parser = argparse.ArgumentParser(prog='capiorun', description='Automate the execution of workflows running with CAPIO', +parser = argparse.ArgumentParser(prog='capiorun', description='Simplify the execution of workflows running with CAPIO', epilog='Developed by Marco Edoardo Santimaria') -parser.add_argument("-w", "--workflow", default="capiorun.json", help="JSON file containing the workflow description") -parser.add_argument("--capio-cl", default="capiorun-cl.json", help="CAPIO-CL configuration file path") -parser.add_argument("--no-capio-cl", default=False, action="store_true", help="CAPIO-CL configuration file path") -parser.add_argument("--server", default="capio_server", help="CAPIO Server executable path") -parser.add_argument("--server-args", default="", help="CAPIO Server arguments (excluding CAPIO-CL configuration flags)") -parser.add_argument("--libcapio", default="libcapio_posix.so", help="CAPIO intercept library path") -parser.add_argument("--target", "-t", help="Kind of submission files to generate", default="bash") -parser.add_argument("--no-execute", - action="store_false", - default=True, - help="Generate only workflow scripts, but do not start it") +parser.add_argument("-d", "--capio-dir", required=True, help="CAPIO virtual mount point") +parser.add_argument("-w", "--workflow-name", default="CAPIO", help="Name of the workflow") +parser.add_argument("-n", "--app-name", required=True, help="CAPIO Application step name") +parser.add_argument("-L", "--log-level", default="-1", help="CAPIO log level when running in debug mode") +parser.add_argument("-l", "--libcapio", default="libcapio_posix.so", help="Absolute path to libcapio_poix.so") +parser.add_argument("-s", "--server", default="capio_server", help="Absolute path to capio_server") +parser.add_argument("-c", "--capiocl", default="--no-config", help="Absolute path to the CAPIO-CL configuration file") +parser.add_argument('args', nargs=argparse.REMAINDER, help="Command to launch with capio") -if __name__ == "__main__": +server_process = None +step_process = None +if __name__ == "__main__": args = parser.parse_args() - if not args.workflow: - logger.critical("No workflow provided") - exit(-11) - workflow = parse_json(args.workflow) - CAPIO_DIR = workflow['capio-dir'] - logger.info(f"CAPIO_DIR={workflow['capio-dir']}") + if not os.path.exists(f"/dev/shm/{args.workflow_name}"): + logger.info(f"Starting capio server with config file: {args.capiocl}") + logger.info(f"CAPIO_LOG_LEVEL = {args.log_level}") + logger.info(f"CAPIO_WORKFLOW_NAME = {args.workflow_name}") + logger.info(f"CAPIO_APP_NAME = {args.app_name}") + logger.info(f"CAPIO_DIR = {args.capio_dir}") + logger.info(f"CAPIO-CL CONFIG = {args.capiocl}") + if not os.path.exists(args.capiocl): + logger.critical(f"File {args.capiocl} does not exists. aborting execution...") + exit(1) + server_env = os.environ.copy() + server_env["CAPIO_DIR"] = args.capio_dir + server_env["CAPIO_LOG_LEVEL"] = args.log_level + server_process = subprocess.Popen( + [args.server, ("--config " + args.capiocl) if args.capiocl != "--no-config" else args.capiocl], + env=server_env, + stdout=sys.stdout, stderr=sys.stderr) - importer = Importer(steps=workflow['steps'], capio_dir=CAPIO_DIR, workflow_name=workflow['workflow-name']) + logger.debug(f"capio_server PID: {server_process.pid}") + time.sleep(1) - jobs_to_submit = importer.import_steps() - - generator = None - - if "bash" in args.target: - logger.info(f"Using BASH generator") - generator = bashGenerator.BashGenerator(workflow) - elif "slurm" in args.target: - logger.info(f"Using SLURM generator") else: - logger.critical("Error: requested generator does not exist") + logger.debug(f"An instance of capio_server with workflow name {args.workflow_name} already exists!") + + step_env = os.environ.copy() + step_env["CAPIO_WORKFLOW_NAME"] = args.workflow_name + step_env["CAPIO_APP_NAME"] = args.app_name + step_env["CAPIO_DIR"] = args.capio_dir + step_env["CAPIO_LOG_LEVEL"] = args.log_level + step_env["LD_PRELOAD"] = args.libcapio - if generator is None: - logger.critical("Error: requested generator is not instantiated") - exit(-1) + logger.info(f"Starting workflow steps with following environment variables:") + logger.info(f"CAPIO_LOG_LEVEL = {args.log_level}") + logger.info(f"CAPIO_WORKFLOW_NAME = {args.workflow_name}") + logger.info(f"CAPIO_APP_NAME = {args.app_name}") + logger.info(f"CAPIO_DIR = {args.capio_dir}") + logger.info(f"LD_PRELOAD = {args.libcapio}") + logger.info(f"command = {" ".join(args.args)}") + try: + step_process = subprocess.Popen( + args.args, + env=step_env, + stdout=sys.stdout, stderr=sys.stderr + ) - master_script_path = generator.generate( - jobs_to_submit, - capio_dir=CAPIO_DIR, - server_path=args.server, - server_args=args.server_args, - server_config_path=("-c " + args.capio_cl), - intercept_path=args.libcapio - ) - logger.success(f"Generated submission scripts") + step_process.wait() + except Exception as e: + logger.critical("An error occurred in startup of workflow app: {}".format(e)) - if args.no_execute: - generator.run() + if server_process is not None: + logger.info(f"Terminating instance of capio_server") + server_process.send_signal(signal.SIGTERM) + time.sleep(2) + logger.success("Terminated CAPIO server instance") diff --git a/capiorun/generators/BashGenerator.py b/capiorun/generators/BashGenerator.py deleted file mode 100644 index 64f60bfff..000000000 --- a/capiorun/generators/BashGenerator.py +++ /dev/null @@ -1,68 +0,0 @@ -from loguru import logger -import subprocess - -logger.remove() -logger.add( - sink=lambda msg: print(msg, end=''), # or use sys.stdout - format="{time:DD/MM/YYYY HH:mm:ss} | {name} | {level: <8} | {message}", - colorize=True -) - - -class BashGenerator: - def __init__(self, workflow): - self.workflow = workflow - self.master_script_path = None - logger.info(f"Initializing BashGenerator with workflow {self.workflow['workflow-name']}") - - def _generate_master_script(self, generated_scripts): - waitpids = [] - filename = "/tmp/submit-all.sh" - with open(filename, "w") as f: - f.write("#!/bin/bash\n\n") - for i, script in enumerate(generated_scripts): - f.write(f"bash {script} & PID{i}=$!\n") - waitpids.append(f"$PID{i}") - - f.write(f"\n\nwait {' '.join(waitpids)}\n") - return filename - - def generate(self, - jobs_to_submit, - capio_dir, - server_path, - server_args, - server_config_path, - intercept_path - ): - generated_scripts = [] - - for sub_location, sub_steps in jobs_to_submit.items(): - filename = f"/tmp/{self.workflow['workflow-name']}-{sub_location}.sh" - logger.info(f"Generating bash script {filename}") - generated_scripts.append(filename) - with open(filename, "w") as f: - f.write("#!/bin/bash\n\n") - f.write( - f"CAPIO_DIR={capio_dir} \\\n" - f"{server_path} {server_args} {server_config_path} & SERVERPID=$!\n\n") - waitpids = [] - for step in sub_steps: - logger.info(f"Generating command for step {step['stepname']}") - f.write( - f"{step['env']} \\\nLD_PRELOAD={intercept_path} \\\n{step['runnable']} PID{step['stepname']}=$!\n\n") - waitpids.append(f"$PID{step['stepname']}") - f.write(f"\nwait {' '.join(waitpids)}\n") - f.write("kill $SERVERPID\n") - - self.master_script_path = self._generate_master_script(generated_scripts) - logger.info("Generated master script") - - - - def run(self): - logger.info("Starting workflow execution") - if self.master_script_path is None: - logger.critical("No master script found") - exit(1) - subprocess.run(["bash", self.master_script_path]) diff --git a/capiorun/generators/__init__.py b/capiorun/generators/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/capiorun/parser.py b/capiorun/parser.py deleted file mode 100644 index 2a8467f0a..000000000 --- a/capiorun/parser.py +++ /dev/null @@ -1,54 +0,0 @@ - -from loguru import logger -import json -from pathlib import Path - -from jsonschema import validate, ValidationError, SchemaError - -logger.remove() -logger.add( - sink=lambda msg: print(msg, end=''), # or use sys.stdout - format="{time:DD/MM/YYYY HH:mm:ss} | {name} | {level: <8} | {message}", - colorize=True -) - - -def validate_json(json_file_path, schema_file_path): - try: - # Load the JSON data - with open(json_file_path, 'r') as json_file: - data = json.load(json_file) - - # Load the JSON schema - with open(schema_file_path, 'r') as schema_file: - schema = json.load(schema_file) - - # Validate the data against the schema - validate(instance=data, schema=schema) - return True - - except ValidationError as ve: - logger.critical(f"Validation error: {ve.message}") - except SchemaError as se: - logger.critical(f"Schema error: {se.message}") - except json.JSONDecodeError as je: - logger.critical(f"JSON decode error: {je}") - except Exception as e: - logger.critical(f"An unexpected error occurred: {e}") - - return False - - -def parse_json(workflow): - - logger.info(f"Parsing workflow {workflow}") - - workflow = json.load(open(workflow)) - current_file_path = Path(__file__).resolve() - schema_file_path = current_file_path.parent.parent / "schema/capio-cl.json" - - if not validate_json(workflow, schema_file_path): - logger.critical("Invalid JSON") - #exit(-1) - - return workflow diff --git a/capiorun/readme.md b/capiorun/readme.md index a21a85859..b1641e8f4 100644 --- a/capiorun/readme.md +++ b/capiorun/readme.md @@ -1,19 +1,9 @@ -# CAPIORUN: autmated submission of CAPIO workflows -To execute workflows, capiorun requires a json file containing the description -of the workflow steps to execute. +# capiorun: Simplified launch of CAPIO applications + +## Parameters +- capio-dir dir +- capio-app-name app name +- capio-workflow-name workflowname +- capio-log-level (opt) +- executable -```json -{ - "capiocl-file" : "capio_cl config path name", - "capio-dir" : "dir", - "workflow-name" : "name of the workflow", - "steps" : { - "workflow_step_name": { - "exec": "bin path", - "args": "args", - "location": "logic location. used to tell which steps should go in the same submission script" - }, - ... - } -} -``` \ No newline at end of file diff --git a/capiorun/requirements.txt b/capiorun/requirements.txt index bb0d08a0f..66be15d71 100644 --- a/capiorun/requirements.txt +++ b/capiorun/requirements.txt @@ -1,2 +1 @@ -loguru==0.7.3 -jsonschema==4.24.0 \ No newline at end of file +loguru==0.7.3 \ No newline at end of file From 6309a3b2c6220fe824498585a4c314014645fc86 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 19 Jun 2025 17:04:00 +0200 Subject: [PATCH 09/14] Improved code and readme --- capiorun/capiorun | 24 ++++++++++++++---------- capiorun/readme.md | 34 +++++++++++++++++++++++++++------- 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/capiorun/capiorun b/capiorun/capiorun index 64b77f9ee..aaacfbf52 100755 --- a/capiorun/capiorun +++ b/capiorun/capiorun @@ -1,16 +1,20 @@ #!/usr/bin/env python3 -import os, subprocess, argparse, time, sys -import signal +import os, subprocess, argparse, time, sys, signal -from loguru import logger +try: + from loguru import logger -logger.remove() -logger.add( - sink=lambda msg: print(msg, end=''), # or use sys.stdout - format="{time:DD/MM/YYYY HH:mm:ss} | capiorun | {level: <8} | {message}", - colorize=True -) + logger.remove() + logger.add( + sink=lambda msg: print(msg, end=''), # or use sys.stdout + format="{time:DD/MM/YYYY HH:mm:ss} | capiorun | {level: <8} | {message}", + colorize=True + ) +except ImportError: + import logging + + logger = logging.getLogger(__name__) parser = argparse.ArgumentParser(prog='capiorun', description='Simplify the execution of workflows running with CAPIO', epilog='Developed by Marco Edoardo Santimaria') @@ -37,7 +41,7 @@ if __name__ == "__main__": logger.info(f"CAPIO_APP_NAME = {args.app_name}") logger.info(f"CAPIO_DIR = {args.capio_dir}") logger.info(f"CAPIO-CL CONFIG = {args.capiocl}") - if not os.path.exists(args.capiocl): + if not os.path.exists(args.capiocl) and args.capiocl != "--no-config": logger.critical(f"File {args.capiocl} does not exists. aborting execution...") exit(1) server_env = os.environ.copy() diff --git a/capiorun/readme.md b/capiorun/readme.md index b1641e8f4..322cbc680 100644 --- a/capiorun/readme.md +++ b/capiorun/readme.md @@ -1,9 +1,29 @@ -# capiorun: Simplified launch of CAPIO applications +# **`capiorun` โ€“ Simplified Launch of CAPIO Applications** -## Parameters -- capio-dir dir -- capio-app-name app name -- capio-workflow-name workflowname -- capio-log-level (opt) -- executable +`capiorun` is a lightweight Python utility designed to streamline the execution of workflow steps with **CAPIO**. It automates the setup of required environment variables and manages the lifecycle of the `capio_server` instance, reducing the manual configuration needed by users. +If a `capio_server` instance is not already running for a given workflow, `capiorun` will automatically start and manage it. + +--- + +## ๐Ÿ“Œ **Parameters** + +| Flag | Description | +|------|-------------| +| `--capio-dir` *(required)* | Specifies the CAPIO virtual mount point. | +| `--capiocl` | Path to the CAPIO-CL configuration file. | +| `--app-name` *(required)* | The name of the application step being launched. Must match an entry in the CAPIO-CL configuration file (`--capiocl`). | +| `--workflow-name` | The name of the workflow to which the application step (`--app-name`) belongs. | +| `--capio-log-level` *(optional)* | Sets the log level if CAPIO is executed in debug mode. | +| `args` | Remaining parameters that represent the executable and its arguments to be run with CAPIO. | + +--- + +## โš™๏ธ **Optional Overrides** + +You can also explicitly specify the locations of both `libcapio_posix.so` and the `capio_server` binary: + +| Flag | Description | +|------|-------------| +| `--libcapio` | Path to the `libcapio_posix.so` shared library. | +| `--server` | Path to the `capio_server` executable. | From da6f8e0d1df1dd27c8403079b0c8230438f4f3c0 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 19 Jun 2025 17:13:36 +0200 Subject: [PATCH 10/14] added missing vars --- capiorun/capiorun | 36 +++++++++++++++++++++++++++++------- capiorun/readme.md | 13 +++++++++++++ 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/capiorun/capiorun b/capiorun/capiorun index aaacfbf52..e4dc5dbee 100755 --- a/capiorun/capiorun +++ b/capiorun/capiorun @@ -26,8 +26,34 @@ parser.add_argument("-L", "--log-level", default="-1", help="CAPIO log level whe parser.add_argument("-l", "--libcapio", default="libcapio_posix.so", help="Absolute path to libcapio_poix.so") parser.add_argument("-s", "--server", default="capio_server", help="Absolute path to capio_server") parser.add_argument("-c", "--capiocl", default="--no-config", help="Absolute path to the CAPIO-CL configuration file") + +parser.add_argument("--log-dir", default="", help="Set custom log directory") +parser.add_argument("--log-prefix", default="", help="Set custom log filename prefix") +parser.add_argument("--cache-lines", default="", help="Set the number of default CAPIO shm-queues lines") +parser.add_argument("--init-file-size", default="", + help="Set the default size for a new file, when pre-allocating memory") + parser.add_argument('args', nargs=argparse.REMAINDER, help="Command to launch with capio") + +def build_env(args): + env = os.environ.copy() + if args.log_dir: + env["CAPIO_LOG_DIR"] = args.log_dir + if args.log_prefix: + env["CAPIO_LOG_PREFIX"] = args.log_prefix + if args.cache_lines: + env["CAPIO_CACHE_LINES"] = args.cache_lines + if args.init_file_size: + env["CAPIO_FILE_INIT_SIZE"] = args.init_file_size + + env["CAPIO_DIR"] = args.capio_dir + env["CAPIO_LOG_LEVEL"] = args.log_level + step_env["CAPIO_WORKFLOW_NAME"] = args.workflow_name + + return env + + server_process = None step_process = None @@ -44,9 +70,8 @@ if __name__ == "__main__": if not os.path.exists(args.capiocl) and args.capiocl != "--no-config": logger.critical(f"File {args.capiocl} does not exists. aborting execution...") exit(1) - server_env = os.environ.copy() - server_env["CAPIO_DIR"] = args.capio_dir - server_env["CAPIO_LOG_LEVEL"] = args.log_level + server_env = build_env(args) + server_process = subprocess.Popen( [args.server, ("--config " + args.capiocl) if args.capiocl != "--no-config" else args.capiocl], env=server_env, @@ -58,11 +83,8 @@ if __name__ == "__main__": else: logger.debug(f"An instance of capio_server with workflow name {args.workflow_name} already exists!") - step_env = os.environ.copy() - step_env["CAPIO_WORKFLOW_NAME"] = args.workflow_name + step_env = build_env(args) step_env["CAPIO_APP_NAME"] = args.app_name - step_env["CAPIO_DIR"] = args.capio_dir - step_env["CAPIO_LOG_LEVEL"] = args.log_level step_env["LD_PRELOAD"] = args.libcapio logger.info(f"Starting workflow steps with following environment variables:") diff --git a/capiorun/readme.md b/capiorun/readme.md index 322cbc680..4e518a14f 100644 --- a/capiorun/readme.md +++ b/capiorun/readme.md @@ -27,3 +27,16 @@ You can also explicitly specify the locations of both `libcapio_posix.so` and th |------|-------------| | `--libcapio` | Path to the `libcapio_posix.so` shared library. | | `--server` | Path to the `capio_server` executable. | + +--- + +## ๐Ÿงช **Advanced Configuration** + +These optional flags allow fine-tuned control over CAPIO runtime behavior: + +| Flag | Description | +|------|-------------| +| `--log-dir` | Directory where CAPIO should store log files. | +| `--log-prefix` | Prefix to prepend to CAPIO-generated log files. | +| `--cache-lines` | Number of cache lines to be used by CAPIO. Useful for performance tuning. | +| `--init-file-size` | Initial size of CAPIO-managed files upon creation. | From dbfc26769342a0de97cf4f69309bb0351b94f291 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 19 Jun 2025 17:47:30 +0200 Subject: [PATCH 11/14] CMAKELISTS --- CMakeLists.txt | 1 + capiorun/CMakeLists.txt | 6 +++ capiorun/capiorun | 81 +++++++++++++++++++++++++++++++---------- 3 files changed, 69 insertions(+), 19 deletions(-) create mode 100644 capiorun/CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index b4d0a7e7c..9386f8512 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,6 +77,7 @@ ENDIF (CAPIO_LOG AND CMAKE_BUILD_TYPE STREQUAL "Debug") ##################################### add_subdirectory(src/posix) add_subdirectory(src/server) +add_subdirectory(capiorun) IF (CAPIO_BUILD_TESTS) message(STATUS "Building CAPIO test suite") diff --git a/capiorun/CMakeLists.txt b/capiorun/CMakeLists.txt new file mode 100644 index 000000000..701db27d5 --- /dev/null +++ b/capiorun/CMakeLists.txt @@ -0,0 +1,6 @@ +set(TARGET_NAME capiorun) + +set(CAPIORUN_SCRIPT ${CMAKE_CURRENT_SOURCE_DIR}/capiorun) + +install(PROGRAMS ${CAPIORUN_SCRIPT} + DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/capiorun/capiorun b/capiorun/capiorun index e4dc5dbee..deb3441af 100755 --- a/capiorun/capiorun +++ b/capiorun/capiorun @@ -1,6 +1,11 @@ #!/usr/bin/env python3 -import os, subprocess, argparse, time, sys, signal +import argparse +import os +import signal +import subprocess +import sys +import time try: from loguru import logger @@ -8,7 +13,8 @@ try: logger.remove() logger.add( sink=lambda msg: print(msg, end=''), # or use sys.stdout - format="{time:DD/MM/YYYY HH:mm:ss} | capiorun | {level: <8} | {message}", + format="{time:DD/MM/YYYY HH:mm:ss} | capiorun | " + "{level: <8} | {message}", colorize=True ) except ImportError: @@ -16,23 +22,59 @@ except ImportError: logger = logging.getLogger(__name__) -parser = argparse.ArgumentParser(prog='capiorun', description='Simplify the execution of workflows running with CAPIO', - epilog='Developed by Marco Edoardo Santimaria') - -parser.add_argument("-d", "--capio-dir", required=True, help="CAPIO virtual mount point") -parser.add_argument("-w", "--workflow-name", default="CAPIO", help="Name of the workflow") -parser.add_argument("-n", "--app-name", required=True, help="CAPIO Application step name") -parser.add_argument("-L", "--log-level", default="-1", help="CAPIO log level when running in debug mode") -parser.add_argument("-l", "--libcapio", default="libcapio_posix.so", help="Absolute path to libcapio_poix.so") -parser.add_argument("-s", "--server", default="capio_server", help="Absolute path to capio_server") -parser.add_argument("-c", "--capiocl", default="--no-config", help="Absolute path to the CAPIO-CL configuration file") - -parser.add_argument("--log-dir", default="", help="Set custom log directory") -parser.add_argument("--log-prefix", default="", help="Set custom log filename prefix") -parser.add_argument("--cache-lines", default="", help="Set the number of default CAPIO shm-queues lines") +parser = argparse.ArgumentParser( + prog="capiorun", + description=""" +capiorun - Simplified launcher for CAPIO-based applications. + +This utility streamlines the setup and execution of CAPIO workflows by automatically configuring +the environment and managing capio_server instances. It allows running specific application steps +defined in a CAPIO-CL configuration without manual setup of environment variables. + +Typical usage: + capiorun -d /mnt/capio -n myapp -c config.json -- +""", + epilog=""" +Developed by Marco Edoardo Santimaria + +For more information, refer to the CAPIO documentation or repository. +""", + formatter_class=argparse.RawTextHelpFormatter +) + +# Required arguments +parser.add_argument("-d", "--capio-dir", required=True, + help="CAPIO virtual mount point (e.g., /mnt/capio)") +parser.add_argument("-n", "--app-name", required=True, + help="Name of the CAPIO application step to launch. Must match an entry in the CAPIO-CL config.") + +# Optional but commonly used +parser.add_argument("-w", "--workflow-name", default="CAPIO", + help="Workflow name. Should match the name in the CAPIO-CL configuration (default: CAPIO)") +parser.add_argument("-c", "--capiocl", default="--no-config", + help="Path to the CAPIO-CL configuration file (default: --no-config)") + +# Debug and logging +parser.add_argument("-L", "--log-level", default="-1", + help="CAPIO log level. Useful when running in debug mode (default: -1)") +parser.add_argument("--log-dir", default="", + help="Custom directory for CAPIO log output") +parser.add_argument("--log-prefix", default="", + help="Prefix for CAPIO log files") + +# Tuning and advanced +parser.add_argument("--cache-lines", default="", + help="Number of CAPIO shm-queue cache lines (optional tuning parameter)") parser.add_argument("--init-file-size", default="", - help="Set the default size for a new file, when pre-allocating memory") + help="Default file size (in bytes) when pre-allocating memory for new files") + +# Binary locations +parser.add_argument("-l", "--libcapio", default="libcapio_posix.so", + help="Path to libcapio_posix.so shared library (default: libcapio_posix.so)") +parser.add_argument("-s", "--server", default="capio_server", + help="Path to capio_server executable (default: capio_server)") +# Positional arguments parser.add_argument('args', nargs=argparse.REMAINDER, help="Command to launch with capio") @@ -49,7 +91,7 @@ def build_env(args): env["CAPIO_DIR"] = args.capio_dir env["CAPIO_LOG_LEVEL"] = args.log_level - step_env["CAPIO_WORKFLOW_NAME"] = args.workflow_name + env["CAPIO_WORKFLOW_NAME"] = args.workflow_name return env @@ -102,8 +144,9 @@ if __name__ == "__main__": ) step_process.wait() + logger.success(f"Step {args.app_name} terminated successfully") except Exception as e: - logger.critical("An error occurred in startup of workflow app: {}".format(e)) + logger.critical(f"An error occurred in startup/execution of workflow app <{args.app_name}>: {e}") if server_process is not None: logger.info(f"Terminating instance of capio_server") From d72f7f94ad463682e7c64a37c843b91b695f4657 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Fri, 20 Jun 2025 10:32:20 +0200 Subject: [PATCH 12/14] Avoid termination of server when used by other apps --- capiorun/capiorun | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/capiorun/capiorun b/capiorun/capiorun index deb3441af..661575029 100755 --- a/capiorun/capiorun +++ b/capiorun/capiorun @@ -96,6 +96,13 @@ def build_env(args): return env +def count_files_starting_with(prefix): + return sum( + 1 for filename in os.listdir("/dev/shm") + if os.path.isfile(os.path.join("/dev/shm", filename)) and filename.startswith(prefix) + ) + + server_process = None step_process = None @@ -149,7 +156,20 @@ if __name__ == "__main__": logger.critical(f"An error occurred in startup/execution of workflow app <{args.app_name}>: {e}") if server_process is not None: - logger.info(f"Terminating instance of capio_server") - server_process.send_signal(signal.SIGTERM) - time.sleep(2) - logger.success("Terminated CAPIO server instance") + if count_files_starting_with(args.workflow_name) > 6: + logger.debug("Server instance is used by other applications... skipping server termination") + else: + logger.info(f"Terminating instance of capio_server") + server_process.send_signal(signal.SIGTERM) + time.sleep(2) + logger.success("Terminated CAPIO server instance") + else: + if count_files_starting_with(args.workflow_name) <= 6: + logger.info("Terminating instance of capio_server started by other capiorun command") + result = subprocess.run(["killall", "capio_server"], stdout=sys.stdout, stderr=sys.stderr) + if result.returncode == 0: + logger.success("Terminated CAPIO server instance") + else: + logger.critical("Error terminating capio_server instance!") + else: + logger.debug("Skipping termination of capio_server") From b561203789a3972a31a85d84555140ea639179e1 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Sun, 22 Jun 2025 10:30:04 +0000 Subject: [PATCH 13/14] Removed cmake from capiorun --- CMakeLists.txt | 1 - capiorun/CMakeLists.txt | 6 ------ 2 files changed, 7 deletions(-) delete mode 100644 capiorun/CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index 9386f8512..b4d0a7e7c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,7 +77,6 @@ ENDIF (CAPIO_LOG AND CMAKE_BUILD_TYPE STREQUAL "Debug") ##################################### add_subdirectory(src/posix) add_subdirectory(src/server) -add_subdirectory(capiorun) IF (CAPIO_BUILD_TESTS) message(STATUS "Building CAPIO test suite") diff --git a/capiorun/CMakeLists.txt b/capiorun/CMakeLists.txt deleted file mode 100644 index 701db27d5..000000000 --- a/capiorun/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -set(TARGET_NAME capiorun) - -set(CAPIORUN_SCRIPT ${CMAKE_CURRENT_SOURCE_DIR}/capiorun) - -install(PROGRAMS ${CAPIORUN_SCRIPT} - DESTINATION ${CMAKE_INSTALL_BINDIR}) From 649042f7f544617090f019bb22b2c1edac8b697d Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Sun, 22 Jun 2025 11:16:44 +0000 Subject: [PATCH 14/14] Fix on build phase --- CMakeLists.txt | 17 +++++++++++++++++ Dockerfile | 2 ++ capiorun/.gitignore | 3 +-- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b4d0a7e7c..ca95ebf7f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -78,6 +78,23 @@ ENDIF (CAPIO_LOG AND CMAKE_BUILD_TYPE STREQUAL "Debug") add_subdirectory(src/posix) add_subdirectory(src/server) +##################################### +# Install capiorun +##################################### +install( + FILES ${PROJECT_SOURCE_DIR}/capiorun/capiorun + DESTINATION ${CMAKE_INSTALL_BINDIR} + PERMISSIONS + OWNER_READ + OWNER_WRITE + OWNER_EXECUTE + GROUP_READ + GROUP_EXECUTE + WORLD_READ + WORLD_EXECUTE +) + + IF (CAPIO_BUILD_TESTS) message(STATUS "Building CAPIO test suite") add_subdirectory(tests) diff --git a/Dockerfile b/Dockerfile index 1ac97c527..2e71b2dc2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,6 +20,7 @@ COPY CMakeLists.txt /opt/capio/ COPY scripts /opt/capio/scripts COPY src /opt/capio/src COPY tests /opt/capio/tests +COPY capiorun /opt/capio/capiorun RUN mkdir -p /opt/capio/build \ && cmake \ @@ -97,6 +98,7 @@ COPY --from=builder \ "/usr/local/bin/capio_integration_test_map" \ "/usr/local/bin/capio_integration_test_merge" \ "/usr/local/bin/capio_integration_test_split" \ + "/opt/capio/capiorun/capiorun" \ /usr/local/bin/ # Pkgconfig diff --git a/capiorun/.gitignore b/capiorun/.gitignore index c6a2850e6..f92bc354f 100644 --- a/capiorun/.gitignore +++ b/capiorun/.gitignore @@ -2,5 +2,4 @@ .idea *.json *.sh -__pycache__ -pippo \ No newline at end of file +__pycache__ \ No newline at end of file