diff --git a/Modules/Cluster.py b/Modules/Cluster.py index 42c75c63..ede278b5 100644 --- a/Modules/Cluster.py +++ b/Modules/Cluster.py @@ -334,9 +334,49 @@ def __setattr__(self, name, value): + def copy_file(self, source, destination, server_source = False, server_dest = True, raise_error=False, **kwargs): + """ + COPY A FILE + =========== + + This function copies a file or directory from the source to the destination. + The destination is on the cluster, the source is in the local machine. + + It uses scp to perform the copy. + Alternative implementations can be performed by overloading this function. + args and kwargs are passed to the ExecuteCMD function. - def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster = False): + The result is the output of the ExecuteCMD function. + + Parameters + ---------- + source : string + The source file to be copied + destination : string + The destination file + server_source : bool + If true, the source is on the server + server_dest : bool + If true, the destination is on the server + raise_error : bool + If True, raises an error upon failure + """ + server_path = "%s:" % self.hostname + source_path = f"{source}" + dest_path = f"{destination}" + + if server_source: + source_path = server_path + source_path + if server_dest: + dest_path = server_path + dest_path + + cmd = self.scpcmd + f" {source_path} {dest_path}" + result = self.ExecuteCMD(cmd, raise_error = raise_error, **kwargs) + return result + + + def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster = False, use_active_shell = False): """ EXECUTE THE CMD ON THE CLUSTER ============================== @@ -355,6 +395,10 @@ def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster returned as second value. on_cluster : bool If true, the command is executed directly on the cluster through ssh + use_active_shell : bool + If true, the command is executed in a new shell on the cluster + This is usefull if the command is a script that must be executed + in a new shell, or if the command requires .bashrc to be sourced. Returns ------- @@ -367,6 +411,14 @@ def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster if on_cluster: cmd = self.sshcmd + " {} '{}'".format(self.hostname, cmd) + if use_active_shell: + cmd = "{ssh} {host} -t '{shell} --login -c \"{command}\"'".format(ssh = self.sshcmd, + host = self.hostname, + command = parse_symbols(cmd), + shell = self.terminal) + + + success = False @@ -436,9 +488,9 @@ def CheckCommunication(self): false otherwise. """ - cmd = self.sshcmd + " %s 'echo ciao'" % self.hostname - - status, output = self.ExecuteCMD(cmd, return_output = True) + #cmd = self.sshcmd + " %s 'echo ciao'" % self.hostname + cmd = "echo ciao" + status, output = self.ExecuteCMD(cmd, return_output = True, on_cluster = True) # print cmd # p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) @@ -630,7 +682,7 @@ def prepare_input_file(self, structures, calc, labels): Error message: '''.format(label) - MSG += str(e) + MSG += str(repr(e)) print(MSG) @@ -710,8 +762,7 @@ def copy_files(self, list_of_input, list_of_output, to_server): # Clean eventually input/output file of this very same calculation - cmd = self.sshcmd + " %s '%s'" % (self.hostname, rm_cmd) - self.ExecuteCMD(cmd, False) + self.ExecuteCMD(rm_cmd, False, on_cluster = True) # cp_res = os.system(cmd + " > /dev/null") # if cp_res != 0: # print "Error while executing:", cmd @@ -720,8 +771,9 @@ def copy_files(self, list_of_input, list_of_output, to_server): # # Copy the file into the cluster - cmd = self.scpcmd + " %s %s:%s/" % (tar_file, self.hostname, self.workdir) - cp_res = self.ExecuteCMD(cmd, False) + cp_res = self.copy_file(tar_file, self.workdir, raise_error=False) + #cmd = self.scpcmd + " %s %s:%s/" % (tar_file, self.hostname, self.workdir) + #cp_res = self.ExecuteCMD(cmd, False) if not cp_res: print ("Error while executing:", cmd) print ("Return code:", cp_res) @@ -734,8 +786,9 @@ def copy_files(self, list_of_input, list_of_output, to_server): # Unpack the input files and remove the archive decompress = 'cd {}; tar xf {};'.format(self.workdir, tar_name) - cmd = self.sshcmd + " %s '%s'" % (self.hostname, decompress) - cp_res = self.ExecuteCMD(cmd, False) + #cmd = self.sshcmd + " %s '%s'" % (self.hostname, decompress) + #cp_res = self.ExecuteCMD(cmd, False) + cp_res = self.ExecuteCMD(decompress, False, on_cluster = True) if not cp_res: print ("Error while executing:", cmd) print ("Return code:", cp_res) @@ -751,16 +804,18 @@ def copy_files(self, list_of_input, list_of_output, to_server): compress_cmd = 'cd {}; {}'.format(self.workdir, tar_command) - cmd = self.sshcmd + " %s '%s'" % (self.hostname, compress_cmd) - cp_res = self.ExecuteCMD(cmd, False) + # cmd = self.sshcmd + " %s '%s'" % (self.hostname, compress_cmd) + # cp_res = self.ExecuteCMD(cmd, False) + cp_res = self.ExecuteCMD(compress_cmd, False, on_cluster = True) if not cp_res: print ("Error while compressing the outputs:", cmd, list_of_output, "\nReturn code:", cp_res) #return cp_res - # Copy the tar and unpack - cmd = self.scpcmd + "%s:%s %s/" % (self.hostname, os.path.join(self.workdir, tar_name), self.local_workdir) - cp_res = self.ExecuteCMD(cmd, False) + # Copy the tar from the server to the local and unpack + # cmd = self.scpcmd + "%s:%s %s/" % (self.hostname, os.path.join(self.workdir, tar_name), self.local_workdir) + # cp_res = self.ExecuteCMD(cmd, False) + cp_res = self.copy_file(os.path.join(self.workdir, tar_name), self.local_workdir, raise_error=False, server_source=True, server_dest=False) if not cp_res: print ("Error while executing:", cmd) print ("Return code:", cp_res) @@ -805,20 +860,26 @@ def submit(self, script_location): It is what returned from self.ExecuteCMD(cmd, False) """ - cmd = "{ssh} {host} '{submit_cmd} {script}'".format(ssh = self.sshcmd, host = self.hostname, - submit_cmd = self.submit_command, script = script_location) - if self.use_active_shell: - cmd = "{ssh} {host} -t '{shell} --login -c \"{submit_cmd} {script}\"'".format(ssh = self.sshcmd, - host = self.hostname, - submit_cmd = self.submit_command, script = script_location, - shell = self.terminal) + cmd = f"{self.submit_command} {script_location}" + #cmd = "{ssh} {host} '{submit_cmd} {script}'".format(ssh = self.sshcmd, host = self.hostname, + # submit_cmd = self.submit_command, script = script_location) + + result, output = self.ExecuteCMD(cmd, False, return_output=True, on_cluster = True, + use_active_shell = self.use_active_shell) + + + # if self.use_active_shell: + # cmd = "{ssh} {host} -t '{shell} --login -c \"{submit_cmd} {script}\"'".format(ssh = self.sshcmd, + # host = self.hostname, + # submit_cmd = self.submit_command, script = script_location, + # shell = self.terminal) #cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command, # self.workdir, label+ "_" + str(indices[0])) - return self.ExecuteCMD(cmd, False, return_output=True) + return result, output def get_output_path(self, label): """ @@ -1114,10 +1175,9 @@ def run_atoms(self, ase_calc, ase_atoms, label="ESP", submission += mpicmd + " " + binary + "\n" # First of all clean eventually input/output file of this very same calculation - cmd = self.sshcmd + " %s 'rm -f %s/%s%s %s/%s%s'" % (self.hostname, - self.workdir, label, in_extension, - self.workdir, label, out_extension) - self.ExecuteCMD(cmd, False) + cmd = "rm -f %s/%s%s %s/%s%s" % (self.workdir, label, in_extension, + self.workdir, label, out_extension) + self.ExecuteCMD(cmd, False, on_cluster = True) # cp_res = os.system(cmd) # if cp_res != 0: # print "Error while executing:", cmd @@ -1128,16 +1188,18 @@ def run_atoms(self, ase_calc, ase_atoms, label="ESP", f = open("%s/%s.sh" % (self.local_workdir, label), "w") f.write(submission) f.close() - cmd = self.scpcmd + " %s/%s.sh %s:%s" % (self.local_workdir, label, self.hostname, self.workdir) - self.ExecuteCMD(cmd, False) + #cmd = self.scpcmd + " %s/%s.sh %s:%s" % (self.local_workdir, label, self.hostname, self.workdir) + self.copy_file("%s/%s.sh" % (self.local_workdir, label), self.workdir, server_source=False, server_dest=True) + #self.ExecuteCMD(cmd, False) # cp_res = os.system(cmd) # if cp_res != 0: # print "Error while executing:", cmd # print "Return code:", cp_res # sys.stderr.write(cmd + ": exit with code " + str(cp_res)) # - cmd = self.scpcmd + " %s/%s%s %s:%s" % (self.local_workdir, label, in_extension, self.hostname, self.workdir) - cp_res = self.ExecuteCMD(cmd, False) + cp_res = self.copy_file("%s/%s%s" % (self.local_workdir, label, in_extension), self.workdir, server_source=False, server_dest=True, raise_error=False) + #cmd = self.scpcmd + " %s/%s%s %s:%s" % (self.local_workdir, label, in_extension, self.hostname, self.workdir) + #cp_res = self.ExecuteCMD(cmd, False) #cp_res = os.system(cmd) if not cp_res: #print "Error while executing:", cmd @@ -1146,8 +1208,9 @@ def run_atoms(self, ase_calc, ase_atoms, label="ESP", return # Run the simulation - cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command, self.workdir, label) - self.ExecuteCMD(cmd, False) + cmd = "%s %s/%s.sh" % (self.submit_command, self.workdir, label) + #cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command, self.workdir, label) + self.ExecuteCMD(cmd, False, on_cluster = True) # cp_res = os.system(cmd) # if cp_res != 0: # print "Error while executing:", cmd @@ -1155,9 +1218,10 @@ def run_atoms(self, ase_calc, ase_atoms, label="ESP", # sys.stderr.write(cmd + ": exit with code " + str(cp_res)) # Get the response - cmd = self.scpcmd + " %s:%s/%s%s %s/" % (self.hostname, self.workdir, label, out_extension, - self.local_workdir) - cp_res = self.ExecuteCMD(cmd, False) + #cmd = self.scpcmd + " %s:%s/%s%s %s/" % (self.hostname, self.workdir, label, out_extension, + #self.local_workdir) + #cp_res = self.ExecuteCMD(cmd, False) + cp_res = self.copy_file("%s/%s%s" % (self.workdir, label, out_extension), self.local_workdir, server_source=True, server_dest=False) #cp_res = os.system(cmd) if not cp_res: print ("Error while executing:", cmd) @@ -1401,10 +1465,11 @@ def setup_workdir(self, verbose = True): """ workdir = self.parse_string(self.workdir) - sshcmd = self.sshcmd + " %s 'mkdir -p %s'" % (self.hostname, - workdir) + cmd = "mkdir -p %s" % workdir + # sshcmd = self.sshcmd + " %s 'mkdir -p %s'" % (self.hostname, + # workdir) - self.ExecuteCMD(sshcmd, raise_error= True) + self.ExecuteCMD(cmd, raise_error= True, on_cluster=True) # # retval = os.system(sshcmd) # if retval != 0: @@ -1441,18 +1506,17 @@ def parse_string(self, string): # Open a pipe with the server # Use single ' to avoid string parsing by the local terminal - cmd = "%s %s 'echo \"%s\"'" % (self.sshcmd, self.hostname, string) + #cmd = "%s %s 'echo \"%s\"'" % (self.sshcmd, self.hostname, string) + cmd = f"echo \"{string}\"" - if self.use_active_shell_for_parsing: - cmd = "{ssh} {host} -t '{shell} --login -c \"echo {string}\"'".format(ssh = self.sshcmd, - host = self.hostname, - string = parse_symbols(string), - shell = self.terminal) - #print cmd + status, output = self.ExecuteCMD(cmd, return_output = True, raise_error= True, use_active_shell = self.use_active_shell_for_parsing, on_cluster = True) + + #print cmd #print(cmd) - status, output = self.ExecuteCMD(cmd, return_output = True, raise_error= True) + #status, output = self.ExecuteCMD(cmd, return_output = True, raise_error= True) + # # p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) # output, err = p.communicate() @@ -1621,73 +1685,3 @@ def compute_ensemble(self, ensemble, ase_calc, get_stress = True, timeout=None): self.compute_ensemble_batch(ensemble, ase_calc, get_stress, timeout) return - """ - # Track the remaining configurations - success = [False] * ensemble.N - - # Setup if the ensemble has the stress - ensemble.has_stress = get_stress - - # Check if the working directory exists - if not os.path.isdir(self.local_workdir): - os.makedirs(self.local_workdir) - - # Prepare the function for the simultaneous submission - def compute_single(num, calc): - atm = ensemble.structures[num].get_ase_atoms() - res = self.run_atoms(calc, atm, self.label + str(num), - n_nodes = self.n_nodes, - n_cpu=self.n_cpu, - npool = self.n_pool) - if res: - ensemble.energies[num] = res["energy"] / units["Ry"] - ensemble.forces[num, :, :] = res["forces"] / units["Ry"] - if get_stress: - stress = np.zeros((3,3), dtype = np.float64) - stress[0,0] = res["stress"][0] - stress[1,1] = res["stress"][1] - stress[2,2] = res["stress"][2] - stress[1,2] = res["stress"][3] - stress[2,1] = res["stress"][3] - stress[0,2] = res["stress"][4] - stress[2,0] = res["stress"][4] - stress[0,1] = res["stress"][5] - stress[1,0] = res["stress"][5] - # Remember, ase has a very strange definition of the stress - ensemble.stresses[num, :, :] = -stress * units["Bohr"]**3 / units["Ry"] - success[num] = True - - # Get the expected number of batch - num_batch_offset = int(ensemble.N / self.batch_size) - - # Run until some work has not finished - recalc = 0 - while np.sum(np.array(success, dtype = int) - 1) != 0: - threads = [] - - # Get the remaining jobs - false_mask = np.array(success) == False - false_id = np.arange(ensemble.N)[false_mask] - - count = 0 - # Submit in parallel - for i in false_id: - # Submit only the batch size - if count >= self.batch_size: - break - t = threading.Thread(target = compute_single, args=(i, ase_calc, )) - t.start() - threads.append(t) - count += 1 - - - # Wait until all the job have finished - for t in threads: - t.join(timeout) - - recalc += 1 - if recalc > num_batch_offset + self.max_recalc: - print ("Expected batch ordinary resubmissions:", num_batch_offset) - raise ValueError("Error, resubmissions exceeded the maximum number of %d" % self.max_recalc) - break - """ diff --git a/Modules/LocalCluster.py b/Modules/LocalCluster.py new file mode 100644 index 00000000..e4688bce --- /dev/null +++ b/Modules/LocalCluster.py @@ -0,0 +1,25 @@ +import sscha.Cluster as Cluster +import sys, os + +""" +Define a local cluster class. +This allows to mock the cluster class and run the code locally, but by +using the same interface as the cluster class and a job scheduler like SLURM. +""" + + +class LocalCluster(Cluster.Cluster): + def ExecuteCMD(self, cmd, *args, on_cluster = False, **kwargs): + """ + Execute a command in the local machine. + """ + + # Override the value of on_cluster + return super().ExecuteCMD(cmd, *args, on_cluster = False, **kwargs) + + def copy_file(self, source, destination, server_source = False, server_dest = False, **kwargs): + """ + Copy the files ignoring if the cluster is used. + """ + + return super().copy_file(source, destination, server_source = False, server_dest = False, **kwargs)