Skip to content

Commit 74470a5

Browse files
committed
Merge pull request #100 from MarcCote/smarter_lock_for_helios
Use the atomicity of folder creation to simulate a locking mechanism
2 parents 845a840 + cc0f243 commit 74470a5

File tree

9 files changed

+174
-63
lines changed

9 files changed

+174
-63
lines changed

scripts/smart_dispatch.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from smartdispatch.job_generator import job_generator_factory
1717
from smartdispatch import get_available_queues
1818
from smartdispatch import utils
19+
from smartdispatch.filelock import open_with_lock
1920

2021
import logging
2122
import smartdispatch
@@ -126,7 +127,7 @@ def main():
126127
qsub_output = check_output('{launcher} {pbs_filename}'.format(launcher=LAUNCHER if args.launcher is None else args.launcher, pbs_filename=pbs_filename), shell=True)
127128
jobs_id += [qsub_output.strip()]
128129

129-
with utils.open_with_lock(pjoin(path_job, "jobs_id.txt"), 'a') as jobs_id_file:
130+
with open_with_lock(pjoin(path_job, "jobs_id.txt"), 'a') as jobs_id_file:
130131
jobs_id_file.writelines(t.strftime("## %Y-%m-%d %H:%M:%S ##\n"))
131132
jobs_id_file.writelines("\n".join(jobs_id) + "\n")
132133
print "\nJobs id:\n{jobs_id}".format(jobs_id=" ".join(jobs_id))

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@
1212
license='LICENSE.txt',
1313
description='A batch job launcher for the Mammouth supercomputer.',
1414
long_description=open('README.txt').read(),
15-
install_requires=[],
15+
install_requires=['psutil>=1'],
1616
package_data={'smartdispatch': ['config/*.json']}
1717
)

smartdispatch/command_manager.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import os
2-
from smartdispatch import utils
2+
from .filelock import open_with_lock
33

44

55
class CommandManager(object):
@@ -24,13 +24,13 @@ def _move_line_between_files(self, file1, file2, line):
2424
file2.write(line)
2525

2626
def set_commands_to_run(self, commands):
27-
with utils.open_with_lock(self._commands_filename, 'a') as commands_file:
27+
with open_with_lock(self._commands_filename, 'a') as commands_file:
2828
commands = [command + '\n' for command in commands]
2929
commands_file.writelines(commands)
3030

3131
def get_command_to_run(self):
32-
with utils.open_with_lock(self._commands_filename, 'r+') as commands_file:
33-
with utils.open_with_lock(self._running_commands_filename, 'a') as running_commands_file:
32+
with open_with_lock(self._commands_filename, 'r+') as commands_file:
33+
with open_with_lock(self._running_commands_filename, 'a') as running_commands_file:
3434
command = commands_file.readline()
3535
if command == '':
3636
return None
@@ -54,14 +54,14 @@ def set_running_command_as_finished(self, command, error_code=0):
5454
else:
5555
file_name = self._failed_commands_filename
5656

57-
with utils.open_with_lock(self._running_commands_filename, 'r+') as running_commands_file:
58-
with utils.open_with_lock(file_name, 'a') as finished_commands_file:
57+
with open_with_lock(self._running_commands_filename, 'r+') as running_commands_file:
58+
with open_with_lock(file_name, 'a') as finished_commands_file:
5959
self._move_line_between_files(running_commands_file, finished_commands_file, command + '\n')
6060

6161
def reset_running_commands(self):
6262
if os.path.isfile(self._running_commands_filename):
63-
with utils.open_with_lock(self._commands_filename, 'r+') as commands_file:
64-
with utils.open_with_lock(self._running_commands_filename, 'r+') as running_commands_file:
63+
with open_with_lock(self._commands_filename, 'r+') as commands_file:
64+
with open_with_lock(self._running_commands_filename, 'r+') as running_commands_file:
6565
commands = running_commands_file.readlines()
6666
if len(commands) > 0:
6767
running_commands_file.seek(0, os.SEEK_SET)

smartdispatch/filelock.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import os
2+
import time
3+
import fcntl
4+
import psutil
5+
import logging
6+
7+
from contextlib import contextmanager
8+
9+
# Constants needed for `open_with_dirlock` function.
10+
MAX_ATTEMPTS = 1000  # With 1 sec. between attempts, this corresponds to being blocked for ~17 min.
11+
TIME_BETWEEN_ATTEMPTS = 1 # In seconds
12+
13+
14+
def find_mount_point(path='.'):
    """ Return the mount point through which `path` is reached.

    The absolute form of `path` is walked upward, one directory at a time,
    until `os.path.ismount` reports a mount point; that directory is returned.
    """
    candidate = os.path.abspath(path)
    while True:
        if os.path.ismount(candidate):
            return candidate

        # Not a mount point yet: step up to the parent directory.
        candidate = os.path.dirname(candidate)
21+
22+
23+
def get_fs(path='.'):
24+
""" Gets info about the filesystem on which `path` lives. """
25+
mount = find_mount_point(path)
26+
27+
for fs in psutil.disk_partitions(True):
28+
if fs.mountpoint == mount:
29+
return fs
30+
31+
32+
@contextmanager
def open_with_flock(*args, **kwargs):
    """ Context manager for opening file with an exclusive lock.

    All arguments are forwarded unchanged to the builtin `open`. The opened
    file is locked exclusively with `fcntl.lockf` (blocking if the lock is
    already held elsewhere) and then yielded to the caller.

    The lock is released and the file closed when the `with` block exits,
    whether it exits normally or through an exception.
    """
    f = open(*args, **kwargs)
    try:
        try:
            # First try without blocking so that contention can be logged.
            fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            logging.info("Can't immediately write-lock the file ({0}), waiting ...".format(f.name))
            fcntl.lockf(f, fcntl.LOCK_EX)

        try:
            yield f
        finally:
            # Hand buffered writes to the OS while the lock is still held, so
            # they cannot land after another process has acquired the lock.
            f.flush()
            fcntl.lockf(f, fcntl.LOCK_UN)
    finally:
        f.close()
45+
46+
47+
@contextmanager
def open_with_dirlock(*args, **kwargs):
    """ Context manager for opening file with an exclusive lock using a lock directory.

    The lock is a hidden directory named after the file ("." + basename),
    created next to it. Directory creation is atomic, so only one process can
    create it at a time; the others retry every `TIME_BETWEEN_ATTEMPTS` seconds,
    up to `MAX_ATTEMPTS` times.

    All arguments are forwarded unchanged to the builtin `open`; the first
    positional argument must be the path of the file.

    Raises
    ------
    RuntimeError
        If the lock could not be acquired after `MAX_ATTEMPTS` attempts.
    OSError
        If the lock directory cannot be created for a reason other than
        already existing (e.g. missing parent directory, permission denied).
    """
    # Local import: only needed here to tell "lock already held" apart from real errors.
    import errno

    dirname = os.path.dirname(args[0])
    filename = os.path.basename(args[0])
    lockfile = os.path.join(dirname, "." + filename)

    # Acquire the lock. Only the mkdir call is guarded, so that errors raised
    # by the caller's `with` body are never mistaken for lock contention.
    for _ in range(MAX_ATTEMPTS):
        try:
            os.mkdir(lockfile)  # Atomic operation
            break
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise  # Not contention: retrying would not help.

            logging.info("Can't immediately write-lock the file ({0}), retrying in {1} sec. ...".format(filename, TIME_BETWEEN_ATTEMPTS))
            time.sleep(TIME_BETWEEN_ATTEMPTS)
    else:
        raise RuntimeError("Unable to lock the file ({0}) after {1} attempts.".format(filename, MAX_ATTEMPTS))

    # From here on we own the lock directory: always remove it, even when
    # opening the file or the caller's body fails.
    try:
        f = open(*args, **kwargs)
        try:
            yield f
        finally:
            f.close()
    finally:
        os.rmdir(lockfile)
67+
68+
69+
def _fs_support_globalflock(fs):
70+
if fs.fstype == "lustre":
71+
return ("flock" in fs.opts) and "localflock" not in fs.opts
72+
73+
elif fs.fstype == "gpfs":
74+
return True
75+
76+
return False # We don't know.
77+
78+
79+
# Determine if we can rely on the fcntl module for locking files on the cluster.
# Otherwise, fall back on using the directory creation atomicity as a locking mechanism.
# NOTE: this runs at import time and inspects the current working directory.
fs = get_fs('.')
# `get_fs` returns None when no partition matches the mount point; in that
# case nothing is known about the filesystem, so use the folder lock.
if fs is not None and _fs_support_globalflock(fs):
    open_with_lock = open_with_flock
else:
    # `logging.warning` is the documented name; `logging.warn` is a deprecated alias.
    logging.warning("Cluster does not support flock! Falling back to folder lock.")
    open_with_lock = open_with_dirlock

smartdispatch/smartdispatch.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import smartdispatch
1010
from smartdispatch import utils
11+
from smartdispatch.filelock import open_with_lock
1112
from smartdispatch.argument_template import argument_templates
1213

1314
UID_TAG = "{UID}"
@@ -153,7 +154,7 @@ def log_command_line(path_job, command_line):
153154
we can paste the command line as-is in the terminal. This means that the quotes
154155
symbol " and the square brackets will be escaped.
155156
"""
156-
with utils.open_with_lock(pjoin(path_job, "command_line.log"), 'a') as command_line_log:
157+
with open_with_lock(pjoin(path_job, "command_line.log"), 'a') as command_line_log:
157158
command_line_log.write(t.strftime("## %Y-%m-%d %H:%M:%S ##\n"))
158159
command_line = command_line.replace('"', r'\"') # Make sure we can paste the command line as-is
159160
command_line = re.sub(r'(\[)([^\[\]]*\\ [^\[\]]*)(\])', r'"\1\2\3"', command_line) # Make sure we can paste the command line as-is
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import os
2+
import time
3+
import tempfile
4+
import shutil
5+
6+
from subprocess import Popen, PIPE
7+
from nose.tools import assert_equal, assert_true
8+
9+
from smartdispatch.filelock import open_with_lock, open_with_dirlock, open_with_flock
10+
from smartdispatch.filelock import find_mount_point, get_fs
11+
12+
13+
def _test_open_with_lock(lock_func):
    """ Test a particular file lock.

    A small Python script that tries to take the same lock is launched in a
    subprocess while this process is holding the lock. The subprocess is
    expected to log that it could not lock the file immediately, and to
    finish without any traceback once the lock is released.

    Parameters
    ----------
    lock_func : callable
        Lock context manager to test; it must be importable by name from
        `smartdispatch.filelock`.

    Notes
    -----
    This test only checks if the locking mechanism works on a single
    computer/compute node. There is *no* check for multi-node lock.
    """
    temp_dir = tempfile.mkdtemp()
    try:
        filename = os.path.join(temp_dir, "testing.lck")
        python_script = os.path.join(temp_dir, "test_lock.py")

        script = ["import logging",
                  "from smartdispatch.filelock import {}".format(lock_func.__name__),
                  "logging.root.setLevel(logging.INFO)",
                  "with {}('{}', 'r+'): pass".format(lock_func.__name__, filename)]

        # Close the script explicitly so it is fully written before being run.
        with open(python_script, 'w') as script_file:
            script_file.write("\n".join(script))

        command = "python " + python_script

        # Lock the commands file before running python command
        with lock_func(filename, 'w'):
            process = Popen(command, stdout=PIPE, stderr=PIPE, shell=True)
            time.sleep(1)

        # The lock must be released before waiting on the subprocess,
        # otherwise it could never acquire the lock and terminate.
        stdout, stderr = process.communicate()
        assert_equal(stdout, "")
        assert_true("Traceback" not in stderr, msg="Unexpected error: '{}'".format(stderr))
        assert_true("write-lock" in stderr, msg="Forcing a race condition, try increasing sleeping time above.")
    finally:
        shutil.rmtree(temp_dir)  # Cleaning up, even when an assertion fails.
46+
47+
48+
def test_open_with_default_lock():
    """ Exercise the lock implementation selected at import time (`open_with_lock`). """
    _test_open_with_lock(lock_func=open_with_lock)
50+
51+
52+
def test_open_with_dirlock():
    """ Exercise the directory-based lock (`open_with_dirlock`). """
    _test_open_with_lock(lock_func=open_with_dirlock)
54+
55+
56+
def test_open_with_flock():
    """ Exercise the `fcntl`-based lock (`open_with_flock`). """
    _test_open_with_lock(lock_func=open_with_flock)
58+
59+
60+
def test_find_mount_point():
    """ Check `find_mount_point` on '/' and on every entry found under '/mnt'.

    The '/mnt' part is skipped on systems where that directory does not exist.
    """
    assert_equal(find_mount_point('/'), '/')

    mnt = '/mnt'
    if not os.path.isdir(mnt):
        return  # Nothing more to check on this system.

    # Entries that are not mount points themselves belong to the filesystem
    # holding '/mnt', which is '/mnt' itself when it is a mount point.
    parent_mount = mnt if os.path.ismount(mnt) else '/'

    for d in os.listdir(mnt):
        path = os.path.join(mnt, d)
        if os.path.ismount(path):
            assert_equal(find_mount_point(path), path)
        else:
            assert_equal(find_mount_point(path), parent_mount)
69+
70+
71+
def test_get_fs():
    """ Check that `get_fs` finds a filesystem entry for the root directory. """
    root_fs = get_fs('/')
    assert_true(root_fs is not None)

smartdispatch/tests/test_utils.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
11
# -*- coding: utf-8 -*-
2-
import os
3-
import time
4-
import tempfile
5-
import shutil
62
import unittest
73

8-
from subprocess import Popen, PIPE
9-
104
from smartdispatch import utils
115

126
from nose.tools import assert_equal, assert_true
@@ -51,31 +45,3 @@ def test_slugify():
5145

5246
for arg, expected in testing_arguments:
5347
assert_equal(utils.slugify(arg), expected)
54-
55-
56-
def test_open_with_lock():
57-
temp_dir = tempfile.mkdtemp()
58-
filename = os.path.join(temp_dir, "testing.lck")
59-
60-
python_script = os.path.join(temp_dir, "test_lock.py")
61-
62-
script = ["import logging",
63-
"from smartdispatch.utils import open_with_lock",
64-
"logging.root.setLevel(logging.INFO)",
65-
"with open_with_lock('{0}', 'r+'): pass".format(filename)]
66-
67-
open(os.path.join(temp_dir, "test_lock.py"), 'w').write("\n".join(script))
68-
69-
command = "python " + python_script
70-
71-
# Lock the commands file before running python command
72-
with utils.open_with_lock(filename, 'w'):
73-
process = Popen(command, stdout=PIPE, stderr=PIPE, shell=True)
74-
time.sleep(1)
75-
76-
stdout, stderr = process.communicate()
77-
assert_equal(stdout, "")
78-
assert_true("write-lock" in stderr, msg="Forcing a race condition, try increasing sleeping time above.")
79-
assert_true("Traceback" not in stderr, msg="Unexpected error: " + stderr) # Check that there are no errors.
80-
81-
shutil.rmtree(temp_dir)

smartdispatch/utils.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
import re
2-
import fcntl
3-
import logging
42
import hashlib
53
import unicodedata
64
import json
75

86
from distutils.util import strtobool
97
from subprocess import Popen, PIPE
10-
from contextlib import contextmanager
118

129

1310
def print_boxed(string):
@@ -81,20 +78,6 @@ def unhexify(match):
8178
return re.sub(r"\\x..", unhexify, text)
8279

8380

84-
@contextmanager
85-
def open_with_lock(*args, **kwargs):
86-
""" Context manager for opening file with an exclusive lock. """
87-
f = open(*args, **kwargs)
88-
try:
89-
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
90-
except IOError:
91-
logging.info("Can't immediately write-lock the file ({0}), blocking ...".format(f.name))
92-
fcntl.lockf(f, fcntl.LOCK_EX)
93-
yield f
94-
fcntl.lockf(f, fcntl.LOCK_UN)
95-
f.close()
96-
97-
9881
def save_dict_to_json_file(path, dictionary):
9982
with open(path, "w") as json_file:
10083
json_file.write(json.dumps(dictionary, indent=4, separators=(',', ': ')))

tests/test_smart_worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import shutil
66

77
from smartdispatch import utils
8+
from smartdispatch.filelock import open_with_lock
89
from smartdispatch.command_manager import CommandManager
910

1011
from subprocess import Popen, call, PIPE
@@ -106,7 +107,7 @@ def test_lock(self):
106107
command = ["smart_worker.py", self.command_manager._commands_filename, self.logs_dir]
107108

108109
# Lock the commands file before running 'smart_worker.py'
109-
with utils.open_with_lock(self.command_manager._commands_filename, 'r+'):
110+
with open_with_lock(self.command_manager._commands_filename, 'r+'):
110111
process = Popen(command, stdout=PIPE, stderr=PIPE)
111112
time.sleep(1)
112113

0 commit comments

Comments
 (0)