From fff9a8fea1d43085e4239fe34aebb44b6499082c Mon Sep 17 00:00:00 2001 From: GeorgescuC Date: Fri, 25 Oct 2024 13:15:22 -0400 Subject: [PATCH 1/2] Pipeliner: Add Message() class as an alternative to Command() that simply outputs the message provided to the logger. Used so the message is output at the relevant step when running the Pipeliner, and can serve to flank a group of commands to indicate start/end. --- Pipeliner.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/Pipeliner.py b/Pipeliner.py index 4ef391a..13e8983 100755 --- a/Pipeliner.py +++ b/Pipeliner.py @@ -53,15 +53,18 @@ def add_commands(self, cmds_list): for cmd in cmds_list: # check it's a proper Command object - if not (isinstance(cmd, Command) or isinstance(cmd, ParallelCommandList) ): - errmsg = "Pipeliner::add_commmands - Error, cmd {} is not a Command or ParallelCommandList object".format(cmd) - logger.critical(errmsg) - raise(RuntimeError(errmsg)) + if (isinstance(cmd, Message)): + self._cmds_list.append(cmd) + else: + if not (isinstance(cmd, Command) or isinstance(cmd, ParallelCommandList)): + errmsg = "Pipeliner::add_commmands - Error, cmd {} is not a Command or ParallelCommandList object".format(cmd) + logger.critical(errmsg) + raise(RuntimeError(errmsg)) - if cmd.get_checkpoint() in self._unique_checkpoints: - raise ValueError('Duplicate checkpoint {}'.format(cmd.get_checkpoint())) - self._unique_checkpoints.add(cmd.get_checkpoint()) - self._cmds_list.append(cmd) + if cmd.get_checkpoint() in self._unique_checkpoints: + raise ValueError('Duplicate checkpoint {}'.format(cmd.get_checkpoint())) + self._unique_checkpoints.add(cmd.get_checkpoint()) + self._cmds_list.append(cmd) def num_cmds(self): @@ -144,6 +147,14 @@ def run(self, checkpoint_dir): return ret +class Message(object): + def __init__(self, message): + self._message = message + + def run(self, checkpoint_dir): + logger.info(self._message) + + ############################# ## Parallel command execution ############################# From 20ef637b4b9fac8bcfb13b3049053fb7cd792bf5 Mon Sep 17 00:00:00 2001 From: GeorgescuC Date: Fri, 25 Oct 2024 13:24:01 -0400 Subject: [PATCH 2/2] Add specific logger handlers so that INFO is output to stdout and DEBUG to a pipeliner.log file in the checkpoints directory. Also move some of the commands logging to DEBUG from INFO. --- Pipeliner.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/Pipeliner.py b/Pipeliner.py index 13e8983..1bc82a3 100755 --- a/Pipeliner.py +++ b/Pipeliner.py @@ -13,12 +13,18 @@ from inspect import getframeinfo, stack import threading + logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +stdout_handler = logging.StreamHandler(sys.stdout) +stdout_handler.setLevel(logging.INFO) +logger.addHandler(stdout_handler) def run_cmd(cmd, ignore_error=False): - logger.info("Running: " + cmd) + logger.debug("Running: " + cmd) try: subprocess.check_call(cmd, shell=True) except subprocess.CalledProcessError as e: @@ -47,6 +53,11 @@ def __init__(self, checkpoint_dir): self._checkpoint_dir = checkpoint_dir + fhandler = logging.FileHandler(os.path.join(self._checkpoint_dir, "pipeliner.log")) + fhandler.setLevel(logging.DEBUG) + logger.addHandler(fhandler) + + def add_commands(self, cmds_list): @@ -123,7 +134,7 @@ def run(self, checkpoint_dir): checkpoint_file = os.path.sep.join([checkpoint_dir, self.get_checkpoint()]) ret = 0 if os.path.exists(checkpoint_file): - logger.info("CMD: " + self.get_cmd() + " already processed. Skipping.") + logger.debug("CMD: " + self.get_cmd() + " already processed. Skipping.") else: # execute it. If it succeeds, make the checkpoint file start_time = time.time() @@ -140,7 +151,7 @@ def run(self, checkpoint_dir): else: end_time = time.time() runtime_minutes = (end_time - start_time) / 60 - logger.info("Execution Time = {:.2f} minutes. CMD: {}".format(runtime_minutes, cmdstr)) + logger.debug("Execution Time = {:.2f} minutes. CMD: {}".format(runtime_minutes, cmdstr)) with open(checkpoint_file, "w") as f: f.write(cmdstr + "\n")