Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions Pipeliner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
from inspect import getframeinfo, stack
import threading


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.INFO)
logger.addHandler(stdout_handler)


def run_cmd(cmd, ignore_error=False):

logger.info("Running: " + cmd)
logger.debug("Running: " + cmd)
try:
subprocess.check_call(cmd, shell=True)
except subprocess.CalledProcessError as e:
Expand Down Expand Up @@ -47,21 +53,29 @@ def __init__(self, checkpoint_dir):

self._checkpoint_dir = checkpoint_dir

fhandler = logging.FileHandler(os.path.join(self._checkpoint_dir, "pipeliner.log"))
fhandler.setLevel(logging.DEBUG)
logger.addHandler(fhandler)




def add_commands(self, cmds_list):

for cmd in cmds_list:
# check it's a proper Command object
if not (isinstance(cmd, Command) or isinstance(cmd, ParallelCommandList) ):
errmsg = "Pipeliner::add_commmands - Error, cmd {} is not a Command or ParallelCommandList object".format(cmd)
logger.critical(errmsg)
raise(RuntimeError(errmsg))
if (isinstance(cmd, Message)):
self._cmds_list.append(cmd)
else:
if not (isinstance(cmd, Command) or isinstance(cmd, ParallelCommandList)):
errmsg = "Pipeliner::add_commmands - Error, cmd {} is not a Command or ParallelCommandList object".format(cmd)
logger.critical(errmsg)
raise(RuntimeError(errmsg))

if cmd.get_checkpoint() in self._unique_checkpoints:
raise ValueError('Duplicate checkpoint {}'.format(cmd.get_checkpoint()))
self._unique_checkpoints.add(cmd.get_checkpoint())
self._cmds_list.append(cmd)
if cmd.get_checkpoint() in self._unique_checkpoints:
raise ValueError('Duplicate checkpoint {}'.format(cmd.get_checkpoint()))
self._unique_checkpoints.add(cmd.get_checkpoint())
self._cmds_list.append(cmd)


def num_cmds(self):
Expand Down Expand Up @@ -120,7 +134,7 @@ def run(self, checkpoint_dir):
checkpoint_file = os.path.sep.join([checkpoint_dir, self.get_checkpoint()])
ret = 0
if os.path.exists(checkpoint_file):
logger.info("CMD: " + self.get_cmd() + " already processed. Skipping.")
logger.debug("CMD: " + self.get_cmd() + " already processed. Skipping.")
else:
# execute it. If it succeeds, make the checkpoint file
start_time = time.time()
Expand All @@ -137,13 +151,21 @@ def run(self, checkpoint_dir):
else:
end_time = time.time()
runtime_minutes = (end_time - start_time) / 60
logger.info("Execution Time = {:.2f} minutes. CMD: {}".format(runtime_minutes, cmdstr))
logger.debug("Execution Time = {:.2f} minutes. CMD: {}".format(runtime_minutes, cmdstr))
with open(checkpoint_file, "w") as f:
f.write(cmdstr + "\n")

return ret


class Message(object):
def __init__(self, message):
self._message = message

def run(self, checkpoint_dir):
logger.info(self._message)


#############################
## Parallel command execution
#############################
Expand Down