Skip to content

Commit ee85deb

Browse files
committed
Merge pull request #85 from mgermain/worker
Enforce worker use and added a default to the worker pool size
2 parents 3f82d39 + 15eb60f commit ee85deb

File tree

5 files changed

+55
-134
lines changed

5 files changed

+55
-134
lines changed

scripts/smart_dispatch.py

Lines changed: 41 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -53,47 +53,43 @@ def main():
5353
else:
5454
raise ValueError("Unknown subcommand!")
5555

56-
# Pool of workers
57-
if args.pool is not None:
58-
command_manager = CommandManager(os.path.join(path_job_commands, "commands.txt"))
56+
command_manager = CommandManager(os.path.join(path_job_commands, "commands.txt"))
5957

60-
# If resume mode, reset running jobs
61-
if args.mode == "launch":
62-
command_manager.set_commands_to_run(commands)
63-
else:
64-
# Verifying if there are failed commands
65-
failed_commands = command_manager.get_failed_commands()
66-
if len(failed_commands) > 0:
67-
FAILED_COMMAND_MESSAGE = dedent("""\
68-
{nb_failed} command(s) are in a failed state. They won't be resumed.
69-
Failed commands:
70-
{failed_commands}
71-
The actual errors can be found in the log folder under:
72-
{failed_commands_err_file}""")
73-
utils.print_boxed(FAILED_COMMAND_MESSAGE.format(
74-
nb_failed=len(failed_commands),
75-
failed_commands=''.join(failed_commands),
76-
failed_commands_err_file='\n'.join([utils.generate_uid_from_string(c[:-1])+'.err' for c in failed_commands])
77-
))
78-
79-
if not utils.yes_no_prompt("Do you want to continue?", 'n'):
80-
exit()
81-
82-
command_manager.reset_running_commands()
83-
nb_commands = command_manager.get_nb_commands_to_run()
84-
85-
worker_command = 'smart_worker.py "{0}" "{1}"'.format(command_manager._commands_filename, path_job_logs)
86-
# Replace commands with `args.pool` workers
87-
commands = [worker_command] * args.pool
88-
89-
# Add redirect for output and error logs
90-
for i, command in enumerate(commands):
91-
# Change directory before executing command
92-
commands[i] = 'cd "{cwd}"; '.format(cwd=os.getcwd()) + commands[i]
93-
# Log command's output and command's error
94-
log_filename = os.path.join(path_job_logs, smartdispatch.generate_name_from_command(command, max_length_arg=30))
95-
commands[i] += ' 1>> "{output_log}"'.format(output_log=log_filename + ".o")
96-
commands[i] += ' 2>> "{error_log}"'.format(error_log=log_filename + ".e")
58+
# If resume mode, reset running jobs
59+
if args.mode == "launch":
60+
command_manager.set_commands_to_run(commands)
61+
elif args.mode == "resume":
62+
# Verifying if there are failed commands
63+
failed_commands = command_manager.get_failed_commands()
64+
if len(failed_commands) > 0:
65+
FAILED_COMMAND_MESSAGE = dedent("""\
66+
{nb_failed} command(s) are in a failed state. They won't be resumed.
67+
Failed commands:
68+
{failed_commands}
69+
The actual errors can be found in the log folder under:
70+
{failed_commands_err_file}""")
71+
utils.print_boxed(FAILED_COMMAND_MESSAGE.format(
72+
nb_failed=len(failed_commands),
73+
failed_commands=''.join(failed_commands),
74+
failed_commands_err_file='\n'.join([utils.generate_uid_from_string(c[:-1]) + '.err' for c in failed_commands])
75+
))
76+
77+
if not utils.yes_no_prompt("Do you want to continue?", 'n'):
78+
exit()
79+
80+
command_manager.reset_running_commands()
81+
nb_commands = command_manager.get_nb_commands_to_run()
82+
83+
# If no pool size is specified the number of commands is taken
84+
if args.pool is None:
85+
args.pool = command_manager.get_nb_commands_to_run()
86+
87+
# Generating all the worker commands
88+
COMMAND_STRING = 'cd "{cwd}"; smart_worker.py "{commands_file}" "{log_folder}" '\
89+
'1>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.o" '\
90+
'2>> "{log_folder}/worker/$PBS_JOBID\"\"_worker_{{ID}}.e" '
91+
COMMAND_STRING = COMMAND_STRING.format(cwd=os.getcwd(), commands_file=command_manager._commands_filename, log_folder=path_job_logs)
92+
commands = [COMMAND_STRING.format(ID=i) for i in range(args.pool)]
9793

9894
# TODO: use args.memPerNode instead of np.inf
9995
queue = Queue(args.queueName, CLUSTER_NAME, args.walltime, args.coresPerNode, args.gpusPerNode, np.inf, args.modules)
@@ -129,17 +125,17 @@ def parse_arguments():
129125
parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub')
130126
parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.')
131127
parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.')
132-
#parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there are per node (in Gb).')
128+
# parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there are per node (in Gb).')
133129

134130
parser.add_argument('-c', '--coresPerCommand', type=int, required=False, help='How many cores a command needs.', default=1)
135131
parser.add_argument('-g', '--gpusPerCommand', type=int, required=False, help='How many gpus a command needs.', default=1)
136-
#parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
132+
# parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
137133
parser.add_argument('-f', '--commandsFile', type=file, required=False, help='File containing commands to launch. Each command must be on a seperate line. (Replaces commandAndOptions)')
138134

139135
parser.add_argument('-l', '--modules', type=str, required=False, help='List of additional modules to load.', nargs='+')
140136
parser.add_argument('-x', '--doNotLaunch', action='store_true', help='Creates the QSUB files without launching them.')
141137

142-
parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands.")
138+
parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands. Default: Nb commands")
143139
subparsers = parser.add_subparsers(dest="mode")
144140

145141
launch_parser = subparsers.add_parser('launch', help="Launch jobs.")
@@ -180,6 +176,7 @@ def get_job_folders(jobname):
180176

181177
if not os.path.exists(path_job_logs):
182178
os.makedirs(path_job_logs)
179+
os.makedirs(os.path.join(path_job_logs, "worker"))
183180

184181
return path_job, path_job_logs, path_job_commands
185182

@@ -193,6 +190,7 @@ def create_job_folders(jobname):
193190

194191
if not os.path.exists(path_job_logs):
195192
os.makedirs(path_job_logs)
193+
os.makedirs(os.path.join(path_job_logs, "worker"))
196194

197195
return path_job, path_job_logs, path_job_commands
198196

smartdispatch/smartdispatch.py

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import os
44
import re
55
import itertools
6-
from datetime import datetime
6+
import time as t
77

88
import smartdispatch
99
from smartdispatch import utils
@@ -35,53 +35,9 @@ def generate_name_from_command(command, max_length_arg=None, max_length=None):
3535
if max_length_arg is not None:
3636
max_length_arg = min(-max_length_arg, max_length_arg)
3737

38-
if max_length is not None:
39-
max_length = min(-max_length, max_length)
40-
41-
name = '_'.join([utils.slugify(argvalue)[max_length_arg:] for argvalue in command.split()])
42-
return name[max_length:]
43-
44-
45-
def generate_name_from_arguments(arguments, max_length_arg=None, max_length=None, prefix=datetime.now().strftime('%Y-%m-%d_%H-%M-%S_')):
46-
''' Generates name from given unfolded arguments.
47-
48-
Generate a name by concatenating the first and last values of every
49-
unfolded arguments and by trimming lengthy (as defined by max_length_arg)
50-
arguments.
51-
52-
Parameters
53-
----------
54-
arguments : list of list of str
55-
list of unfolded arguments
56-
max_length_arg : int
57-
arguments longer than this will be trimmed keeping last characters (Default: inf)
58-
max_length : int
59-
trim name if longer than this keeping last characters (Default: inf)
60-
prefix : str
61-
text to prepend to the name (Default: current datetime)
62-
63-
Returns
64-
-------
65-
name : str
66-
slugified name
67-
'''
68-
if max_length_arg is not None:
69-
max_length_arg = min(-max_length_arg, max_length_arg)
70-
71-
if max_length is not None:
72-
max_length = min(-max_length, max_length)
73-
74-
name = []
75-
for argvalues in arguments:
76-
argvalues = map(utils.slugify, argvalues)
77-
name.append(argvalues[0][max_length_arg:])
78-
if len(argvalues) > 1:
79-
name[-1] += '-' + argvalues[-1][max_length_arg:]
80-
81-
name = "_".join(name)
82-
83-
name = prefix + name[max_length:]
84-
return name
38+
name = t.strftime("%Y-%m-%d_%H-%M-%S_")
39+
name += '_'.join([utils.slugify(argvalue)[max_length_arg:] for argvalue in command.split()])
40+
return name[:max_length]
8541

8642

8743
def get_commands_from_file(fileobj):

smartdispatch/tests/test_command_manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class CommandFilesTests(unittest.TestCase):
1111

1212
def setUp(self):
1313
self._base_dir = tmp.mkdtemp()
14+
self.nb_commands = 3
1415
self.command1 = "1\n"
1516
self.command2 = "2\n"
1617
self.command3 = "3\n"
@@ -75,7 +76,7 @@ def test_get_command_to_run(self):
7576
assert_true(not os.path.isfile(self.command_manager._finished_commands_filename))
7677

7778
def test_get_nb_commands_to_run(self):
78-
assert_equal(self.command_manager.get_nb_commands_to_run(), 3)
79+
assert_equal(self.command_manager.get_nb_commands_to_run(), self.nb_commands)
7980

8081
def test_set_running_command_as_finished(self):
8182
# SetUp

smartdispatch/tests/test_smartdispatch.py

Lines changed: 8 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,66 +3,33 @@
33

44
from nose.tools import assert_true, assert_equal
55
from numpy.testing import assert_array_equal
6-
from datetime import datetime
76
from smartdispatch import utils
87

98

109
def test_generate_name_from_command():
10+
date_length = 20
11+
1112
command = "command arg1 arg2"
1213
expected = "_".join(command.split())
13-
assert_equal(smartdispatch.generate_name_from_command(command), expected)
14+
assert_equal(smartdispatch.generate_name_from_command(command)[date_length:], expected)
1415

1516
max_length_arg = 7
1617
long_arg = "veryverylongarg1"
1718
command = "command " + long_arg + " arg2"
1819
expected = command.split()
1920
expected[1] = long_arg[-max_length_arg:]
2021
expected = "_".join(expected)
21-
assert_equal(smartdispatch.generate_name_from_command(command, max_length_arg), expected)
22+
assert_equal(smartdispatch.generate_name_from_command(command, max_length_arg)[date_length:], expected)
2223

2324
max_length = 23
24-
long_arg = "veryverylongarg1"
2525
command = "command veryverylongarg1 veryverylongarg1 veryverylongarg1 veryverylongarg1"
26-
expected = command.split()
27-
expected = "_".join(expected)[-max_length:]
28-
assert_equal(smartdispatch.generate_name_from_command(command, max_length=max_length), expected)
26+
expected = command[:max_length].replace(" ", "_")
27+
assert_equal(smartdispatch.generate_name_from_command(command, max_length=max_length + date_length)[date_length:], expected)
2928

3029
# Test path arguments in command
3130
command = "command path/number/one path/number/two"
3231
expected = "command_pathnumberone_pathnumbertwo"
33-
assert_equal(smartdispatch.generate_name_from_command(command), expected)
34-
35-
36-
def test_generate_name_from_arguments():
37-
prefix = "prefix_"
38-
39-
arguments = [["my_command"], ["args1a", "args1b", "args1c"], ["args2a", "args2b"]]
40-
expected = prefix + "my_command_args1a-args1c_args2a-args2b"
41-
assert_equal(smartdispatch.generate_name_from_arguments(arguments, prefix=prefix), expected)
42-
43-
max_length_arg = 7
44-
arguments = [["command"], ["verylongargs1a", "verylongargs1b", "verylongargs1c"], ["args2a", "args2b"]]
45-
expected = prefix + "command_" + arguments[1][0][-max_length_arg:] + "-" + arguments[1][-1][-max_length_arg:] + "_args2a-args2b"
46-
assert_equal(smartdispatch.generate_name_from_arguments(arguments, max_length_arg, prefix=prefix), expected)
47-
48-
max_length = 23
49-
arguments = [["command"], ["verylongargs1a", "verylongargs1b", "verylongargs1c"], ["args2a", "args2b"]]
50-
expected = "command_" + arguments[1][0] + "-" + arguments[1][-1] + "_args2a-args2b"
51-
expected = prefix + expected[-max_length:]
52-
assert_equal(smartdispatch.generate_name_from_arguments(arguments, max_length=max_length, prefix=prefix), expected)
53-
54-
# Test path arguments in command
55-
arguments = [["command"], ["path/argument/1", "path/argument/2", "path/argument/3"]]
56-
expected = prefix + "command_pathargument1-pathargument3"
57-
assert_equal(smartdispatch.generate_name_from_arguments(arguments, prefix=prefix), expected)
58-
59-
# Make sure default prefix does not raise exception
60-
arguments = [["command"]]
61-
results = smartdispatch.generate_name_from_arguments(arguments)
62-
expect_datetime = datetime.now()
63-
assert_equal(results.split("_")[-1], arguments[0][0])
64-
result_datetime = datetime.strptime("_".join(results.split("_")[:-1]), '%Y-%m-%d_%H-%M-%S')
65-
assert_true(result_datetime <= expect_datetime)
32+
assert_equal(smartdispatch.generate_name_from_command(command)[date_length:], expected)
6633

6734

6835
def test_get_commands_from_file():
@@ -158,7 +125,7 @@ def test_replace_uid_tag():
158125

159126
commands = ["a command with a {UID} tag"] * 10
160127
uid = utils.generate_uid_from_string(commands[0])
161-
assert_array_equal(smartdispatch.replace_uid_tag(commands), [commands[0].replace("{UID}", uid)]*len(commands))
128+
assert_array_equal(smartdispatch.replace_uid_tag(commands), [commands[0].replace("{UID}", uid)] * len(commands))
162129

163130

164131
def test_get_available_queues():

tests/test_smart_worker.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import os
22
import unittest
33
import tempfile
4-
import fcntl
54
import time
65
import shutil
76

0 commit comments

Comments
 (0)