diff --git a/Pipeliner.py b/Pipeliner.py index 4ef391a..1bc82a3 100755 --- a/Pipeliner.py +++ b/Pipeliner.py @@ -13,12 +13,18 @@ from inspect import getframeinfo, stack import threading + logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +stdout_handler = logging.StreamHandler(sys.stdout) +stdout_handler.setLevel(logging.INFO) +logger.addHandler(stdout_handler) def run_cmd(cmd, ignore_error=False): - logger.info("Running: " + cmd) + logger.debug("Running: " + cmd) try: subprocess.check_call(cmd, shell=True) except subprocess.CalledProcessError as e: @@ -47,21 +53,29 @@ def __init__(self, checkpoint_dir): self._checkpoint_dir = checkpoint_dir + fhandler = logging.FileHandler(os.path.join(self._checkpoint_dir, "pipeliner.log")) + fhandler.setLevel(logging.DEBUG) + logger.addHandler(fhandler) + + def add_commands(self, cmds_list): for cmd in cmds_list: # check it's a proper Command object - if not (isinstance(cmd, Command) or isinstance(cmd, ParallelCommandList) ): - errmsg = "Pipeliner::add_commmands - Error, cmd {} is not a Command or ParallelCommandList object".format(cmd) - logger.critical(errmsg) - raise(RuntimeError(errmsg)) + if (isinstance(cmd, Message)): + self._cmds_list.append(cmd) + else: + if not (isinstance(cmd, Command) or isinstance(cmd, ParallelCommandList)): + errmsg = "Pipeliner::add_commmands - Error, cmd {} is not a Command or ParallelCommandList object".format(cmd) + logger.critical(errmsg) + raise(RuntimeError(errmsg)) - if cmd.get_checkpoint() in self._unique_checkpoints: - raise ValueError('Duplicate checkpoint {}'.format(cmd.get_checkpoint())) - self._unique_checkpoints.add(cmd.get_checkpoint()) - self._cmds_list.append(cmd) + if cmd.get_checkpoint() in self._unique_checkpoints: + raise ValueError('Duplicate checkpoint {}'.format(cmd.get_checkpoint())) + self._unique_checkpoints.add(cmd.get_checkpoint()) + self._cmds_list.append(cmd) def num_cmds(self): @@ -120,7 +134,7 @@ def run(self, checkpoint_dir): checkpoint_file = os.path.sep.join([checkpoint_dir, self.get_checkpoint()]) ret = 0 if os.path.exists(checkpoint_file): - logger.info("CMD: " + self.get_cmd() + " already processed. Skipping.") + logger.debug("CMD: " + self.get_cmd() + " already processed. Skipping.") else: # execute it. If it succeeds, make the checkpoint file start_time = time.time() @@ -137,13 +151,21 @@ def run(self, checkpoint_dir): else: end_time = time.time() runtime_minutes = (end_time - start_time) / 60 - logger.info("Execution Time = {:.2f} minutes. CMD: {}".format(runtime_minutes, cmdstr)) + logger.debug("Execution Time = {:.2f} minutes. CMD: {}".format(runtime_minutes, cmdstr)) with open(checkpoint_file, "w") as f: f.write(cmdstr + "\n") return ret +class Message(object): + def __init__(self, message): + self._message = message + + def run(self, checkpoint_dir): + logger.info(self._message) + + ############################# ## Parallel command execution #############################