Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 112 additions & 118 deletions Modules/Cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,49 @@ def __setattr__(self, name, value):



def copy_file(self, source, destination, server_source = False, server_dest = True, raise_error=False, **kwargs):
"""
COPY A FILE
===========

This function copies a file or directory from the source to the destination.
The destination is on the cluster, the source is in the local machine.

It uses scp to perform the copy.
Alternative implementations can be performed by overloading this function.

args and kwargs are passed to the ExecuteCMD function.

def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster = False):
The result is the output of the ExecuteCMD function.

Parameters
----------
source : string
The source file to be copied
destination : string
The destination file
server_source : bool
If true, the source is on the server
server_dest : bool
If true, the destination is on the server
raise_error : bool
If True, raises an error upon failure
"""
server_path = "%s:" % self.hostname
source_path = f"{source}"
dest_path = f"{destination}"

if server_source:
source_path = server_path + source_path
if server_dest:
dest_path = server_path + dest_path

cmd = self.scpcmd + f" {source_path} {dest_path}"
result = self.ExecuteCMD(cmd, raise_error = raise_error, **kwargs)
return result


def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster = False, use_active_shell = False):
"""
EXECUTE THE CMD ON THE CLUSTER
==============================
Expand All @@ -355,6 +395,10 @@ def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster
returned as second value.
on_cluster : bool
If true, the command is executed directly on the cluster through ssh
use_active_shell : bool
If true, the command is executed in a new shell on the cluster
This is usefull if the command is a script that must be executed
in a new shell, or if the command requires .bashrc to be sourced.

Returns
-------
Expand All @@ -367,6 +411,14 @@ def ExecuteCMD(self, cmd, raise_error = False, return_output = False, on_cluster

if on_cluster:
cmd = self.sshcmd + " {} '{}'".format(self.hostname, cmd)
if use_active_shell:
cmd = "{ssh} {host} -t '{shell} --login -c \"{command}\"'".format(ssh = self.sshcmd,
host = self.hostname,
command = parse_symbols(cmd),
shell = self.terminal)





success = False
Expand Down Expand Up @@ -436,9 +488,9 @@ def CheckCommunication(self):
false otherwise.
"""

cmd = self.sshcmd + " %s 'echo ciao'" % self.hostname

status, output = self.ExecuteCMD(cmd, return_output = True)
#cmd = self.sshcmd + " %s 'echo ciao'" % self.hostname
cmd = "echo ciao"
status, output = self.ExecuteCMD(cmd, return_output = True, on_cluster = True)

# print cmd
# p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
Expand Down Expand Up @@ -630,7 +682,7 @@ def prepare_input_file(self, structures, calc, labels):

Error message:
'''.format(label)
MSG += str(e)
MSG += str(repr(e))
print(MSG)


Expand Down Expand Up @@ -710,8 +762,7 @@ def copy_files(self, list_of_input, list_of_output, to_server):


# Clean eventually input/output file of this very same calculation
cmd = self.sshcmd + " %s '%s'" % (self.hostname, rm_cmd)
self.ExecuteCMD(cmd, False)
self.ExecuteCMD(rm_cmd, False, on_cluster = True)
# cp_res = os.system(cmd + " > /dev/null")
# if cp_res != 0:
# print "Error while executing:", cmd
Expand All @@ -720,8 +771,9 @@ def copy_files(self, list_of_input, list_of_output, to_server):
#

# Copy the file into the cluster
cmd = self.scpcmd + " %s %s:%s/" % (tar_file, self.hostname, self.workdir)
cp_res = self.ExecuteCMD(cmd, False)
cp_res = self.copy_file(tar_file, self.workdir, raise_error=False)
#cmd = self.scpcmd + " %s %s:%s/" % (tar_file, self.hostname, self.workdir)
#cp_res = self.ExecuteCMD(cmd, False)
if not cp_res:
print ("Error while executing:", cmd)
print ("Return code:", cp_res)
Expand All @@ -734,8 +786,9 @@ def copy_files(self, list_of_input, list_of_output, to_server):

# Unpack the input files and remove the archive
decompress = 'cd {}; tar xf {};'.format(self.workdir, tar_name)
cmd = self.sshcmd + " %s '%s'" % (self.hostname, decompress)
cp_res = self.ExecuteCMD(cmd, False)
#cmd = self.sshcmd + " %s '%s'" % (self.hostname, decompress)
#cp_res = self.ExecuteCMD(cmd, False)
cp_res = self.ExecuteCMD(decompress, False, on_cluster = True)
if not cp_res:
print ("Error while executing:", cmd)
print ("Return code:", cp_res)
Expand All @@ -751,16 +804,18 @@ def copy_files(self, list_of_input, list_of_output, to_server):

compress_cmd = 'cd {}; {}'.format(self.workdir, tar_command)

cmd = self.sshcmd + " %s '%s'" % (self.hostname, compress_cmd)
cp_res = self.ExecuteCMD(cmd, False)
# cmd = self.sshcmd + " %s '%s'" % (self.hostname, compress_cmd)
# cp_res = self.ExecuteCMD(cmd, False)
cp_res = self.ExecuteCMD(compress_cmd, False, on_cluster = True)
if not cp_res:
print ("Error while compressing the outputs:", cmd, list_of_output, "\nReturn code:", cp_res)
#return cp_res


# Copy the tar and unpack
cmd = self.scpcmd + "%s:%s %s/" % (self.hostname, os.path.join(self.workdir, tar_name), self.local_workdir)
cp_res = self.ExecuteCMD(cmd, False)
# Copy the tar from the server to the local and unpack
# cmd = self.scpcmd + "%s:%s %s/" % (self.hostname, os.path.join(self.workdir, tar_name), self.local_workdir)
# cp_res = self.ExecuteCMD(cmd, False)
cp_res = self.copy_file(os.path.join(self.workdir, tar_name), self.local_workdir, raise_error=False, server_source=True, server_dest=False)
if not cp_res:
print ("Error while executing:", cmd)
print ("Return code:", cp_res)
Expand Down Expand Up @@ -805,20 +860,26 @@ def submit(self, script_location):
It is what returned from self.ExecuteCMD(cmd, False)
"""

cmd = "{ssh} {host} '{submit_cmd} {script}'".format(ssh = self.sshcmd, host = self.hostname,
submit_cmd = self.submit_command, script = script_location)
if self.use_active_shell:
cmd = "{ssh} {host} -t '{shell} --login -c \"{submit_cmd} {script}\"'".format(ssh = self.sshcmd,
host = self.hostname,
submit_cmd = self.submit_command, script = script_location,
shell = self.terminal)
cmd = f"{self.submit_command} {script_location}"
#cmd = "{ssh} {host} '{submit_cmd} {script}'".format(ssh = self.sshcmd, host = self.hostname,
# submit_cmd = self.submit_command, script = script_location)

result, output = self.ExecuteCMD(cmd, False, return_output=True, on_cluster = True,
use_active_shell = self.use_active_shell)


# if self.use_active_shell:
# cmd = "{ssh} {host} -t '{shell} --login -c \"{submit_cmd} {script}\"'".format(ssh = self.sshcmd,
# host = self.hostname,
# submit_cmd = self.submit_command, script = script_location,
# shell = self.terminal)



#cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command,
# self.workdir, label+ "_" + str(indices[0]))

return self.ExecuteCMD(cmd, False, return_output=True)
return result, output

def get_output_path(self, label):
"""
Expand Down Expand Up @@ -1114,10 +1175,9 @@ def run_atoms(self, ase_calc, ase_atoms, label="ESP",
submission += mpicmd + " " + binary + "\n"

# First of all clean eventually input/output file of this very same calculation
cmd = self.sshcmd + " %s 'rm -f %s/%s%s %s/%s%s'" % (self.hostname,
self.workdir, label, in_extension,
self.workdir, label, out_extension)
self.ExecuteCMD(cmd, False)
cmd = "rm -f %s/%s%s %s/%s%s" % (self.workdir, label, in_extension,
self.workdir, label, out_extension)
self.ExecuteCMD(cmd, False, on_cluster = True)
# cp_res = os.system(cmd)
# if cp_res != 0:
# print "Error while executing:", cmd
Expand All @@ -1128,16 +1188,18 @@ def run_atoms(self, ase_calc, ase_atoms, label="ESP",
f = open("%s/%s.sh" % (self.local_workdir, label), "w")
f.write(submission)
f.close()
cmd = self.scpcmd + " %s/%s.sh %s:%s" % (self.local_workdir, label, self.hostname, self.workdir)
self.ExecuteCMD(cmd, False)
#cmd = self.scpcmd + " %s/%s.sh %s:%s" % (self.local_workdir, label, self.hostname, self.workdir)
self.copy_file("%s/%s.sh" % (self.local_workdir, label), self.workdir, server_source=False, server_dest=True)
#self.ExecuteCMD(cmd, False)
# cp_res = os.system(cmd)
# if cp_res != 0:
# print "Error while executing:", cmd
# print "Return code:", cp_res
# sys.stderr.write(cmd + ": exit with code " + str(cp_res))
#
cmd = self.scpcmd + " %s/%s%s %s:%s" % (self.local_workdir, label, in_extension, self.hostname, self.workdir)
cp_res = self.ExecuteCMD(cmd, False)
cp_res = self.copy_file("%s/%s%s" % (self.local_workdir, label, in_extension), self.workdir, server_source=False, server_dest=True, raise_error=False)
#cmd = self.scpcmd + " %s/%s%s %s:%s" % (self.local_workdir, label, in_extension, self.hostname, self.workdir)
#cp_res = self.ExecuteCMD(cmd, False)
#cp_res = os.system(cmd)
if not cp_res:
#print "Error while executing:", cmd
Expand All @@ -1146,18 +1208,20 @@ def run_atoms(self, ase_calc, ase_atoms, label="ESP",
return

# Run the simulation
cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command, self.workdir, label)
self.ExecuteCMD(cmd, False)
cmd = "%s %s/%s.sh" % (self.submit_command, self.workdir, label)
#cmd = self.sshcmd + " %s '%s %s/%s.sh'" % (self.hostname, self.submit_command, self.workdir, label)
self.ExecuteCMD(cmd, False, on_cluster = True)
# cp_res = os.system(cmd)
# if cp_res != 0:
# print "Error while executing:", cmd
# print "Return code:", cp_res
# sys.stderr.write(cmd + ": exit with code " + str(cp_res))

# Get the response
cmd = self.scpcmd + " %s:%s/%s%s %s/" % (self.hostname, self.workdir, label, out_extension,
self.local_workdir)
cp_res = self.ExecuteCMD(cmd, False)
#cmd = self.scpcmd + " %s:%s/%s%s %s/" % (self.hostname, self.workdir, label, out_extension,
#self.local_workdir)
#cp_res = self.ExecuteCMD(cmd, False)
cp_res = self.copy_file("%s/%s%s" % (self.workdir, label, out_extension), self.local_workdir, server_source=True, server_dest=False)
#cp_res = os.system(cmd)
if not cp_res:
print ("Error while executing:", cmd)
Expand Down Expand Up @@ -1401,10 +1465,11 @@ def setup_workdir(self, verbose = True):
"""
workdir = self.parse_string(self.workdir)

sshcmd = self.sshcmd + " %s 'mkdir -p %s'" % (self.hostname,
workdir)
cmd = "mkdir -p %s" % workdir
# sshcmd = self.sshcmd + " %s 'mkdir -p %s'" % (self.hostname,
# workdir)

self.ExecuteCMD(sshcmd, raise_error= True)
self.ExecuteCMD(cmd, raise_error= True, on_cluster=True)
#
# retval = os.system(sshcmd)
# if retval != 0:
Expand Down Expand Up @@ -1441,18 +1506,17 @@ def parse_string(self, string):

# Open a pipe with the server
# Use single ' to avoid string parsing by the local terminal
cmd = "%s %s 'echo \"%s\"'" % (self.sshcmd, self.hostname, string)
#cmd = "%s %s 'echo \"%s\"'" % (self.sshcmd, self.hostname, string)
cmd = f"echo \"{string}\""

if self.use_active_shell_for_parsing:
cmd = "{ssh} {host} -t '{shell} --login -c \"echo {string}\"'".format(ssh = self.sshcmd,
host = self.hostname,
string = parse_symbols(string),
shell = self.terminal)
#print cmd
status, output = self.ExecuteCMD(cmd, return_output = True, raise_error= True, use_active_shell = self.use_active_shell_for_parsing, on_cluster = True)

#print cmd

#print(cmd)

status, output = self.ExecuteCMD(cmd, return_output = True, raise_error= True)
#status, output = self.ExecuteCMD(cmd, return_output = True, raise_error= True)

#
# p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
# output, err = p.communicate()
Expand Down Expand Up @@ -1621,73 +1685,3 @@ def compute_ensemble(self, ensemble, ase_calc, get_stress = True, timeout=None):
self.compute_ensemble_batch(ensemble, ase_calc, get_stress, timeout)
return

"""
# Track the remaining configurations
success = [False] * ensemble.N

# Setup if the ensemble has the stress
ensemble.has_stress = get_stress

# Check if the working directory exists
if not os.path.isdir(self.local_workdir):
os.makedirs(self.local_workdir)

# Prepare the function for the simultaneous submission
def compute_single(num, calc):
atm = ensemble.structures[num].get_ase_atoms()
res = self.run_atoms(calc, atm, self.label + str(num),
n_nodes = self.n_nodes,
n_cpu=self.n_cpu,
npool = self.n_pool)
if res:
ensemble.energies[num] = res["energy"] / units["Ry"]
ensemble.forces[num, :, :] = res["forces"] / units["Ry"]
if get_stress:
stress = np.zeros((3,3), dtype = np.float64)
stress[0,0] = res["stress"][0]
stress[1,1] = res["stress"][1]
stress[2,2] = res["stress"][2]
stress[1,2] = res["stress"][3]
stress[2,1] = res["stress"][3]
stress[0,2] = res["stress"][4]
stress[2,0] = res["stress"][4]
stress[0,1] = res["stress"][5]
stress[1,0] = res["stress"][5]
# Remember, ase has a very strange definition of the stress
ensemble.stresses[num, :, :] = -stress * units["Bohr"]**3 / units["Ry"]
success[num] = True

# Get the expected number of batch
num_batch_offset = int(ensemble.N / self.batch_size)

# Run until some work has not finished
recalc = 0
while np.sum(np.array(success, dtype = int) - 1) != 0:
threads = []

# Get the remaining jobs
false_mask = np.array(success) == False
false_id = np.arange(ensemble.N)[false_mask]

count = 0
# Submit in parallel
for i in false_id:
# Submit only the batch size
if count >= self.batch_size:
break
t = threading.Thread(target = compute_single, args=(i, ase_calc, ))
t.start()
threads.append(t)
count += 1


# Wait until all the job have finished
for t in threads:
t.join(timeout)

recalc += 1
if recalc > num_batch_offset + self.max_recalc:
print ("Expected batch ordinary resubmissions:", num_batch_offset)
raise ValueError("Error, resubmissions exceeded the maximum number of %d" % self.max_recalc)
break
"""
25 changes: 25 additions & 0 deletions Modules/LocalCluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import sscha.Cluster as Cluster
import sys, os

"""
Define a local cluster class.
This allows to mock the cluster class and run the code locally, but by
using the same interface as the cluster class and a job scheduler like SLURM.
"""


class LocalCluster(Cluster.Cluster):
def ExecuteCMD(self, cmd, *args, on_cluster = False, **kwargs):
"""
Execute a command in the local machine.
"""

# Override the value of on_cluster
return super().ExecuteCMD(cmd, *args, on_cluster = False, **kwargs)

def copy_file(self, source, destination, server_source = False, server_dest = False, **kwargs):
"""
Copy the files ignoring if the cluster is used.
"""

return super().copy_file(source, destination, server_source = False, server_dest = False, **kwargs)