diff --git a/scripts/smart-dispatch b/scripts/smart-dispatch
index 86904fa..69d243e 100755
--- a/scripts/smart-dispatch
+++ b/scripts/smart-dispatch
@@ -146,18 +146,22 @@ def main():
                                 log_folder=path_job_logs,
                                 worker_call_suffix=worker_call_suffix)
         commands = [COMMAND_STRING.format(ID=i) for i in range(args.pool)]
 
-    # TODO: use args.memPerNode instead of args.memPerNode
-    queue = Queue(args.queueName, CLUSTER_NAME, args.walltime, args.coresPerNode, args.gpusPerNode, float('inf'), args.modules)
+    queue = Queue(args.queueName, CLUSTER_NAME, args.walltime, args.coresPerNode, args.gpusPerNode, args.memPerNode, args.modules)
 
-    # Check that requested core number does not exceed node total
+    # Check that requested per-command resources do not exceed node totals
     if args.coresPerCommand > queue.nb_cores_per_node:
         sys.stderr.write("smart-dispatch: error: coresPerCommand exceeds nodes total: asked {req_cores} cores, nodes have {node_cores}\n"
                          .format(req_cores=args.coresPerCommand, node_cores=queue.nb_cores_per_node))
         sys.exit(2)
 
+    if args.memPerCommand is not None and args.memPerCommand > queue.mem_per_node:
+        sys.stderr.write("smart-dispatch: error: memPerCommand exceeds nodes total: asked {req_mem} Gb, nodes have {node_mem} Gb\n"
+                         .format(req_mem=args.memPerCommand, node_mem=queue.mem_per_node))
+        sys.exit(2)
+
     command_params = {'nb_cores_per_command': args.coresPerCommand,
                       'nb_gpus_per_command': args.gpusPerCommand,
-                      'mem_per_command': None  # args.memPerCommand
+                      'mem_per_command': args.memPerCommand
                       }
 
     prolog = []
@@ -172,7 +176,7 @@ def main():
     for pbs_id, pbs in enumerate(job_generator.pbs_list):
         proper_size_name = utils.jobname_generator(jobname, pbs_id)
         pbs.add_options(N=proper_size_name)
-    
+
     if args.pbsFlags is not None:
         job_generator.add_pbs_flags(args.pbsFlags.split(' '))
     pbs_filenames = job_generator.write_pbs_files(path_job_commands)
@@ -193,11 +197,11 @@ def parse_arguments():
     parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub')
     parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.')
     parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.')
-    # parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there are per node (in Gb).')
+    parser.add_argument('-M', '--memPerNode', type=float, required=False, help='How much memory there is per node (in Gb).')
     parser.add_argument('-c', '--coresPerCommand', type=int, required=False, help='How many cores a command needs.', default=1)
     parser.add_argument('-g', '--gpusPerCommand', type=int, required=False, help='How many gpus a command needs.', default=1)
-    # parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
+    parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
     parser.add_argument('-f', '--commandsFile', type=file, required=False, help='File containing commands to launch. Each command must be on a separate line. (Replaces commandAndOptions)')
     parser.add_argument('-l', '--modules', type=str, required=False, help='List of additional modules to load.', nargs='+')
@@ -224,7 +228,9 @@ def parse_arguments():
     if args.queueName not in AVAILABLE_QUEUES and ((args.coresPerNode is None and args.gpusPerNode is None) or args.walltime is None):
         parser.error("Unknown queue, --coresPerNode/--gpusPerNode and --walltime must be set.")
     if args.coresPerCommand < 1:
-        parser.error("coresPerNode must be at least 1")
+        parser.error("coresPerCommand must be at least 1")
+    if args.memPerCommand is not None and args.memPerCommand <= 0:
+        parser.error("memPerCommand must be positive")
 
     return args
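The new memPerCommand check deliberately mirrors the existing coresPerCommand check: a command that cannot fit on a single node is rejected with exit status 2 before any PBS file is written. A minimal sketch of that behaviour, using a hypothetical stand-in function rather than the real main() (all names and numbers below are illustrative):

    # Hypothetical condensation of the two validations added above.
    def fits_on_node(cores_per_command, mem_per_command, nb_cores_per_node, mem_per_node):
        if cores_per_command > nb_cores_per_node:
            return False
        if mem_per_command is not None and mem_per_command > mem_per_node:
            return False
        return True

    assert fits_on_node(1, 0.5, nb_cores_per_node=1, mem_per_node=1.0)        # -m 0.5 on a 1 Gb node: accepted
    assert not fits_on_node(1, 100.0, nb_cores_per_node=1, mem_per_node=1.0)  # -m 100 on a 1 Gb node: exit 2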
diff --git a/smartdispatch/job_generator.py b/smartdispatch/job_generator.py
index d2db23c..743a54d 100644
--- a/smartdispatch/job_generator.py
+++ b/smartdispatch/job_generator.py
@@ -46,7 +46,7 @@ def __init__(self, queue, commands, prolog=[], epilog=[], command_params={}, bas
 
         self.nb_cores_per_command = command_params.get('nb_cores_per_command', 1)
         self.nb_gpus_per_command = command_params.get('nb_gpus_per_command', 1)
-        #self.mem_per_command = command_params.get('mem_per_command', 0.0)
+        self.mem_per_command = command_params.get('mem_per_command', None)
 
         self.pbs_list = self._generate_base_pbs()
         self._add_cluster_specific_rules()
@@ -80,6 +80,10 @@ def _generate_base_pbs(self):
         if self.queue.nb_gpus_per_node > 0 and self.nb_gpus_per_command > 0:
             nb_commands_per_node = min(nb_commands_per_node, self.queue.nb_gpus_per_node // self.nb_gpus_per_command)
 
+        # Limit the number of commands running on a node by the amount of available memory.
+        if self.mem_per_command is not None:
+            nb_commands_per_node = min(nb_commands_per_node, int(self.queue.mem_per_node // self.mem_per_command))
+
         pbs_files = []
         # Distribute equally the jobs among the PBS files and generate those files
         for i, commands in enumerate(utils.chunks(self.commands, n=nb_commands_per_node)):
@@ -92,9 +96,12 @@ def _generate_base_pbs(self):
             resource = "1:ppn={ppn}".format(ppn=len(commands) * self.nb_cores_per_command)
             if self.queue.nb_gpus_per_node > 0:
                 resource += ":gpus={gpus}".format(gpus=len(commands) * self.nb_gpus_per_command)
 
             pbs.add_resources(nodes=resource)
+            if self.mem_per_command is not None:
+                mem_resource = "{mem}Gb".format(mem=len(commands) * self.mem_per_command)
+                pbs.add_resources(mem=mem_resource)
 
             pbs.add_modules_to_load(*self.queue.modules)
             pbs.add_to_prolog(*self.prolog)
             pbs.add_commands(*commands)
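The three per-node limits (cores, gpus, memory) are combined with min(), so the most restrictive one decides how many commands share a node, and the per-file mem request then scales with the number of commands actually packed into that file. A worked example with illustrative numbers (not taken from the repository):

    # Node: 8 cores / 16 Gb; each command wants 1 core / 6 Gb.
    nb_cores_per_node, mem_per_node = 8, 16.0
    nb_cores_per_command, mem_per_command = 1, 6.0

    nb_commands_per_node = nb_cores_per_node // nb_cores_per_command    # 8, by cores
    nb_commands_per_node = min(nb_commands_per_node,
                               int(mem_per_node // mem_per_command))    # 2, by memory
    assert nb_commands_per_node == 2

    # Four commands therefore produce two PBS files, each requesting
    # mem=12.0Gb (2 commands * 6 Gb) alongside nodes=1:ppn=2.

Note the int() around the floor division: with float memory values, `//` yields a float, and the chunk size handed to utils.chunks must be an integer.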
diff --git a/smartdispatch/tests/test_job_generator.py b/smartdispatch/tests/test_job_generator.py
index 7214d9d..b60a5a5 100644
--- a/smartdispatch/tests/test_job_generator.py
+++ b/smartdispatch/tests/test_job_generator.py
@@ -43,7 +43,7 @@ def test_generate_pbs(self):
         assert_equal(job_generator.pbs_list[0].epilog, self.epilog)
 
     def test_generate_pbs2_cpu(self):
-        # Should needs two PBS file
+        # Should need two PBS files
         command_params = {'nb_cores_per_command': self.cores // 2}
         job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
         assert_equal(len(job_generator.pbs_list), 2)
@@ -51,7 +51,7 @@ def test_generate_pbs2_cpu(self):
         assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
 
     def test_generate_pbs4_cpu(self):
-        # Should needs four PBS file
+        # Should need four PBS files
         command_params = {'nb_cores_per_command': self.cores}
         job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
         assert_equal(len(job_generator.pbs_list), 4)
@@ -64,9 +64,31 @@ def test_generate_pbs4_cpu(self):
         # Check if needed modules for this queue are included in the PBS file
         assert_equal(job_generator.pbs_list[0].modules, self.modules)
 
+    def test_generate_pbs2_mem(self):
+        # Should need two PBS files
+        command_params = {'mem_per_command': self.mem_per_node // 2}
+        job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
+        assert_equal(len(job_generator.pbs_list), 2)
+        assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
+        assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
+
+    def test_generate_pbs4_mem(self):
+        # Should need four PBS files
+        command_params = {'mem_per_command': self.mem_per_node}
+        job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
+        assert_equal(len(job_generator.pbs_list), 4)
+        assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)
+
+        # Since the queue has no gpus, none should be specified in the PBS resource `nodes`
+        assert_true('gpus' not in job_generator.pbs_list[0].resources['nodes'])
+
+        # Test modules to load
+        # Check if needed modules for this queue are included in the PBS file
+        assert_equal(job_generator.pbs_list[0].modules, self.modules)
+
     def test_generate_pbs2_gpu(self):
         # Test nb_gpus_per_command argument
-        # Should needs two PBS file
+        # Should need two PBS files
         command_params = {'nb_gpus_per_command': self.gpus // 2}
         job_generator = JobGenerator(self.queue_gpu, self.commands, command_params=command_params)
         assert_equal(len(job_generator.pbs_list), 2)
@@ -74,7 +96,7 @@ def test_generate_pbs2_gpu(self):
         assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
 
     def test_generate_pbs4_gpu(self):
-        # Should needs four PBS files
+        # Should need four PBS files
         command_params = {'nb_gpus_per_command': self.gpus}
         job_generator = JobGenerator(self.queue_gpu, self.commands, command_params=command_params)
         assert_equal(len(job_generator.pbs_list), 4)
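Both new tests assume the shared fixture already defines self.mem_per_node and passes it to the dummy Queue in setUp (not shown in this diff). The expected splits follow directly from the chunking helper's contract; a sketch of what utils.chunks is expected to do (the real implementation in smartdispatch.utils may differ in detail):

    def chunks(sequence, n):
        # Yield successive n-sized chunks from `sequence`.
        for i in range(0, len(sequence), n):
            yield sequence[i:i + n]

    # With mem_per_command == mem_per_node // 2, two commands fit per node:
    assert list(chunks(['c1', 'c2', 'c3', 'c4'], 2)) == [['c1', 'c2'], ['c3', 'c4']]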
diff --git a/tests/test_smart_dispatch.py b/tests/test_smart_dispatch.py
index 9e514c1..23dad2d 100644
--- a/tests/test_smart_dispatch.py
+++ b/tests/test_smart_dispatch.py
@@ -23,17 +23,20 @@ def setUp(self):
         self.nb_commands = len(self.commands)
 
         scripts_path = abspath(pjoin(os.path.dirname(__file__), os.pardir, "scripts"))
-        self.smart_dispatch_command = '{} -C 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
+        self.smart_dispatch_command = '{} -C 1 -M 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
         self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
         self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)
 
-        smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
+        smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -M 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
         self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.folded_commands)
         self.nb_workers = 10
 
-        smart_dispatch_command_with_cores = '{} -C 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
+        smart_dispatch_command_with_cores = '{} -C 1 -M 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
         self.launch_command_with_cores = smart_dispatch_command_with_cores.format('launch ' + self.folded_commands, cores='{cores}')
 
+        smart_dispatch_command_with_memory = '{} -C 1 -M 1 -m {{memory}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
+        self.launch_command_with_memory = smart_dispatch_command_with_memory.format('launch ' + self.folded_commands, memory='{memory}')
+
         self._cwd = os.getcwd()
         os.chdir(self.testing_dir)
@@ -95,6 +98,18 @@ def test_main_launch_with_cores_command(self):
         assert_equal(exit_status_100, 2)
         assert_true(os.path.isdir(self.logs_dir))
 
+    def test_main_launch_with_memory_command(self):
+        # Actual test
+        exit_status_0 = call(self.launch_command_with_memory.format(memory=0), shell=True)
+        exit_status_05 = call(self.launch_command_with_memory.format(memory=0.5), shell=True)
+        exit_status_100 = call(self.launch_command_with_memory.format(memory=100), shell=True)
+
+        # Test validation
+        assert_equal(exit_status_0, 2)
+        assert_equal(exit_status_05, 0)
+        assert_equal(exit_status_100, 2)
+        assert_true(os.path.isdir(self.logs_dir))
+
     def test_main_resume(self):
         # Setup
         call(self.launch_command, shell=True)
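End to end, the feature surfaces as two new flags: -M/--memPerNode describes the hardware and -m/--memPerCommand describes the job. A hypothetical invocation in the style the integration tests use, assuming smart-dispatch is on $PATH (queue name, walltime and script are placeholders):

    from subprocess import call

    # 16 Gb nodes, 4 Gb per command: at most 4 commands are packed into
    # each PBS file, which then requests mem=16.0Gb.
    exit_status = call('smart-dispatch -q myqueue -C 8 -M 16 -c 1 -m 4 -t 5:00 '
                       'launch python train.py', shell=True)
    assert exit_status == 0   # would be 2 if -m were <= 0 or exceeded -M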