From 622a20d9fda108723174938b7a738164cad3eea7 Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Mon, 27 Jun 2016 16:49:04 -0400 Subject: [PATCH 01/18] slurm engine loads a template --- .gitignore | 2 ++ misopy/cluster/__init__.py | 27 +++++++++++++++ misopy/cluster/slurm.py | 56 +++++++++++++++++++++++++++++++ misopy/cluster/slurm_template.txt | 15 +++++++++ misopy/miso.py | 1 + misopy/settings/miso_settings.txt | 1 + 6 files changed, 102 insertions(+) create mode 100644 misopy/cluster/__init__.py create mode 100644 misopy/cluster/slurm.py create mode 100644 misopy/cluster/slurm_template.txt diff --git a/.gitignore b/.gitignore index 299f26c..63f11be 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ *.pyc splicing_0.1.tar.gz pysplicing-0.1.tar.gz +.project +.pydevproject diff --git a/misopy/cluster/__init__.py b/misopy/cluster/__init__.py new file mode 100644 index 0000000..8bf4edc --- /dev/null +++ b/misopy/cluster/__init__.py @@ -0,0 +1,27 @@ +''' +misopy.cluster + +Cluster factory class for different cluster types + +@author: Aaron Kitzmiller +@copyright: 2016 The Presidents and Fellows of Harvard College. All rights reserved. +@license: GPL v2.0 +@contact: aaron_kitzmiller@harvard.edu +''' + +def getClusterEngine(cluster_type): + ''' + Returns the correct cluster engine + ''' + classname = 'misopy.cluster.%s' % cluster_type + try: + parts = classname.split('.') + module = ".".join(parts[:-1]) + m = __import__( module ) + for comp in parts[1:]: + m = getattr(m, comp) + return m + except ImportError as e: + raise Exception('Unable to import %s: %s' % (classname,str(e))) + return None + \ No newline at end of file diff --git a/misopy/cluster/slurm.py b/misopy/cluster/slurm.py new file mode 100644 index 0000000..87d1cc2 --- /dev/null +++ b/misopy/cluster/slurm.py @@ -0,0 +1,56 @@ +''' +misopy.cluster.slurm + +@author: Aaron Kitzmiller +@copyright: 2016 The Presidents and Fellows of Harvard College. All rights reserved. +@license: GPL v2.0 +@contact: aaron_kitzmiller@harvard.edu +''' +import os + +from settings import load_settings + + + +class SlurmClusterEngine(): + ''' + Run jobs on a Slurm cluster + ''' + + def __init__(self,settings_filename): + ''' + Get the slurm job template file and load it + ''' + settings = load_settings(settings_filename) + if not 'slurm_template' in settings: + raise Exception('slurm_template must be defined in settings to use Slurm') + + template_filename = settings['slurm_template'] + if not os.path.exists(template_filename): + raise Exception('Cannot find slurm template file %s' % template_filename) + + with open(template_filename,'r') as f: + self.template = f.read() + + if self.template.strip() == '': + raise Exception('slurm template file %s is empty' % template_filename) + + + + def run_on_cluster(self, cmd, job_name, cluster_output_dir, + cluster_scripts_dir=None, + queue_type=None, + settings_fname=None): + pass + + def wait_on_job(self, job_id, cluster_cmd, delay=60): + pass + + def wait_on_jobs(self, job_ids, cluster_cmd, delay=120): + """ + Wait on a set of job IDs. + """ + pass + + def launch_job(self, cluster_cmd, cmd_name): + pass \ No newline at end of file diff --git a/misopy/cluster/slurm_template.txt b/misopy/cluster/slurm_template.txt new file mode 100644 index 0000000..a1a1b0b --- /dev/null +++ b/misopy/cluster/slurm_template.txt @@ -0,0 +1,15 @@ +#!/bin/bash + +#SBATCH -p {partition} +#SBATCH --mem {mem} +#SBATCH --job-name {jobname} +#SBATCH -t {time} +#SBATCH -n 1 +#SBATCH -N 1 + +source new-modules.sh +module load python/2.7.6-fasrc01 +module load miso + +{cmd} + diff --git a/misopy/miso.py b/misopy/miso.py index a062518..8a54cd7 100644 --- a/misopy/miso.py +++ b/misopy/miso.py @@ -245,6 +245,7 @@ def run(self, delay_constant=0.9): chunk=self.chunk_jobs) # End SGE case return + # All cluster jobs cluster_jobs = [] for batch_num, cmd_info in enumerate(all_miso_cmds): diff --git a/misopy/settings/miso_settings.txt b/misopy/settings/miso_settings.txt index d9adfa0..54ffc1e 100644 --- a/misopy/settings/miso_settings.txt +++ b/misopy/settings/miso_settings.txt @@ -4,6 +4,7 @@ min_event_reads = 20 [cluster] cluster_command = qsub +slurm_template = /home/akitzmiller/workspace/MISO/misopy/cluster/slurm_template.txt [sampler] burn_in = 500 From 6b077bee26d736f9656a4595f3e129373b0e6522 Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Tue, 28 Jun 2016 14:06:45 -0400 Subject: [PATCH 02/18] checking for job --- misopy/cluster/slurm.py | 96 +++++++++++++++++++++++++++++++++++------ 1 file changed, 83 insertions(+), 13 deletions(-) diff --git a/misopy/cluster/slurm.py b/misopy/cluster/slurm.py index 87d1cc2..3232b7d 100644 --- a/misopy/cluster/slurm.py +++ b/misopy/cluster/slurm.py @@ -6,7 +6,7 @@ @license: GPL v2.0 @contact: aaron_kitzmiller@harvard.edu ''' -import os +import os, subprocess, traceback from settings import load_settings @@ -21,11 +21,15 @@ def __init__(self,settings_filename): ''' Get the slurm job template file and load it ''' - settings = load_settings(settings_filename) - if not 'slurm_template' in settings: + self.settings = load_settings(settings_filename) + if not 'slurm_template' in self.settings: raise Exception('slurm_template must be defined in settings to use Slurm') - template_filename = settings['slurm_template'] + self.squeue_max_attempts = 10 + if 'squeue_max_attempts' in self.settings: + self.squeue_max_attempts = int(self.settings['squeue_max_attempts']) + + template_filename = self.settings['slurm_template'] if not os.path.exists(template_filename): raise Exception('Cannot find slurm template file %s' % template_filename) @@ -44,13 +48,79 @@ def run_on_cluster(self, cmd, job_name, cluster_output_dir, pass def wait_on_job(self, job_id, cluster_cmd, delay=60): - pass - - def wait_on_jobs(self, job_ids, cluster_cmd, delay=120): - """ - Wait on a set of job IDs. - """ - pass + ''' + Wait until job is done. Uses squeue first, then sacct. + Runs squeue /sacct until either the job is done or until squeue_max_attempts is reached. + Max attempts is needed to ensure that squeue information is available. + ''' + squeue_cmd = 'squeue --noheader --format %%T -j %d' % job_id + sacct_cmd = 'sacct --noheader --format State -j %d.batch' % job_id + + done = False + squeue_attempts = 0 + state = None + + while not done: + + # If we've tried squeue_max_attempts and gotten no information, then quit + squeue_attempts += 1 + if squeue_attempts == self.squeue_max_attempts and state is None: + raise Exception('Attempted to query squeue /sacct %d times and retrieved no result' % squeue_attempts) + + proc = subprocess.Popen(squeue_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + output,err = proc.communicate() + + if proc.returncode != 0: + # The whole command failed. Weird. + raise Exception('squeue command %s failed: %s' % (squeue_cmd,err)) + + if output.strip() != '': + state = output.strip() + else: + # Try sacct. The job may be done and so disappeared from squeue + proc = subprocess.Popen(sacct_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + output,err = proc.communicate() + + if proc.returncode != 0: + # The whole command failed. Weird. + raise Exception('sacct command %s failed: %s' % (sacct_cmd,err)) + + if output.strip() != '': + state = output.strip() + + if state is not None: + if state in ["COMPLETED","COMPLETING","CANCELLED","FAILED","TIMEOUT","PREEMPTED","NODE_FAIL"]: + done = True + + + + + - def launch_job(self, cluster_cmd, cmd_name): - pass \ No newline at end of file + def launch_job(self, cluster_cmd): + ''' + Runs cluster command and returns the job id + ''' + proc = subprocess.Popen(cluster_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + # Read the job ID if it's a known cluster + # submission system + output,err = proc.communicate() + if proc.returncode != 0 or err.strip() != '': + raise Exception('Error launching job with %s: %s' % (cluster_cmd,err)) + + job_id = output.strip().replace('Submitted batch job ','') + try: + job_id = int(job_id) + except Exception as e: + raise Exception('Returned job id %s is not a number ?!?!' % job_id) + + return job_id \ No newline at end of file From d5dd3a26df3e72453dea2c742cf3af47937cebf5 Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Tue, 28 Jun 2016 14:38:29 -0400 Subject: [PATCH 03/18] added jobscript creation --- misopy/cluster/slurm.py | 50 ++++++++++++++++++++++++++----- misopy/cluster/slurm_template.txt | 7 ++--- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/misopy/cluster/slurm.py b/misopy/cluster/slurm.py index 3232b7d..43100ad 100644 --- a/misopy/cluster/slurm.py +++ b/misopy/cluster/slurm.py @@ -6,9 +6,10 @@ @license: GPL v2.0 @contact: aaron_kitzmiller@harvard.edu ''' -import os, subprocess, traceback +import os, subprocess, traceback, time -from settings import load_settings +from misopy.settings import load_settings +from misopy import misc_utils @@ -39,15 +40,50 @@ def __init__(self,settings_filename): if self.template.strip() == '': raise Exception('slurm template file %s is empty' % template_filename) - + + + def make_bash_script(self,script_name,cmd): + ''' + Use the template to write out a sbatch submission script + ''' + scripttxt = self.template.format(cmd=cmd) + with open(script_name,'w') as script: + script.write(scripttxt + '\n') + + def run_on_cluster(self, cmd, job_name, cluster_output_dir, cluster_scripts_dir=None, queue_type=None, settings_fname=None): - pass + ''' + Composes job script and launches job + ''' + + misc_utils.make_dir(cluster_output_dir) + if cluster_scripts_dir == None: + cluster_scripts_dir = os.path.join(cluster_output_dir, + 'cluster_scripts') + misc_utils.make_dir(cluster_scripts_dir) + + scripts_output_dir = os.path.join(cluster_output_dir, + 'scripts_output') + misc_utils.make_dir(scripts_output_dir) + scripts_output_dir = os.path.abspath(scripts_output_dir) + cluster_call = 'sbatch -o \"%s\" -e \"%s\"' %(scripts_output_dir,scripts_output_dir) + + script_name = os.path.join(cluster_scripts_dir, + '%s_time_%s.sh' \ + %(job_name, + time.strftime("%m-%d-%y_%H:%M:%S"))) + self.make_bash_script(script_name, cmd) + cluster_cmd = cluster_call + ' \"%s\"' %(script_name) + job_id = self.launch_job(cluster_cmd) + return job_id + - def wait_on_job(self, job_id, cluster_cmd, delay=60): + + def wait_on_job(self, job_id, delay=60): ''' Wait until job is done. Uses squeue first, then sacct. Runs squeue /sacct until either the job is done or until squeue_max_attempts is reached. @@ -97,9 +133,7 @@ def wait_on_job(self, job_id, cluster_cmd, delay=60): if state is not None: if state in ["COMPLETED","COMPLETING","CANCELLED","FAILED","TIMEOUT","PREEMPTED","NODE_FAIL"]: done = True - - - + diff --git a/misopy/cluster/slurm_template.txt b/misopy/cluster/slurm_template.txt index a1a1b0b..ce2af65 100644 --- a/misopy/cluster/slurm_template.txt +++ b/misopy/cluster/slurm_template.txt @@ -1,9 +1,8 @@ #!/bin/bash -#SBATCH -p {partition} -#SBATCH --mem {mem} -#SBATCH --job-name {jobname} -#SBATCH -t {time} +#SBATCH -p serial_requeue +#SBATCH --mem 4000 +#SBATCH -t 0-1:00 #SBATCH -n 1 #SBATCH -N 1 From fa509a3eb440b01fc0d4f9086fe0532cb1f6765b Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Tue, 28 Jun 2016 14:51:22 -0400 Subject: [PATCH 04/18] colons blown --- misopy/cluster/slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misopy/cluster/slurm.py b/misopy/cluster/slurm.py index 43100ad..92e9d13 100644 --- a/misopy/cluster/slurm.py +++ b/misopy/cluster/slurm.py @@ -75,7 +75,7 @@ def run_on_cluster(self, cmd, job_name, cluster_output_dir, script_name = os.path.join(cluster_scripts_dir, '%s_time_%s.sh' \ %(job_name, - time.strftime("%m-%d-%y_%H:%M:%S"))) + time.strftime("%m-%d-%y_%H_%M_%S"))) self.make_bash_script(script_name, cmd) cluster_cmd = cluster_call + ' \"%s\"' %(script_name) job_id = self.launch_job(cluster_cmd) From c715314a49bdc3a2aa8c589cc0329397fa77e33e Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Tue, 28 Jun 2016 14:52:49 -0400 Subject: [PATCH 05/18] print --- misopy/cluster/slurm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/misopy/cluster/slurm.py b/misopy/cluster/slurm.py index 92e9d13..3218fee 100644 --- a/misopy/cluster/slurm.py +++ b/misopy/cluster/slurm.py @@ -141,6 +141,7 @@ def launch_job(self, cluster_cmd): ''' Runs cluster command and returns the job id ''' + print 'Running command %s' % cmd proc = subprocess.Popen(cluster_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, From 767c85ddc017a50f707a72020f35be18a6b93bdc Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Tue, 28 Jun 2016 14:56:04 -0400 Subject: [PATCH 06/18] print --- misopy/cluster/slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misopy/cluster/slurm.py b/misopy/cluster/slurm.py index 3218fee..1e7dfc7 100644 --- a/misopy/cluster/slurm.py +++ b/misopy/cluster/slurm.py @@ -141,7 +141,7 @@ def launch_job(self, cluster_cmd): ''' Runs cluster command and returns the job id ''' - print 'Running command %s' % cmd + print 'Running command %s' % cluster_cmd proc = subprocess.Popen(cluster_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, From 39fc069097edf7bcae5c57e8db93b027a3565062 Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Tue, 28 Jun 2016 14:59:49 -0400 Subject: [PATCH 07/18] job name in out/err file --- misopy/cluster/slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misopy/cluster/slurm.py b/misopy/cluster/slurm.py index 1e7dfc7..34fa4e9 100644 --- a/misopy/cluster/slurm.py +++ b/misopy/cluster/slurm.py @@ -70,7 +70,7 @@ def run_on_cluster(self, cmd, job_name, cluster_output_dir, 'scripts_output') misc_utils.make_dir(scripts_output_dir) scripts_output_dir = os.path.abspath(scripts_output_dir) - cluster_call = 'sbatch -o \"%s\" -e \"%s\"' %(scripts_output_dir,scripts_output_dir) + cluster_call = 'sbatch -o \"%s.%s.out\" -e \"%s.%s.err\"' %(scripts_output_dir,job_name,scripts_output_dir,job_name) script_name = os.path.join(cluster_scripts_dir, '%s_time_%s.sh' \ From ed78689b789074e43961df47a79018eb01d22585 Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Tue, 28 Jun 2016 15:01:45 -0400 Subject: [PATCH 08/18] use workdir instead of explicit -o -e --- misopy/cluster/slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misopy/cluster/slurm.py b/misopy/cluster/slurm.py index 34fa4e9..a801db4 100644 --- a/misopy/cluster/slurm.py +++ b/misopy/cluster/slurm.py @@ -70,7 +70,7 @@ def run_on_cluster(self, cmd, job_name, cluster_output_dir, 'scripts_output') misc_utils.make_dir(scripts_output_dir) scripts_output_dir = os.path.abspath(scripts_output_dir) - cluster_call = 'sbatch -o \"%s.%s.out\" -e \"%s.%s.err\"' %(scripts_output_dir,job_name,scripts_output_dir,job_name) + cluster_call = 'sbatch -D \"%s\"' %(scripts_output_dir) script_name = os.path.join(cluster_scripts_dir, '%s_time_%s.sh' \ From 277f2fe7f69a774961e83b26f4264631fb5deb65 Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Tue, 28 Jun 2016 15:06:31 -0400 Subject: [PATCH 09/18] no module load miso --- misopy/cluster/slurm.py | 4 +++- misopy/cluster/slurm_template.txt | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/misopy/cluster/slurm.py b/misopy/cluster/slurm.py index a801db4..6151d26 100644 --- a/misopy/cluster/slurm.py +++ b/misopy/cluster/slurm.py @@ -133,7 +133,9 @@ def wait_on_job(self, job_id, delay=60): if state is not None: if state in ["COMPLETED","COMPLETING","CANCELLED","FAILED","TIMEOUT","PREEMPTED","NODE_FAIL"]: done = True - + print state + + time.sleep(delay) diff --git a/misopy/cluster/slurm_template.txt b/misopy/cluster/slurm_template.txt index ce2af65..e3b4669 100644 --- a/misopy/cluster/slurm_template.txt +++ b/misopy/cluster/slurm_template.txt @@ -8,7 +8,6 @@ source new-modules.sh module load python/2.7.6-fasrc01 -module load miso {cmd} From 407516947bff8143faa48174007b2abbbe866020 Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Tue, 28 Jun 2016 15:43:49 -0400 Subject: [PATCH 10/18] this should work --- misopy/cluster/__init__.py | 422 +++++++++++++++++++++++++++++++++++-- misopy/cluster/slurm.py | 163 -------------- misopy/miso.py | 8 +- 3 files changed, 415 insertions(+), 178 deletions(-) delete mode 100644 misopy/cluster/slurm.py diff --git a/misopy/cluster/__init__.py b/misopy/cluster/__init__.py index 8bf4edc..0a49bfd 100644 --- a/misopy/cluster/__init__.py +++ b/misopy/cluster/__init__.py @@ -1,7 +1,7 @@ ''' misopy.cluster -Cluster factory class for different cluster types +Cluster factory for different cluster types @author: Aaron Kitzmiller @copyright: 2016 The Presidents and Fellows of Harvard College. All rights reserved. @@ -9,19 +9,415 @@ @contact: aaron_kitzmiller@harvard.edu ''' -def getClusterEngine(cluster_type): +import os, subprocess, traceback, time + +from misopy.settings import load_settings, Settings +from misopy import misc_utils + + +def getClusterEngine(cluster_type,settings_fname): ''' Returns the correct cluster engine ''' - classname = 'misopy.cluster.%s' % cluster_type - try: - parts = classname.split('.') - module = ".".join(parts[:-1]) - m = __import__( module ) - for comp in parts[1:]: - m = getattr(m, comp) - return m - except ImportError as e: - raise Exception('Unable to import %s: %s' % (classname,str(e))) - return None + ce = None + if cluster_type == 'slurm': + ce = SlurmClusterEngine(settings_fname) + + return ce + + +class LsfClusterEngine(): + ''' + Run jobs on an LSF cluster + ''' + + def __init__(self,settings_filename): + ''' + Get the slurm job template file and load it + ''' + self.settings = load_settings(settings_filename) + + + def make_bash_script(self,filename, cmd, crate_dir=None): + """ + Make an executable bash script out of the given command. + """ + # os.system('ls %s' %(filename)) + if crate_dir == None: + crate_dir = \ + os.path.dirname(os.path.abspath(os.path.expanduser(__file__))) + f = open(filename, 'w') + f.write("#!/bin/bash\n") + f.write("export PATH=$PATH:%s\n" %(crate_dir)) + f.write("source ~/.bash_profile\n") + f.write("cd %s\n" %(crate_dir)) + #write_cluster_preface(f) + f.write(cmd + "\n") + f.close() + os.system('chmod +x \"%s\"' %(filename)) + + + def run_on_cluster(self, cmd, job_name, cluster_output_dir, + cluster_scripts_dir=None, + queue_type=None): + ''' + Composes job script and launches job + ''' + print "Submitting job: %s" %(job_name) + queue_name = None + + # Load command name from settings file + cmd_name = self.settings.get_cluster_command() + + if queue_type == "long": + queue_name = self.settings.get_long_queue_name() + elif queue_type == "short": + queue_name = self.settings.get_short_queue_name() + else: + print "Warning: Unknown queue type: %s" %(queue_type) + queue_name = queue_type + + if queue_type is None: + print " - queue type: unspecified" + else: + print " - queue type: %s" %(queue_type) + if queue_name is None: + print " - queue name unspecified" + else: + print " - queue name: %s" %(queue_name) + + misc_utils.make_dir(cluster_output_dir) + if cluster_scripts_dir == None: + cluster_scripts_dir = os.path.join(cluster_output_dir, + 'cluster_scripts') + misc_utils.make_dir(cluster_scripts_dir) + scripts_output_dir = os.path.join(cluster_output_dir, + 'scripts_output') + misc_utils.make_dir(scripts_output_dir) + scripts_output_dir = os.path.abspath(scripts_output_dir) + cluster_call = 'bsub -o \"%s\" -e \"%s\"' %(scripts_output_dir, + scripts_output_dir) + # Add queue type if given one + if queue_name != None: + cluster_call += ' -q \"%s\"' %(queue_name) + + script_name = os.path.join(cluster_scripts_dir, + '%s_time_%s.sh' \ + %(job_name, + time.strftime("%m-%d-%y_%H_%M_%S"))) + self.make_bash_script(script_name, cmd) + cluster_cmd = cluster_call + ' \"%s\"' %(script_name) + job_id = self.launch_job(cluster_cmd) + return job_id + + + + def wait_on_job(self, job_id, delay=60): + # Handle bsub + while True: + output = subprocess.Popen("bjobs %i" %(job_id), + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE).communicate() + if len(output[0]) > 0: + status = output[0].split()[10] + if status == "DONE": + break + else: + # No jobs available + break + time.sleep(delay) + time.sleep(delay) + + + + def launch_job(self, cluster_cmd): + """ + Execute cluster_cmd and return its job ID if + it can be fetched. + """ + print "Executing: %s" %(cluster_cmd) + proc = subprocess.Popen(cluster_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + # Read the job ID if it's a known cluster + # submission system + output = proc.communicate() + job_id = None + if "is submitted to" in output[0]: + job_id = int(output[0].strip().split()[1][1:-1]) + return job_id + + + +class SgeClusterEngine(): + ''' + Run jobs on an SGE cluster + ''' + + def __init__(self,settings_filename): + ''' + Get the slurm job template file and load it + ''' + self.settings = load_settings(settings_filename) + + + def make_bash_script(self,filename, cmd, crate_dir=None): + """ + Make an executable bash script out of the given command. + """ + # os.system('ls %s' %(filename)) + if crate_dir == None: + crate_dir = \ + os.path.dirname(os.path.abspath(os.path.expanduser(__file__))) + f = open(filename, 'w') + f.write("#!/bin/bash\n") + f.write("export PATH=$PATH:%s\n" %(crate_dir)) + f.write("source ~/.bash_profile\n") + f.write("cd %s\n" %(crate_dir)) + #write_cluster_preface(f) + f.write(cmd + "\n") + f.close() + os.system('chmod +x \"%s\"' %(filename)) + + + def run_on_cluster(self, cmd, job_name, cluster_output_dir, + cluster_scripts_dir=None, + queue_type=None): + ''' + Composes job script and launches job + ''' + print "Submitting job: %s" %(job_name) + queue_name = None + + # Load command name from settings file + cmd_name = self.settings.get_cluster_command() + + if queue_type == "long": + queue_name = self.settings.get_long_queue_name() + elif queue_type == "short": + queue_name = self.settings.get_short_queue_name() + else: + print "Warning: Unknown queue type: %s" %(queue_type) + queue_name = queue_type + + if queue_type is None: + print " - queue type: unspecified" + else: + print " - queue type: %s" %(queue_type) + if queue_name is None: + print " - queue name unspecified" + else: + print " - queue name: %s" %(queue_name) + + misc_utils.make_dir(cluster_output_dir) + if cluster_scripts_dir == None: + cluster_scripts_dir = os.path.join(cluster_output_dir, + 'cluster_scripts') + misc_utils.make_dir(cluster_scripts_dir) + scripts_output_dir = os.path.join(cluster_output_dir, + 'scripts_output') + misc_utils.make_dir(scripts_output_dir) + scripts_output_dir = os.path.abspath(scripts_output_dir) + cluster_call = 'qsub -o \"%s\" -e \"%s\"' %(scripts_output_dir, + scripts_output_dir) + # Add queue type if given one + if queue_name != None: + cluster_call += ' -q \"%s\"' %(queue_name) + + script_name = os.path.join(cluster_scripts_dir, + '%s_time_%s.sh' \ + %(job_name, + time.strftime("%m-%d-%y_%H_%M_%S"))) + self.make_bash_script(script_name, cmd) + cluster_cmd = cluster_call + ' \"%s\"' %(script_name) + job_id = self.launch_job(cluster_cmd) + return job_id + + + + def wait_on_job(self, job_id, delay=60): + # Handle qsub + while True: + output = \ + subprocess.Popen("qstat %i" %(job_id), + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE).communicate() + if "Unknown Job" in output[1]: + break + time.sleep(delay) + time.sleep(delay) + + + + def launch_job(self, cluster_cmd): + """ + Execute cluster_cmd and return its job ID if + it can be fetched. + """ + print "Executing: %s" %(cluster_cmd) + proc = subprocess.Popen(cluster_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + # Read the job ID if it's a known cluster + # submission system + output = proc.communicate() + job_id = None + if "." in output[0][:-1] and ">" not in output[0]: + job_id = int(output[0].split(".")[0]) + return job_id + + + +class SlurmClusterEngine(): + ''' + Run jobs on a Slurm cluster + ''' + + def __init__(self,settings_filename): + ''' + Get the slurm job template file and load it + ''' + self.settings = load_settings(settings_filename) + if not 'slurm_template' in self.settings: + raise Exception('slurm_template must be defined in settings to use Slurm') + + self.squeue_max_attempts = 10 + if 'squeue_max_attempts' in self.settings: + self.squeue_max_attempts = int(self.settings['squeue_max_attempts']) + + template_filename = self.settings['slurm_template'] + if not os.path.exists(template_filename): + raise Exception('Cannot find slurm template file %s' % template_filename) + + with open(template_filename,'r') as f: + self.template = f.read() + + if self.template.strip() == '': + raise Exception('slurm template file %s is empty' % template_filename) + + + + def make_bash_script(self,script_name,cmd): + ''' + Use the template to write out a sbatch submission script + ''' + scripttxt = self.template.format(cmd=cmd) + with open(script_name,'w') as script: + script.write(scripttxt + '\n') + + + + def run_on_cluster(self, cmd, job_name, cluster_output_dir, + cluster_scripts_dir=None, + queue_type=None, + settings_fname=None): + ''' + Composes job script and launches job + ''' + + misc_utils.make_dir(cluster_output_dir) + if cluster_scripts_dir == None: + cluster_scripts_dir = os.path.join(cluster_output_dir, + 'cluster_scripts') + misc_utils.make_dir(cluster_scripts_dir) + + scripts_output_dir = os.path.join(cluster_output_dir, + 'scripts_output') + misc_utils.make_dir(scripts_output_dir) + scripts_output_dir = os.path.abspath(scripts_output_dir) + cluster_call = 'sbatch -D \"%s\"' %(scripts_output_dir) + + script_name = os.path.join(cluster_scripts_dir, + '%s_time_%s.sh' \ + %(job_name, + time.strftime("%m-%d-%y_%H_%M_%S"))) + self.make_bash_script(script_name, cmd) + cluster_cmd = cluster_call + ' \"%s\"' %(script_name) + job_id = self.launch_job(cluster_cmd) + return job_id + + + + def wait_on_job(self, job_id, delay=60): + ''' + Wait until job is done. Uses squeue first, then sacct. + Runs squeue /sacct until either the job is done or until squeue_max_attempts is reached. + Max attempts is needed to ensure that squeue information is available. + ''' + squeue_cmd = 'squeue --noheader --format %%T -j %d' % job_id + sacct_cmd = 'sacct --noheader --format State -j %d.batch' % job_id + + done = False + squeue_attempts = 0 + state = None + + while not done: + + # If we've tried squeue_max_attempts and gotten no information, then quit + squeue_attempts += 1 + if squeue_attempts == self.squeue_max_attempts and state is None: + raise Exception('Attempted to query squeue /sacct %d times and retrieved no result' % squeue_attempts) + + proc = subprocess.Popen(squeue_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + output,err = proc.communicate() + + if proc.returncode != 0: + # The whole command failed. Weird. + raise Exception('squeue command %s failed: %s' % (squeue_cmd,err)) + + if output.strip() != '': + state = output.strip() + else: + # Try sacct. The job may be done and so disappeared from squeue + proc = subprocess.Popen(sacct_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + output,err = proc.communicate() + + if proc.returncode != 0: + # The whole command failed. Weird. + raise Exception('sacct command %s failed: %s' % (sacct_cmd,err)) + + if output.strip() != '': + state = output.strip() + + if state is not None: + if state in ["COMPLETED","COMPLETING","CANCELLED","FAILED","TIMEOUT","PREEMPTED","NODE_FAIL"]: + done = True + print state + + time.sleep(delay) + + + + def launch_job(self, cluster_cmd): + ''' + Runs cluster command and returns the job id + ''' + print 'Running command %s' % cluster_cmd + proc = subprocess.Popen(cluster_cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + # Read the job ID if it's a known cluster + # submission system + output,err = proc.communicate() + if proc.returncode != 0 or err.strip() != '': + raise Exception('Error launching job with %s: %s' % (cluster_cmd,err)) + + job_id = output.strip().replace('Submitted batch job ','') + try: + job_id = int(job_id) + except Exception as e: + raise Exception('Returned job id %s is not a number ?!?!' % job_id) + + return job_id \ No newline at end of file diff --git a/misopy/cluster/slurm.py b/misopy/cluster/slurm.py deleted file mode 100644 index 6151d26..0000000 --- a/misopy/cluster/slurm.py +++ /dev/null @@ -1,163 +0,0 @@ -''' -misopy.cluster.slurm - -@author: Aaron Kitzmiller -@copyright: 2016 The Presidents and Fellows of Harvard College. All rights reserved. -@license: GPL v2.0 -@contact: aaron_kitzmiller@harvard.edu -''' -import os, subprocess, traceback, time - -from misopy.settings import load_settings -from misopy import misc_utils - - - -class SlurmClusterEngine(): - ''' - Run jobs on a Slurm cluster - ''' - - def __init__(self,settings_filename): - ''' - Get the slurm job template file and load it - ''' - self.settings = load_settings(settings_filename) - if not 'slurm_template' in self.settings: - raise Exception('slurm_template must be defined in settings to use Slurm') - - self.squeue_max_attempts = 10 - if 'squeue_max_attempts' in self.settings: - self.squeue_max_attempts = int(self.settings['squeue_max_attempts']) - - template_filename = self.settings['slurm_template'] - if not os.path.exists(template_filename): - raise Exception('Cannot find slurm template file %s' % template_filename) - - with open(template_filename,'r') as f: - self.template = f.read() - - if self.template.strip() == '': - raise Exception('slurm template file %s is empty' % template_filename) - - - - def make_bash_script(self,script_name,cmd): - ''' - Use the template to write out a sbatch submission script - ''' - scripttxt = self.template.format(cmd=cmd) - with open(script_name,'w') as script: - script.write(scripttxt + '\n') - - - - def run_on_cluster(self, cmd, job_name, cluster_output_dir, - cluster_scripts_dir=None, - queue_type=None, - settings_fname=None): - ''' - Composes job script and launches job - ''' - - misc_utils.make_dir(cluster_output_dir) - if cluster_scripts_dir == None: - cluster_scripts_dir = os.path.join(cluster_output_dir, - 'cluster_scripts') - misc_utils.make_dir(cluster_scripts_dir) - - scripts_output_dir = os.path.join(cluster_output_dir, - 'scripts_output') - misc_utils.make_dir(scripts_output_dir) - scripts_output_dir = os.path.abspath(scripts_output_dir) - cluster_call = 'sbatch -D \"%s\"' %(scripts_output_dir) - - script_name = os.path.join(cluster_scripts_dir, - '%s_time_%s.sh' \ - %(job_name, - time.strftime("%m-%d-%y_%H_%M_%S"))) - self.make_bash_script(script_name, cmd) - cluster_cmd = cluster_call + ' \"%s\"' %(script_name) - job_id = self.launch_job(cluster_cmd) - return job_id - - - - def wait_on_job(self, job_id, delay=60): - ''' - Wait until job is done. Uses squeue first, then sacct. - Runs squeue /sacct until either the job is done or until squeue_max_attempts is reached. - Max attempts is needed to ensure that squeue information is available. - ''' - squeue_cmd = 'squeue --noheader --format %%T -j %d' % job_id - sacct_cmd = 'sacct --noheader --format State -j %d.batch' % job_id - - done = False - squeue_attempts = 0 - state = None - - while not done: - - # If we've tried squeue_max_attempts and gotten no information, then quit - squeue_attempts += 1 - if squeue_attempts == self.squeue_max_attempts and state is None: - raise Exception('Attempted to query squeue /sacct %d times and retrieved no result' % squeue_attempts) - - proc = subprocess.Popen(squeue_cmd, shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE) - output,err = proc.communicate() - - if proc.returncode != 0: - # The whole command failed. Weird. - raise Exception('squeue command %s failed: %s' % (squeue_cmd,err)) - - if output.strip() != '': - state = output.strip() - else: - # Try sacct. The job may be done and so disappeared from squeue - proc = subprocess.Popen(sacct_cmd, shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE) - output,err = proc.communicate() - - if proc.returncode != 0: - # The whole command failed. Weird. - raise Exception('sacct command %s failed: %s' % (sacct_cmd,err)) - - if output.strip() != '': - state = output.strip() - - if state is not None: - if state in ["COMPLETED","COMPLETING","CANCELLED","FAILED","TIMEOUT","PREEMPTED","NODE_FAIL"]: - done = True - print state - - time.sleep(delay) - - - - def launch_job(self, cluster_cmd): - ''' - Runs cluster command and returns the job id - ''' - print 'Running command %s' % cluster_cmd - proc = subprocess.Popen(cluster_cmd, shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE) - # Read the job ID if it's a known cluster - # submission system - output,err = proc.communicate() - if proc.returncode != 0 or err.strip() != '': - raise Exception('Error launching job with %s: %s' % (cluster_cmd,err)) - - job_id = output.strip().replace('Submitted batch job ','') - try: - job_id = int(job_id) - except Exception as e: - raise Exception('Returned job id %s is not a number ?!?!' % job_id) - - return job_id \ No newline at end of file diff --git a/misopy/miso.py b/misopy/miso.py index 8a54cd7..be4c93d 100644 --- a/misopy/miso.py +++ b/misopy/miso.py @@ -22,6 +22,7 @@ from misopy.settings import Settings, load_settings from misopy.settings import miso_path as miso_settings_path import misopy.cluster_utils as cluster_utils +from misopy.cluster import getClusterEngine miso_path = os.path.dirname(os.path.abspath(__file__)) manual_url = "http://genes.mit.edu/burgelab/miso/docs/" @@ -266,6 +267,9 @@ def run(self, delay_constant=0.9): print " - Submitted thread %s" %(thread_id) self.threads[thread_id] = p else: + # Setup cluster engine + self.cluster_engine = getClusterEngine('slurm',self.settings_fname) + # Run on cluster if batch_size >= self.long_thresh: queue_type = "long" @@ -275,7 +279,7 @@ def run(self, delay_constant=0.9): job_name = "gene_psi_batch_%d" %(batch_num) print "Submitting to cluster: %s" %(cmd_to_run) job_id = \ - cluster_utils.run_on_cluster(cmd_to_run, + self.cluster_engine.run_on_cluster(cmd_to_run, job_name, self.output_dir, queue_type=queue_type, @@ -298,7 +302,7 @@ def run(self, delay_constant=0.9): "system.") # Try to wait on jobs no matter what; though if 'cluster_jobs' # is empty here, it will not wait - cluster_utils.wait_on_jobs(cluster_jobs, + self.cluster_engine.wait_on_jobs(cluster_jobs, self.cluster_cmd) else: if self.use_cluster: From f7b0f5787e5b52b489607b2ab491bb83c907f908 Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Wed, 29 Jun 2016 11:14:27 -0400 Subject: [PATCH 11/18] logger issues --- misopy/run_events_analysis.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/misopy/run_events_analysis.py b/misopy/run_events_analysis.py index 09e0447..d0d7b56 100644 --- a/misopy/run_events_analysis.py +++ b/misopy/run_events_analysis.py @@ -19,10 +19,12 @@ from misopy.settings import Settings, load_settings from misopy.settings import miso_path as miso_settings_path import misopy.cluster_utils as cluster_utils +from misopy.miso_sampler import get_logger miso_path = os.path.dirname(os.path.abspath(__file__)) manual_url = "http://genes.mit.edu/burgelab/miso/docs/" +miso_logger = get_logger('miso_logger') def get_ids_passing_filter(gff_index_dir, bam_filename, @@ -168,11 +170,11 @@ def check_gff_and_bam(gff_dir, bam_filename, main_logger, if bam_starts_with_chr != gff_starts_with_chr: mismatch_found = True if mismatch_found: - miso_logger.warning("It looks like your GFF annotation file and your BAM " \ + main_logger.warning("It looks like your GFF annotation file and your BAM " \ "file might not have matching headers (chromosome names.) " \ "If this is the case, your run will fail as no reads from " \ "the BAM could be matched up with your annotation.") - miso_logger.warning("Please see:\n\t%s\n for more information." %(manual_url)) + main_logger.warning("Please see:\n\t%s\n for more information." %(manual_url)) # Default: assume BAM starts with chr headers chr_containing = "BAM file (%s)" %(bam_filename) not_chr_containing = "GFF annotation (%s)" %(gff_dir) @@ -180,16 +182,16 @@ def check_gff_and_bam(gff_dir, bam_filename, main_logger, # BAM does not start with chr, GFF does chr_containing, not_chr_containing = \ not_chr_containing, chr_containing - miso_logger.warning("It looks like your %s contains \'chr\' chromosomes (UCSC-style) " \ + main_logger.warning("It looks like your %s contains \'chr\' chromosomes (UCSC-style) " \ "while your %s does not." %(chr_containing, not_chr_containing)) - miso_logger.warning("The first few BAM chromosomes were: %s" \ + main_logger.warning("The first few BAM chromosomes were: %s" \ %(",".join(bam_chroms.keys()))) print "BAM references: " print bam_file.references - miso_logger.warning("The first few GFF chromosomes were: %s" \ + main_logger.warning("The first few GFF chromosomes were: %s" \ %(",".join(gff_chroms.keys()))) - miso_logger.warning("Run is likely to fail or produce empty output. Proceeding " \ + main_logger.warning("Run is likely to fail or produce empty output. Proceeding " \ "anyway...") time.sleep(15) From ddd50fc08956d5e31f54bb6bef429dca364b161d Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Wed, 29 Jun 2016 11:20:16 -0400 Subject: [PATCH 12/18] logger issues --- misopy/run_events_analysis.py | 1 - 1 file changed, 1 deletion(-) diff --git a/misopy/run_events_analysis.py b/misopy/run_events_analysis.py index d0d7b56..1ec78ec 100644 --- a/misopy/run_events_analysis.py +++ b/misopy/run_events_analysis.py @@ -24,7 +24,6 @@ miso_path = os.path.dirname(os.path.abspath(__file__)) manual_url = "http://genes.mit.edu/burgelab/miso/docs/" -miso_logger = get_logger('miso_logger') def get_ids_passing_filter(gff_index_dir, bam_filename, From 694ff56a83c64bb23791f9cb7ef607e2cf345b51 Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Wed, 29 Jun 2016 11:53:31 -0400 Subject: [PATCH 13/18] wait on jobs added to abstract cluster engine --- misopy/cluster/__init__.py | 59 +++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 16 deletions(-) diff --git a/misopy/cluster/__init__.py b/misopy/cluster/__init__.py index 0a49bfd..8bcf184 100644 --- a/misopy/cluster/__init__.py +++ b/misopy/cluster/__init__.py @@ -25,19 +25,52 @@ def getClusterEngine(cluster_type,settings_fname): return ce - -class LsfClusterEngine(): +class AbstractClusterEngine(object): ''' - Run jobs on an LSF cluster + Base class for cluster engines ''' - def __init__(self,settings_filename): ''' - Get the slurm job template file and load it + Load settings ''' self.settings = load_settings(settings_filename) + def wait_on_jobs(self,job_ids, cluster_cmd, + delay=120): + """ + Wait on a set of job IDs. + """ + if len(job_ids) == 0: + return + num_jobs = len(job_ids) + print "Waiting on a set of %d jobs..." %(num_jobs) + curr_time = time.strftime("%x, %X") + t_start = time.time() + print " - Starting to wait at %s" %(curr_time) + completed_jobs = {} + for job_id in job_ids: + if job_id in completed_jobs: + continue + self.wait_on_job(job_id, cluster_cmd) + print " - Job ", job_id, " completed." + completed_jobs[job_id] = True + curr_time = time.strftime("%x, %X") + t_end = time.time() + print "Jobs completed at %s" %(curr_time) + duration = ((t_end - t_start) / 60.) / 60. + print " - Took %.2f hours." %(duration) + + def wait_on_job(self, job_id, delay): + + raise Exception('Must implement wait on job') + + +class LsfClusterEngine(AbstractClusterEngine): + ''' + Run jobs on an LSF cluster + ''' + def make_bash_script(self,filename, cmd, crate_dir=None): """ Make an executable bash script out of the given command. @@ -151,18 +184,11 @@ def launch_job(self, cluster_cmd): -class SgeClusterEngine(): +class SgeClusterEngine(AbstractClusterEngine): ''' Run jobs on an SGE cluster ''' - - def __init__(self,settings_filename): - ''' - Get the slurm job template file and load it - ''' - self.settings = load_settings(settings_filename) - - + def make_bash_script(self,filename, cmd, crate_dir=None): """ Make an executable bash script out of the given command. @@ -272,7 +298,7 @@ def launch_job(self, cluster_cmd): -class SlurmClusterEngine(): +class SlurmClusterEngine(AbstractClusterEngine): ''' Run jobs on a Slurm cluster ''' @@ -281,7 +307,8 @@ def __init__(self,settings_filename): ''' Get the slurm job template file and load it ''' - self.settings = load_settings(settings_filename) + super(SlurmClusterEngine,self).__init__(settings_filename) + if not 'slurm_template' in self.settings: raise Exception('slurm_template must be defined in settings to use Slurm') From f1387f703e4fa6a9e3d6aee884e464b7ebcd60ea Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Wed, 29 Jun 2016 13:04:52 -0400 Subject: [PATCH 14/18] float for sleep --- misopy/cluster/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/misopy/cluster/__init__.py b/misopy/cluster/__init__.py index 8bcf184..dc85778 100644 --- a/misopy/cluster/__init__.py +++ b/misopy/cluster/__init__.py @@ -36,7 +36,7 @@ def __init__(self,settings_filename): self.settings = load_settings(settings_filename) def wait_on_jobs(self,job_ids, cluster_cmd, - delay=120): + delay=120.0): """ Wait on a set of job IDs. """ @@ -369,7 +369,7 @@ def run_on_cluster(self, cmd, job_name, cluster_output_dir, - def wait_on_job(self, job_id, delay=60): + def wait_on_job(self, job_id, delay=60.0): ''' Wait until job is done. Uses squeue first, then sacct. Runs squeue /sacct until either the job is done or until squeue_max_attempts is reached. From abace9c61b6acd401a8aee920113e401f3060e29 Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Wed, 29 Jun 2016 15:32:37 -0400 Subject: [PATCH 15/18] shorter delay for job check --- misopy/cluster/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/misopy/cluster/__init__.py b/misopy/cluster/__init__.py index dc85778..3fdc458 100644 --- a/misopy/cluster/__init__.py +++ b/misopy/cluster/__init__.py @@ -369,7 +369,7 @@ def run_on_cluster(self, cmd, job_name, cluster_output_dir, - def wait_on_job(self, job_id, delay=60.0): + def wait_on_job(self, job_id, delay=10): ''' Wait until job is done. Uses squeue first, then sacct. Runs squeue /sacct until either the job is done or until squeue_max_attempts is reached. @@ -421,7 +421,7 @@ def wait_on_job(self, job_id, delay=60.0): done = True print state - time.sleep(delay) + time.sleep(10) From 26907a0c9b42bdeacbbc144a540c34c34521ca5e Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Wed, 29 Jun 2016 16:36:41 -0400 Subject: [PATCH 16/18] using cluster_type as a command switch --- misopy/cluster/__init__.py | 6 ++++++ misopy/miso.py | 21 ++++++++++++++++----- misopy/settings/miso_settings.txt | 2 +- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/misopy/cluster/__init__.py b/misopy/cluster/__init__.py index 3fdc458..cc51927 100644 --- a/misopy/cluster/__init__.py +++ b/misopy/cluster/__init__.py @@ -22,6 +22,12 @@ def getClusterEngine(cluster_type,settings_fname): ce = None if cluster_type == 'slurm': ce = SlurmClusterEngine(settings_fname) + elif cluster_type == 'lsf': + ce = LsfClusterEngine(settings_fname) + elif cluster_type == 'sge': + ce = SgeClusterEngine(settings_fname) + else: + raise Exception('Unknown cluster type %s' % cluster_type) return ce diff --git a/misopy/miso.py b/misopy/miso.py index be4c93d..3443855 100644 --- a/misopy/miso.py +++ b/misopy/miso.py @@ -84,7 +84,8 @@ def __init__(self, gff_dir, bam_filename, sge_job_name="misojob", gene_ids=None, num_proc=None, - wait_on_jobs=True): + wait_on_jobs=True, + cluster_type=None): self.main_logger = main_logger self.threads = {} self.gff_dir = gff_dir @@ -111,6 +112,7 @@ def __init__(self, gff_dir, bam_filename, self.cluster_cmd = Settings.get_cluster_command() self.sge_job_name = sge_job_name self.wait_on_jobs = wait_on_jobs + self.cluster_type = cluster_type # if chunk_jobs not given (i.e. set to False), # then set it to arbitrary value if not self.chunk_jobs: @@ -268,7 +270,10 @@ def run(self, delay_constant=0.9): self.threads[thread_id] = p else: # Setup cluster engine - self.cluster_engine = getClusterEngine('slurm',self.settings_fname) + if self.cluster_type is None: + raise Exception('If --use-cluster is selected and --SGEarray is not, you must define a --cluster-type') + + self.cluster_engine = getClusterEngine(self.cluster_type,self.settings_fname) # Run on cluster if batch_size >= self.long_thresh: @@ -353,7 +358,8 @@ def compute_all_genes_psi(gff_dir, bam_filename, read_len, job_name="misojob", num_proc=None, prefilter=False, - wait_on_jobs=True): + wait_on_jobs=True, + cluster_type=None): """ Compute Psi values for genes using a GFF and a BAM filename. @@ -426,7 +432,8 @@ def compute_all_genes_psi(gff_dir, bam_filename, read_len, SGEarray=SGEarray, gene_ids=all_gene_ids, num_proc=num_proc, - wait_on_jobs=wait_on_jobs) + wait_on_jobs=wait_on_jobs, + cluster_type=cluster_type) dispatcher.run() @@ -448,6 +455,8 @@ def main(): parser.add_option("--use-cluster", dest="use_cluster", action="store_true", default=False, help="Run events on cluster.") + parser.add_option("--cluster-type", dest="cluster_type", + help="Type of cluster. One of slurm, sge, or lsf.", default=None) parser.add_option("--chunk-jobs", dest="chunk_jobs", default=False, type="int", help="Size (in number of events) of each job to chunk " @@ -579,6 +588,7 @@ def main(): if options.overhang_len != None: overhang_len = options.overhang_len + # Whether to wait on cluster jobs or not wait_on_jobs = not options.no_wait compute_all_genes_psi(gff_filename, bam_filename, @@ -593,7 +603,8 @@ def main(): settings_fname=settings_filename, prefilter=options.prefilter, num_proc=options.num_proc, - wait_on_jobs=wait_on_jobs) + wait_on_jobs=wait_on_jobs, + cluster_type=options.cluster_type) if options.view_gene != None: indexed_gene_filename = \ diff --git a/misopy/settings/miso_settings.txt b/misopy/settings/miso_settings.txt index 54ffc1e..1ca01a4 100644 --- a/misopy/settings/miso_settings.txt +++ b/misopy/settings/miso_settings.txt @@ -3,7 +3,7 @@ filter_results = True min_event_reads = 20 [cluster] -cluster_command = qsub +cluster_command = sbatch slurm_template = /home/akitzmiller/workspace/MISO/misopy/cluster/slurm_template.txt [sampler] From aa528c7ed24878c3fb5a9dfacb373313cbfe977d Mon Sep 17 00:00:00 2001 From: Aaron Kitzmiller Date: Thu, 30 Jun 2016 10:25:38 -0400 Subject: [PATCH 17/18] using cluster command to select engine --- misopy/cluster/__init__.py | 13 ++++++------- misopy/miso.py | 21 +++++++-------------- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/misopy/cluster/__init__.py b/misopy/cluster/__init__.py index cc51927..dce0153 100644 --- a/misopy/cluster/__init__.py +++ b/misopy/cluster/__init__.py @@ -15,19 +15,19 @@ from misopy import misc_utils -def getClusterEngine(cluster_type,settings_fname): +def getClusterEngine(clustercmd,settings_fname): ''' Returns the correct cluster engine ''' ce = None - if cluster_type == 'slurm': + if clustercmd == 'sbatch': ce = SlurmClusterEngine(settings_fname) - elif cluster_type == 'lsf': + elif clustercmd == 'bsub': ce = LsfClusterEngine(settings_fname) - elif cluster_type == 'sge': + elif clustercmd == 'qsub': ce = SgeClusterEngine(settings_fname) else: - raise Exception('Unknown cluster type %s' % cluster_type) + raise Exception('Unknown cluster command %s' % clustercmd) return ce @@ -346,8 +346,7 @@ def make_bash_script(self,script_name,cmd): def run_on_cluster(self, cmd, job_name, cluster_output_dir, cluster_scripts_dir=None, - queue_type=None, - settings_fname=None): + queue_type=None): ''' Composes job script and launches job ''' diff --git a/misopy/miso.py b/misopy/miso.py index 3443855..82cc0e1 100644 --- a/misopy/miso.py +++ b/misopy/miso.py @@ -84,8 +84,7 @@ def __init__(self, gff_dir, bam_filename, sge_job_name="misojob", gene_ids=None, num_proc=None, - wait_on_jobs=True, - cluster_type=None): + wait_on_jobs=True): self.main_logger = main_logger self.threads = {} self.gff_dir = gff_dir @@ -112,7 +111,6 @@ def __init__(self, gff_dir, bam_filename, self.cluster_cmd = Settings.get_cluster_command() self.sge_job_name = sge_job_name self.wait_on_jobs = wait_on_jobs - self.cluster_type = cluster_type # if chunk_jobs not given (i.e. set to False), # then set it to arbitrary value if not self.chunk_jobs: @@ -270,10 +268,10 @@ def run(self, delay_constant=0.9): self.threads[thread_id] = p else: # Setup cluster engine - if self.cluster_type is None: - raise Exception('If --use-cluster is selected and --SGEarray is not, you must define a --cluster-type') + Settings.load(self.settings_fname) + clustercmd = Settings.get_cluster_command() - self.cluster_engine = getClusterEngine(self.cluster_type,self.settings_fname) + self.cluster_engine = getClusterEngine(clustercmd,self.settings_fname) # Run on cluster if batch_size >= self.long_thresh: @@ -287,8 +285,7 @@ def run(self, delay_constant=0.9): self.cluster_engine.run_on_cluster(cmd_to_run, job_name, self.output_dir, - queue_type=queue_type, - settings_fname=self.settings_fname) + queue_type=queue_type) if job_id is not None: cluster_jobs.append(job_id) time.sleep(delay_constant) @@ -432,8 +429,7 @@ def compute_all_genes_psi(gff_dir, bam_filename, read_len, SGEarray=SGEarray, gene_ids=all_gene_ids, num_proc=num_proc, - wait_on_jobs=wait_on_jobs, - cluster_type=cluster_type) + wait_on_jobs=wait_on_jobs) dispatcher.run() @@ -455,8 +451,6 @@ def main(): parser.add_option("--use-cluster", dest="use_cluster", action="store_true", default=False, help="Run events on cluster.") - parser.add_option("--cluster-type", dest="cluster_type", - help="Type of cluster. One of slurm, sge, or lsf.", default=None) parser.add_option("--chunk-jobs", dest="chunk_jobs", default=False, type="int", help="Size (in number of events) of each job to chunk " @@ -603,8 +597,7 @@ def main(): settings_fname=settings_filename, prefilter=options.prefilter, num_proc=options.num_proc, - wait_on_jobs=wait_on_jobs, - cluster_type=options.cluster_type) + wait_on_jobs=wait_on_jobs) if options.view_gene != None: indexed_gene_filename = \ From 4b75ebd602b576390930484a4226b7f8e2be2adf Mon Sep 17 00:00:00 2001 From: = <=> Date: Mon, 3 Oct 2016 10:46:19 -0400 Subject: [PATCH 18/18] add cluster package --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 8dbb48e..4ce2fde 100644 --- a/setup.py +++ b/setup.py @@ -101,6 +101,7 @@ # Tell distutils to look for pysplicing in the right directory package_dir = {'pysplicing': 'pysplicing/pysplicing'}, packages = ['misopy', + 'misopy.cluster', 'misopy.sashimi_plot', 'misopy.sashimi_plot.plot_utils', 'pysplicing'],