Skip to content

Commit 7e8d84e

Browse files
committed
[ENH] Addressed @mgermain comments. Added unit tests and refactored the script smart_dispatch.py.
1 parent 64649c0 commit 7e8d84e

File tree

4 files changed

+104
-52
lines changed

4 files changed

+104
-52
lines changed

scripts/smart_dispatch.py

Lines changed: 10 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import logging
2121
import smartdispatch
2222

23-
2423
LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS"
2524
CLUSTER_NAME = utils.detect_cluster()
2625
AVAILABLE_QUEUES = get_available_queues(CLUSTER_NAME)
@@ -32,6 +31,7 @@ def main():
3231
logging.root.setLevel(logging.INFO)
3332

3433
args = parse_arguments()
34+
path_smartdispatch_logs = pjoin(os.getcwd(), LOGS_FOLDERNAME)
3535

3636
# Check if RESUME or LAUNCH mode
3737
if args.mode == "launch":
@@ -48,13 +48,20 @@ def main():
4848
commands = smartdispatch.replace_uid_tag(commands)
4949
nb_commands = len(commands) # For print at the end
5050

51-
path_job, path_job_logs, path_job_commands = create_job_folders(jobname)
5251
elif args.mode == "resume":
5352
jobname = args.batch_uid
54-
path_job, path_job_logs, path_job_commands = get_job_folders(args.batch_uid)
53+
if os.path.isdir(jobname):
54+
# `jobname` is an existing directory: assume it is the path to the job folder and extract the real `jobname` from it.
55+
jobname = os.path.basename(os.path.abspath(jobname))
56+
57+
if not os.path.isdir(pjoin(path_smartdispatch_logs, jobname)):
58+
raise LookupError("Batch UID ({0}) does not exist! Cannot resume.".format(jobname))
5559
else:
5660
raise ValueError("Unknown subcommand!")
5761

62+
job_folders_paths = smartdispatch.get_job_folders(path_smartdispatch_logs, jobname)
63+
path_job, path_job_logs, path_job_commands = job_folders_paths
64+
5865
# Keep a log of the command line in the job folder.
5966
command_line = " ".join(sys.argv)
6067
smartdispatch.log_command_line(path_job, command_line)
@@ -165,49 +172,5 @@ def parse_arguments():
165172
return args
166173

167174

168-
def _gen_job_paths(jobname):
169-
path_smartdispatch_logs = pjoin(os.getcwd(), LOGS_FOLDERNAME)
170-
path_job = pjoin(path_smartdispatch_logs, jobname)
171-
path_job_logs = pjoin(path_job, 'logs')
172-
path_job_commands = pjoin(path_job, 'commands')
173-
174-
return path_job, path_job_logs, path_job_commands
175-
176-
177-
def get_job_folders(jobname):
178-
if os.path.isdir(jobname):
179-
# We assume `jobname` is `path_job` repo, we extract the real `jobname`.
180-
jobname = os.path.basename(os.path.abspath(jobname))
181-
182-
path_job, path_job_logs, path_job_commands = _gen_job_paths(jobname)
183-
184-
if not os.path.exists(path_job_commands):
185-
raise LookupError("Batch UID ({0}) does not exist! Cannot resume.".format(jobname))
186-
187-
if not os.path.exists(path_job_logs):
188-
os.makedirs(path_job_logs)
189-
if not os.path.exists(pjoin(path_job_logs, "worker")):
190-
os.makedirs(pjoin(path_job_logs, "worker"))
191-
if not os.path.exists(pjoin(path_job_logs, "job")):
192-
os.makedirs(pjoin(path_job_logs, "job"))
193-
194-
return path_job, path_job_logs, path_job_commands
195-
196-
197-
def create_job_folders(jobname):
198-
"""Creates the folders where the logs, commands and QSUB files will be saved."""
199-
path_job, path_job_logs, path_job_commands = _gen_job_paths(jobname)
200-
201-
if not os.path.exists(path_job_commands):
202-
os.makedirs(path_job_commands)
203-
204-
if not os.path.exists(path_job_logs):
205-
os.makedirs(path_job_logs)
206-
os.makedirs(pjoin(path_job_logs, "worker"))
207-
os.makedirs(pjoin(path_job_logs, "job"))
208-
209-
return path_job, path_job_logs, path_job_commands
210-
211-
212175
if __name__ == "__main__":
213176
main()

smartdispatch/smartdispatch.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,10 @@ def get_available_queues(cluster_name=utils.detect_cluster()):
110110
return {}
111111

112112
smartdispatch_dir, _ = os.path.split(smartdispatch.__file__)
113-
config_dir = os.path.join(smartdispatch_dir, 'config')
113+
config_dir = pjoin(smartdispatch_dir, 'config')
114114

115115
config_filename = cluster_name + ".json"
116-
config_filepath = os.path.join(config_dir, config_filename)
116+
config_filepath = pjoin(config_dir, config_filename)
117117

118118
if not os.path.isfile(config_filepath):
119119
return {} # Unknown cluster
@@ -122,6 +122,24 @@ def get_available_queues(cluster_name=utils.detect_cluster()):
122122
return queues_infos
123123

124124

125+
def get_job_folders(path, jobname, create_if_needed=False):
126+
""" Get all folder paths for a specific job (creating them if needed). """
127+
path_job = pjoin(path, jobname)
128+
path_job_logs = pjoin(path_job, 'logs')
129+
path_job_commands = pjoin(path_job, 'commands')
130+
131+
if not os.path.isdir(path_job_commands):
132+
os.makedirs(path_job_commands)
133+
if not os.path.isdir(path_job_logs):
134+
os.makedirs(path_job_logs)
135+
if not os.path.isdir(pjoin(path_job_logs, "worker")):
136+
os.makedirs(pjoin(path_job_logs, "worker"))
137+
if not os.path.isdir(pjoin(path_job_logs, "job")):
138+
os.makedirs(pjoin(path_job_logs, "job"))
139+
140+
return path_job, path_job_logs, path_job_commands
141+
142+
125143
def log_command_line(path_job, command_line):
126144
""" Logs a command line in a job folder.
127145

smartdispatch/tests/test_smartdispatch.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,52 @@ def test_get_available_queues():
146146
assert_true(len(queues_infos) > 0)
147147

148148

149+
def test_get_job_folders():
150+
temp_dir = tempfile.mkdtemp()
151+
jobname = "this_is_the_name_of_my_job"
152+
job_folders_paths = smartdispatch.get_job_folders(temp_dir, jobname)
153+
path_job, path_job_logs, path_job_commands = job_folders_paths
154+
155+
assert_true(jobname in path_job)
156+
assert_true(os.path.isdir(path_job))
157+
assert_equal(os.path.basename(path_job), jobname)
158+
159+
assert_true(jobname in path_job_logs)
160+
assert_true(os.path.isdir(path_job_logs))
161+
assert_true(os.path.isdir(pjoin(path_job_logs, 'worker')))
162+
assert_true(os.path.isdir(pjoin(path_job_logs, 'job')))
163+
assert_true(os.path.isdir(path_job_logs))
164+
assert_equal(os.path.basename(path_job_logs), "logs")
165+
166+
assert_true(jobname in path_job_commands)
167+
assert_true(os.path.isdir(path_job_commands))
168+
assert_equal(os.path.basename(path_job_commands), "commands")
169+
170+
# In theory the following should not create new folders.
171+
# Instead it will return the paths to existing folders.
172+
jobname += "2"
173+
os.rename(path_job, path_job + "2")
174+
job_folders_paths = smartdispatch.get_job_folders(temp_dir, jobname)
175+
path_job, path_job_logs, path_job_commands = job_folders_paths
176+
177+
assert_true(jobname in path_job)
178+
assert_true(os.path.isdir(path_job))
179+
assert_equal(os.path.basename(path_job), jobname)
180+
181+
assert_true(jobname in path_job_logs)
182+
assert_true(os.path.isdir(path_job_logs))
183+
assert_true(os.path.isdir(pjoin(path_job_logs, 'worker')))
184+
assert_true(os.path.isdir(pjoin(path_job_logs, 'job')))
185+
assert_true(os.path.isdir(path_job_logs))
186+
assert_equal(os.path.basename(path_job_logs), "logs")
187+
188+
assert_true(jobname in path_job_commands)
189+
assert_true(os.path.isdir(path_job_commands))
190+
assert_equal(os.path.basename(path_job_commands), "commands")
191+
192+
shutil.rmtree(temp_dir)
193+
194+
149195
def test_log_command_line():
150196
temp_dir = tempfile.mkdtemp()
151197
command_line_log_file = pjoin(temp_dir, "command_line.log")
@@ -177,4 +223,4 @@ def test_log_command_line():
177223
assert_true(t.strftime("## %Y-%m-%d %H:%M:") in lines[6]) # Don't check second.
178224
assert_equal(lines[7], re.sub(r'(\[)([^\[\]]*\\ [^\[\]]*)(\])', r'"\1\2\3"', command_3))
179225

180-
shutil.rmtree(temp_dir)
226+
shutil.rmtree(temp_dir)

tests/test_smart_dispatch.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ def setUp(self):
2323
os.chdir(self.testing_dir)
2424

2525
def tearDown(self):
26-
print "Tear down"
2726
os.chdir(self._cwd)
2827
shutil.rmtree(self.testing_dir)
2928

@@ -37,7 +36,7 @@ def test_main_launch(self):
3736
assert_equal(len(os.listdir(self.logs_dir)), 1)
3837

3938
def test_main_resume(self):
40-
# SetUp
39+
# Setup
4140
call(self.launch_command, shell=True)
4241
batch_uid = os.listdir(self.logs_dir)[0]
4342

@@ -61,6 +60,32 @@ def test_main_resume(self):
6160
assert_equal(len(open(running_commands_file).readlines()), 0)
6261
assert_equal(len(open(pending_commands_file).readlines()), len(commands))
6362

63+
# Test when batch_uid is a path instead of a jobname.
64+
# Setup
65+
call(self.launch_command, shell=True)
66+
batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])
67+
68+
# Simulate that some commands are in the running state.
69+
path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
70+
pending_commands_file = pjoin(path_job_commands, "commands.txt")
71+
running_commands_file = pjoin(path_job_commands, "running_commands.txt")
72+
commands = open(pending_commands_file).read().strip().split("\n")
73+
with open(running_commands_file, 'w') as running_commands:
74+
running_commands.write("\n".join(commands[::2]) + "\n")
75+
with open(pending_commands_file, 'w') as pending_commands:
76+
pending_commands.write("\n".join(commands[1::2]) + "\n")
77+
78+
# Actual test (should move running commands back to pending).
79+
exit_status = call(self.resume_command.format(batch_uid), shell=True)
80+
81+
# Test validation
82+
assert_equal(exit_status, 0)
83+
assert_true(os.path.isdir(self.logs_dir))
84+
assert_equal(len(os.listdir(self.logs_dir)), 1)
85+
assert_equal(len(open(running_commands_file).readlines()), 0)
86+
assert_equal(len(open(pending_commands_file).readlines()), len(commands))
87+
88+
6489
def test_main_resume_only_pending(self):
6590
# SetUp
6691
call(self.launch_command, shell=True)

0 commit comments

Comments
 (0)