From 32eacb46225a5d8952088c1b8d5705dd0a252157 Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Tue, 19 Nov 2019 18:06:06 +0200 Subject: [PATCH 01/16] talker.agent: run agent with python3 --- .travis.yml | 2 +- talker_agent/Dockerfile | 2 +- talker_agent/talker.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6e71156..9632b79 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,7 @@ matrix: env: - TEST_SUITE="tests/uts/test_client.py" - stage: agent unit tests - python: 2.7 + python: 3.5 env: - TEST_SUITE="tests/uts/test_agent.py" - stage: integration tests diff --git a/talker_agent/Dockerfile b/talker_agent/Dockerfile index a93aa7e..027048c 100644 --- a/talker_agent/Dockerfile +++ b/talker_agent/Dockerfile @@ -1,4 +1,4 @@ -FROM python:2.7-slim-buster +FROM python:3.5-slim-buster #args from docker-compose.yaml ARG TALKER_AGENT_VERSION diff --git a/talker_agent/talker.py b/talker_agent/talker.py index 580e1da..76eefdd 100644 --- a/talker_agent/talker.py +++ b/talker_agent/talker.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/local/bin/python3 """ From 9fce18adb761dad8d013a06f88aa2808fac752dd Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Wed, 20 Nov 2019 12:17:19 +0200 Subject: [PATCH 02/16] talker.agent: encode stderr exception --- talker_agent/talker.py | 2 +- tests/integration/test_sanity.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/talker_agent/talker.py b/talker_agent/talker.py index 76eefdd..189376b 100644 --- a/talker_agent/talker.py +++ b/talker_agent/talker.py @@ -304,7 +304,7 @@ def start(self): time.sleep(random.random() * 2) continue - self.stderr.chunks.append(e.strerror) + self.stderr.chunks.append(e.strerror.encode('utf-8')) self.set_result(e.errno) self.finalize() return diff --git a/tests/integration/test_sanity.py b/tests/integration/test_sanity.py index 112939d..7378c2c 100644 --- a/tests/integration/test_sanity.py +++ b/tests/integration/test_sanity.py @@ -1,7 +1,7 @@ import unittest from talker_agent.talker import Config -from talker.errors import CommandAbortedByOverflow +from talker.errors import CommandAbortedByOverflow, CommandExecutionError from tests.utils import get_talker_client, get_retcode, get_stdout @@ -38,3 +38,8 @@ def test_max_output_per_channel_set(self): if expected_ret == 0: res = get_stdout(self.client.redis, cmd.job_id) self.assertEqual(res, val) + + def test_bad_command(self): + cmd = self.client.run(self.host_id, 'foo') + with self.assertRaises(CommandExecutionError): + cmd.result() From e8ffbba3c9b307f194c6a4b51281be5c8598addc Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Wed, 20 Nov 2019 12:19:39 +0200 Subject: [PATCH 03/16] changelog: update --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4b2702..e8d5ba0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Changed +- upgrade agent to run with python3 ## [1.8.6] - 2019-11-05 ### Added From a8f9eb480739776d2b41303ee70707973db392c1 Mon Sep 17 00:00:00 2001 From: Doron Cohen Date: Mon, 28 Oct 2019 17:34:55 +0200 Subject: [PATCH 04/16] agent: remove weka related code / comments --- talker_agent/talker.py | 30 +----------------------------- 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/talker_agent/talker.py b/talker_agent/talker.py index 189376b..1c7f5f5 100644 --- a/talker_agent/talker.py +++ b/talker_agent/talker.py @@ -1,23 +1,5 @@ #!/usr/local/bin/python3 - -""" -Talker Agent (Server-Side) -========================== - -* Important: - - keep this free of dependencies (there's only a redis dependency) - - keep this compatible with python2.6+ (no dict comprehension) - -* Packaging: - - update the 'TALKER' version in version_info - - ./teka pack talker - -* Testing: - - See ./wepy/devops/talker.py (client-side) - -""" - import fcntl import json import logging @@ -313,13 +295,7 @@ def start(self): self.job_fn = "%s/job.%s.%s" % (JOBS_DIR, self.job_id, self.popen.pid) with open(self.job_fn, "w") as f: - try: - f.write(repr(self.cmd)) - except IOError: - # to help with WEKAPP-74054 - os.system("df") - os.system("df -i") - raise + f.write(repr(self.cmd)) self.agent.current_processes[self.job_id] = self for channel in self.channels: @@ -940,10 +916,6 @@ def main(*args): config = Config() set_logging_to_file(config.parser.get('logging', 'logpath')) - # to help with WEKAPP-74054 - os.system("df") - os.system("df -i") - open("/var/run/talker.pid", "w").write(str(os.getpid())) atexit.register(os.unlink, "/var/run/talker.pid") From a455c840b745abe5a5cc2591fcbd2c87ac48547d Mon Sep 17 00:00:00 2001 From: Doron Cohen Date: Mon, 28 Oct 2019 17:36:52 +0200 Subject: [PATCH 05/16] agent: remove old ut code --- talker_agent/talker.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/talker_agent/talker.py b/talker_agent/talker.py index 1c7f5f5..df05625 100644 --- a/talker_agent/talker.py +++ b/talker_agent/talker.py @@ -25,6 +25,8 @@ except: # python 2.7 from ConfigParser import ConfigParser +import redis + PY3 = sys.version_info[0] == 3 @@ -817,7 +819,6 @@ def setup(self): health_check_interval = config.parser.getfloat('redis', 'health_check_interval') logger.info("Connecting to redis %s:%s", host, port) - import redis # deferring so that importing talker (for ut) doesn't immediately fail if package not available self.redis = redis.StrictRedis( host=host, port=port, db=0, password=password, socket_timeout=socket_timeout, socket_connect_timeout=socket_connect_timeout, @@ -947,7 +948,4 @@ def main(*args): if __name__ == '__main__': args = sys.argv[1:] - if "--ut" in args: - print("Talker don't need no UT") - else: - sys.exit(main(*args)) + sys.exit(main(*args)) From 06cff98296557ee08d20977665fa5b3aed769bdb Mon Sep 17 00:00:00 2001 From: Doron Cohen Date: Mon, 28 Oct 2019 17:39:22 +0200 Subject: [PATCH 06/16] agent: remove some python 2 compat code --- talker_agent/talker.py | 45 ++++++++---------------------------------- 1 file changed, 8 insertions(+), 37 deletions(-) diff --git a/talker_agent/talker.py b/talker_agent/talker.py index df05625..24dfc57 100644 --- a/talker_agent/talker.py +++ b/talker_agent/talker.py @@ -16,50 +16,21 @@ import glob import atexit import random -from textwrap import dedent from contextlib import contextmanager from logging import getLogger from logging.handlers import RotatingFileHandler -try: - from configparser import ConfigParser -except: # python 2.7 - from ConfigParser import ConfigParser +from configparser import ConfigParser import redis -PY3 = sys.version_info[0] == 3 - -# =========================================================================================== -# Define a python2/3 compatible 'reraise' function for re-raising exceptions properly -# Since the syntax is different and would not compile between versions, we need to using 'exec' - -if PY3: - def reraise(tp, value, tb=None): - if value is None: - value = tp() - if value.__traceback__ is not tb: - raise value.with_traceback(tb) - raise value -else: - def exec_(_code_, _globs_=None, _locs_=None): - """Execute code in a namespace.""" - if _globs_ is None: - frame = sys._getframe(1) - _globs_ = frame.f_globals - if _locs_ is None: - _locs_ = frame.f_locals - del frame - elif _locs_ is None: - _locs_ = _globs_ - exec("""exec _code_ in _globs_, _locs_""") - - exec_(dedent(""" - def reraise(tp, value, tb=None): - raise tp, value, tb - """)) - -# =========================================================================================== +def reraise(tp, value, tb=None): + if value is None: + value = tp() + if value.__traceback__ is not tb: + raise value.with_traceback(tb) + raise value + CONFIG_FILENAME = '/root/talker/config.ini' REBOOT_FILENAME = '/root/talker/reboot.id' From 34936c15c11725dc9c3ff623893c547e2d5b1c4a Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Wed, 20 Nov 2019 16:16:25 +0200 Subject: [PATCH 07/16] redis: upgrade version to 3.3.11. stop using forked redis in agent fix #33 --- setup.py | 2 +- talker_agent/install_agent.sh | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/setup.py b/setup.py index c199f58..bd6f62d 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ url="https://github.com/weka-io/talker", license='BSD', install_requires=[ - 'redis==3.3.7', + 'redis==3.3.11', 'weka-easypy==0.3.1' ], extras_require={ diff --git a/talker_agent/install_agent.sh b/talker_agent/install_agent.sh index 5bdfd0c..56a3b25 100755 --- a/talker_agent/install_agent.sh +++ b/talker_agent/install_agent.sh @@ -23,7 +23,7 @@ fi echo "$TALKER_AGENT_VERSION" > version # Add redis py dependency -REDIS_COMMIT_ID=ad84781ea260be0a1ca4bf6768959b50e8835a6b -curl -fL https://github.com/weka-io/redis-py/archive/"$REDIS_COMMIT_ID".tar.gz | tar -xz -mv redis-py-"$REDIS_COMMIT_ID"/redis . -rm -rf redis-py-"$REDIS_COMMIT_ID" +TAG=3.3.11 +curl -fL https://github.com/andymccurdy/redis-py/archive/"$TAG".tar.gz | tar -xz +mv redis-py-"$TAG"/redis . +rm -rf redis-py-"$TAG" From 7f3a06dc53e232aae2ccb15031c29277d1ab629b Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Thu, 21 Nov 2019 11:43:39 +0200 Subject: [PATCH 08/16] client: remove semver #fix 23 --- talker/client.py | 5 +- talker/command.py | 37 ++++---------- talker/semver.py | 122 ---------------------------------------------- 3 files changed, 11 insertions(+), 153 deletions(-) delete mode 100644 talker/semver.py diff --git a/talker/client.py b/talker/client.py index 2bfb546..6c29390 100644 --- a/talker/client.py +++ b/talker/client.py @@ -19,7 +19,6 @@ REDIS_SOCKET_TIMEOUT, REDIS_SOCKET_CONNECT_TIMEOUT, REDIS_RETRY_ON_TIMEOUT, REDIS_HEALTH_CHECK_INTERVAL, _logger, _verbose_logger ) -from talker.semver import SemVer @locking_cache @@ -57,7 +56,6 @@ def __init__(self, host, password, port, agent_version, name): self._name = name or "anon" self._journal_saved = False self.reactor = TalkerReactor(self) - self.agent_version = SemVer.loads_fuzzy(agent_version or "0.") # '0.' for when the agent is not-yet installed _logger.debug("%s: initialized", self) @property @@ -172,7 +170,7 @@ def poll(self, cmds): res_idx_to_i = {} res_idx = 0 for cmd in cmds: - if cmd.ack_supported and cmd.ack is None: + if cmd.ack is None: p.lpop(cmd._ack_key) res_idx_to_i[res_idx] = cmd, 'ack' res_idx += 1 @@ -289,7 +287,6 @@ def _poll(): _logger.info("Waiting on %s command(s) on %s host(s): %s", len(pending), len(hosts), ", ".join(hosts)) for cmd in pending: since_started = ( - "no-ack" if not cmd.ack_supported else "acked {:ago}".format(cmd.since_started) if cmd.ack else "not-started") _logger.debug(" job-id: %s (%s)", cmd.job_id, since_started) diff --git a/talker/command.py b/talker/command.py index 6b8e2bf..1ac7a2c 100644 --- a/talker/command.py +++ b/talker/command.py @@ -10,7 +10,6 @@ from easypy.properties import safe_property from easypy.timing import Timer -from talker.semver import SMV from talker.config import ( _logger, _verbose_logger, get_logger, MAX_OUTPUT_PER_CHANNEL, JOB_PID_TIMEOUT, AGENT_SEND_TIMEOUT, TALKER_CONTEXT, @@ -24,10 +23,6 @@ ) -V1_3_1 = SMV("1.3.1") # min version for ack and kwargs_resilient -V1_6_0 = SMV("1.6.0") # min version for get_pid - - class AbortedBy: timeout = 'timeout' reboot = 'reboot' @@ -95,15 +90,8 @@ def __init__(self, self._line_timeout_synced = False self.attempts = 0 # in case of TalkerCommandLost - # this is the version from which the agent acknowledges receipt of command - self.ack_supported = self.talker.agent_version >= V1_3_1 - # this is the version from which the agent is ignores params it does not support - self.kwargs_resilient = self.talker.agent_version >= V1_3_1 - # this is the version from which the agent is ignores params it does not support - self.pid_supported = self.talker.agent_version >= V1_6_0 - def __repr__(self): - return "%s(<%s>%s)" % (self.__class__.__name__, self.job_id, "!" if (self.ack_supported and not self.ack) else "") + return "%s(<%s>%s)" % (self.__class__.__name__, self.job_id, "!" if (not self.ack) else "") @property def _result_key(self): @@ -161,9 +149,6 @@ def get_pid(self, timeout=JOB_PID_TIMEOUT, wait_for_ack=True): :returns: The command PID :rtype: int """ - if not self.pid_supported: - raise NotImplementedError( - 'This version (%s) does not support get_pid - need %s or better' % (self.talker.agent_version, V1_6_0)) if wait_for_ack: self.wait(for_ack=True) @@ -203,13 +188,12 @@ def send(self): timeout = self.timeout if self.server_timeout else None extras = {} - if self.kwargs_resilient: - extras.update( - max_output_per_channel=self.max_output_per_channel, - set_new_logpath=self.set_new_logpath, - line_timeout=self.line_timeout, - log_file=self.log_file, - ) + extras.update( + max_output_per_channel=self.max_output_per_channel, + set_new_logpath=self.set_new_logpath, + line_timeout=self.line_timeout, + log_file=self.log_file, + ) self.put_command(id=self.job_id, cmd=args, timeout=timeout, job_repr=compact(command_string, 100), **extras) @@ -446,7 +430,7 @@ def check_client_timeout(self): # client/reactor did not send command yet if self.handling_timer.expired: self.raise_exception(exception_cls=TalkerClientSendTimeout, timeout=self.handling_timer.elapsed) - elif self.ack_supported and not self.ack: + elif not self.ack: is_talker_alive_name = 'is_talker_alive' # client did not receive command - check all timers @@ -563,7 +547,7 @@ def poll(self, check_client_timeout=True): if self.retcode is not None: return self.retcode - if self.ack_supported and not self.ack: + if not self.ack: ack = self.talker.reactor.lpop(self._ack_key) if ack is None: if check_client_timeout: @@ -614,7 +598,6 @@ def wait(self, for_ack=False): while True: if for_ack: - assert self.ack_supported, "ack is not supported" if self.ack is not None: return self.ack @@ -624,7 +607,7 @@ def wait(self, for_ack=False): # we don't want to wait too long, cause we want to raise timeout exceptions promptly blpop_timeout = ( 1 if not self.is_sent - else self.ack_timer.expiration // 10 if (self.ack_supported and not self.ack) + else self.ack_timer.expiration // 10 if (not self.ack) else self.client_timer.expiration // 10) result = self.talker.reactor.blpop([self._exit_code_key, self._ack_key], timeout=blpop_timeout) diff --git a/talker/semver.py b/talker/semver.py deleted file mode 100644 index 64ba6c3..0000000 --- a/talker/semver.py +++ /dev/null @@ -1,122 +0,0 @@ -# TODO: move this into easypy - -from collections import namedtuple -import re - - -class SemVerParseException(ValueError): - pass - - -class SemVer(namedtuple("SemVer", "major minor patch build tag")): - """ Semantic Version object - - From https://semver.org: - Given a version number MAJOR.MINOR.PATCH, increment the: - - MAJOR version when you make incompatible API changes, - MINOR version when you add functionality in a backwards-compatible manner, and - PATCH version when you make backwards-compatible bug fixes. - Additional labels for pre-release and build metadata are available as extensions to the MAJOR.MINOR.PATCH format. - - We use a fourth part, let's call it BUILD and define it is incremented sporadically. - """ - - @classmethod - def loads(cls, string, *, separator='.', tag_separator='-', raise_on_failure=True): - string, _, tag = string.partition(tag_separator) - parts = string.split(separator) - try: - return cls(*parts, tag=tag) - except ValueError as e: - if raise_on_failure: - raise SemVerParseException("Error parsing %s: %s" % (string, str(e))) from None - else: - return None - - @classmethod - def loads_fuzzy(cls, string): - """Loads a version string where separators cab be either '.' or '-'""" - - regex = re.compile(r"((?:\d+[.-])+)(.*)") - string, tag = regex.fullmatch(string).groups() - parts = re.split("[-.]", string) - return cls(*filter(None, parts), tag=tag) - - def __new__(cls, major=0, minor=0, patch=None, build=None, *, tag=None): - return super().__new__( - cls, - major=int(major), - minor=int(minor), - patch=int(patch) if patch is not None else None, - build=int(build) if build is not None else None, - tag="" if tag is None else str(tag), - ) - - @property - def __dict__(self): - return super().__dict__ - - def __str__(self): - return self.dumps() - - def __repr__(self): - return "<{} {}>".format(self.__class__.__name__, self) - - def _to_tuple(self): - return (self.major, self.minor, self.patch or 0, self.build or 0) - - def __eq__(self, other): - assert isinstance(other, self.__class__) - return self._to_tuple() == other._to_tuple() and self.tag == other.tag - - def __lt__(self, other): - assert isinstance(other, self.__class__) - return self._to_tuple() < other._to_tuple() - - def __gt__(self, other): - assert isinstance(other, self.__class__) - return self._to_tuple() > other._to_tuple() - - def __ge__(self, other): - return not self.__lt__(other) - - def __le__(self, other): - return not self.__gt__(other) - - def dumps(self, *, separator='.', tag_separator='-'): - template = "{self.major}{separator}{self.minor}" - if self.patch is not None: - template += "{separator}{self.patch}" - if self.build: - template += "{separator}{self.build}" - if self.tag: - template += "{tag_separator}{self.tag}" - - return template.format(**locals()) - - def copy(self, **kw): - return self.__class__(**dict(self._asdict(), **kw)) - - def bump_build(self, clear_tag=True): - return self.copy( - build=(0 if self.build is None else self.build) + 1, - tag='' if clear_tag else self.tag) - - def bump_patch(self, clear_tag=True): - return self.copy( - build=0, patch=self.patch + 1, - tag='' if clear_tag else self.tag) - - def bump_minor(self, clear_tag=True): - return self.copy( - build=0, patch=0, minor=self.minor + 1, - tag='' if clear_tag else self.tag) - - def bump_major(self, clear_tag=True): - return self.copy( - build=0, patch=0, minor=0, major=self.major + 1, - tag='' if clear_tag else self.tag) - - -SMV = SemVer.loads From b9b679afebe2a4de9b9311cedb442359e97a98a0 Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Thu, 21 Nov 2019 11:47:54 +0200 Subject: [PATCH 09/16] changelog: update --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e8d5ba0..cf0a8ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Changed - upgrade agent to run with python3 +- upgrade - redis==3.3.11 +- use main redis project (not forked) for agent +- remove version dependency behavior ## [1.8.6] - 2019-11-05 ### Added From 1713938fb0c95f20a35785696b3ab50307b64f1b Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Thu, 21 Nov 2019 14:17:20 +0200 Subject: [PATCH 10/16] version 1.8.7 --- CHANGELOG.md | 2 ++ VERSION | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf0a8ac..4bcc95b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + +## [1.8.7] - 2019-11-21 ### Changed - upgrade agent to run with python3 - upgrade - redis==3.3.11 diff --git a/VERSION b/VERSION index 9eadd6b..d2c4b27 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.8.6 \ No newline at end of file +1.8.7 \ No newline at end of file From 19ef58a9c061c8420d517aafc2d337291ccf9e49 Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Thu, 21 Nov 2019 14:53:13 +0200 Subject: [PATCH 11/16] ci: travis cleanup skipping on deploy --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 9632b79..8af687d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -48,6 +48,7 @@ deploy: user: wekaio-pypi password: secure: oJATvgDncF6BWrVwQERAkwzNwm5UYmLbEUdMMWEAReGhKjNC/FpmBVwtzvObWjkfXXKSWyCm+gc7bYX8GRDDtNLfO5PkOFaMIDS31cht9W7tbVbc8FHVDNZ0Z09ZQZzaEBUnSZQAJKTTa4pc0BcdJNzVABn5B5CxSPDz9S6rO5vFOQtQNpGqivUVF0BKSNce8Je5ZUvb5WwMuvTM2WeNDyPlwdcCsuwqSjgeprBWNd1+62bjoG9dKQz1R/yylzZ2aoQ8x4+SkTLbP9zewW5eNtVomz5PFXG3NLFJLU2aExpib1n8lbK6EmUCp+gDwR+sHAA49jbo6cm92N6scJ+TI6rsuPCRLHCIpxXrhp5/t/kETFq+y8FJgFDH0k9RLIwefjHUZuqxfbRs32v3gpZIUqwXB4jrCeY8IClfWW2WvPyfGaxoMMNKd8cr3s6ZzNrxhq091JcJHlBNGHX2ev6tyTSLy2eNqIo2ueZyrDmsVWYWT+VoeHWo/0q/F5SxxnuuwgraDvez/Fpontr2qIBKShwZoFpSEcs/m57nekVhtEiJEwtbYUapk5iLlC3iMELxnjvsEfCT/df/HWQvEuBAwX90nIxWRrGC8B7bfdKXBmUjJ/qBGsOrL0F+Gj8iQRKXl87rCN4GoGLG7XXDM5f/juNtcrhlU5zEDMslYgmw1Wg= + skip_cleanup: true on: tags: true branch: master From fc20ee44f43eb192314f670f028e5738a7366b06 Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Thu, 21 Nov 2019 15:02:33 +0200 Subject: [PATCH 12/16] ci: travis pypi password update --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8af687d..60d10c6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -45,9 +45,9 @@ install: - pip install .[test] deploy: provider: pypi - user: wekaio-pypi + user: __token__ password: - secure: oJATvgDncF6BWrVwQERAkwzNwm5UYmLbEUdMMWEAReGhKjNC/FpmBVwtzvObWjkfXXKSWyCm+gc7bYX8GRDDtNLfO5PkOFaMIDS31cht9W7tbVbc8FHVDNZ0Z09ZQZzaEBUnSZQAJKTTa4pc0BcdJNzVABn5B5CxSPDz9S6rO5vFOQtQNpGqivUVF0BKSNce8Je5ZUvb5WwMuvTM2WeNDyPlwdcCsuwqSjgeprBWNd1+62bjoG9dKQz1R/yylzZ2aoQ8x4+SkTLbP9zewW5eNtVomz5PFXG3NLFJLU2aExpib1n8lbK6EmUCp+gDwR+sHAA49jbo6cm92N6scJ+TI6rsuPCRLHCIpxXrhp5/t/kETFq+y8FJgFDH0k9RLIwefjHUZuqxfbRs32v3gpZIUqwXB4jrCeY8IClfWW2WvPyfGaxoMMNKd8cr3s6ZzNrxhq091JcJHlBNGHX2ev6tyTSLy2eNqIo2ueZyrDmsVWYWT+VoeHWo/0q/F5SxxnuuwgraDvez/Fpontr2qIBKShwZoFpSEcs/m57nekVhtEiJEwtbYUapk5iLlC3iMELxnjvsEfCT/df/HWQvEuBAwX90nIxWRrGC8B7bfdKXBmUjJ/qBGsOrL0F+Gj8iQRKXl87rCN4GoGLG7XXDM5f/juNtcrhlU5zEDMslYgmw1Wg= + secure: IbmdvH5PFfU74iQjRgtx6/dMbQIv+kSQhxOmPsB3KA7NGWWSeSn2Vjm8kb457x5Tau/rZwG8z6Q2lbbkgylAZ2H9OWUMw60ZtPJK351EGdljLqSFoaFxg60U7a8+dwSjhX91bRgqNYLF03oviRgVloYc/8/1b84tGFVelAJ6khZE+Ll6ndsqEtsCh9zSkd+PMlOWYTOhxUZu5hYUfpnwN1acnfhuNjGBlqvqTIA9ErTOxt50g0EBUT0Mzlkq0Wb70RaT6NCAI5NHIz/ZyVnk8hANyldoiuCpMsxSCq/DcQClHRiSDBo3sf9Glkeqk/tN0L1ZCT+ADxOs529Sj0got8VtlLY/qXRnUD+KqDXCEvtoLoGQantaG/HsFvESEHCdGo9EqHtPjLvhRQDZC/DxwUmCV2qKjtlrv/UflDLEg3cBKlDjLFzUFiliwjcJ3/Z9ljX5nm7k8/v4e014kncH6lBH2/wj7VhSBJ+6RM4mcocQt6wGdnWoc+66CO8PHKsjdTvasEp4V0D1WJqZN5n4W21L5vnQnUgSJ27eS8OZxWfnFZpzUnc4o67N32IOVKoZgnSY2TV/KXkZj4usEevDDtimH9fXAqImCZPyXc7Um/hP7YOiINGci18fOUFniDpJEIUIpm9K0Eu3s0JoFU2t4YxT68D7oThWwwY8PVJi+JI= skip_cleanup: true on: tags: true From 5621e2c4e1f9cbac407a4eb6c43a045afeb4c115 Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Thu, 5 Dec 2019 16:43:09 +0200 Subject: [PATCH 13/16] agent: run threads safely --- talker_agent/talker.py | 59 ++++++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/talker_agent/talker.py b/talker_agent/talker.py index 24dfc57..157cf9a 100644 --- a/talker_agent/talker.py +++ b/talker_agent/talker.py @@ -20,6 +20,7 @@ from logging import getLogger from logging.handlers import RotatingFileHandler from configparser import ConfigParser +from threading import Lock import redis @@ -59,6 +60,29 @@ def reraise(tp, value, tb=None): JOBS_EXPIRATION = 15 # 20 * 60 # how long to keep job ids in the EOS registry (exactly-once-semantics) config = None +first_exception_info = None +safe_thread_lock = Lock() + + +class SafeThread(threading.Thread): + def __init__(self, *, target, name, args=(), kwargs=None, daemon=None): + super().__init__(None, target, name, args, kwargs, daemon=daemon) + self.exc_info = None + + def run(self): + global first_exception_info + try: + self._target(*self._args, **self._kwargs) + except: + exc_info = sys.exc_info() + logger.info("exception in '%s'", self.name, exc_info=exc_info) + with safe_thread_lock: + if not first_exception_info: + first_exception_info = sys.exc_info() + finally: + # Avoid a refcycle if the thread is running a function with + # an argument that has a member that points to the thread. + del self._target, self._args, self._kwargs class LineTimeout(Exception): @@ -439,9 +463,7 @@ def _kill(): except Exception as e: self.logger.error(e) - thread = threading.Thread(target=_kill, name="killer-%s" % self.job_id) - thread.daemon = True - thread.start() + SafeThread(target=_kill, name="killer-%s" % self.job_id, daemon=True).start() self.reset_timeout(new_timeout=graceful_timeout + 10) @@ -451,7 +473,7 @@ def __init__(self, *args, **kwargs): super(RebootJob, self).__init__(*args, **kwargs) def start(self): - threading.Thread(target=self.reboot_host, name="Reboot").start() + SafeThread(target=self.reboot_host, name="Reboot").start() def reboot_host(self): with open(REBOOT_FILENAME, 'w') as f: @@ -497,8 +519,6 @@ def __init__(self): self.output_lock = threading.RLock() self.redis = None self.host_id = None - self.redis_fetcher = None - self.redis_sender = None self.job_poller = None self.fds_poller = select.poll() self.fds_to_channels = {} @@ -744,35 +764,24 @@ def sync_jobs_progress(self): else: time.sleep(CYCLE_DURATION) - def start_worker(self, worker, name): - - def safe_run(): - try: - return worker() - except: # noqa - self.exc_info = sys.exc_info() - logger.debug("exception in '%s'", name, exc_info=self.exc_info) - - t = threading.Thread(target=safe_run, name=name) - t.daemon = True - t.start() - return t - def start(self): + global first_exception_info + first_exception_info = None + self.finalize_previous_session() if os.path.isfile(JOBS_SEEN): with open(JOBS_SEEN, "r") as f: self.seen_jobs = json.load(f) - self.redis_fetcher = self.start_worker(self.fetch_new_jobs, name="RedisFetcher") - self.redis_sender = self.start_worker(self.sync_jobs_progress, name="JobProgress") + SafeThread(target=self.fetch_new_jobs, name="RedisFetcher", daemon=True).start() + SafeThread(target=self.sync_jobs_progress, name="JobProgress", daemon=True).start() while not self.stop_agent.is_set(): if not self.get_jobs_outputs(): time.sleep(CYCLE_DURATION / 10.0) - if self.exc_info: + if first_exception_info: logger.debug("re-raising exception from worker") - reraise(*self.exc_info) + reraise(*first_exception_info) assert False, "exception should have been raised" def setup(self): @@ -819,7 +828,7 @@ def unregister_fileno(self, fileno): def wait_proc(proc, timeout): - t = threading.Thread(target=proc.wait) + t = SafeThread(target=proc.wait, name='wait_proc') t.start() t.join(timeout) return not t.is_alive() From 952db29484a9799e64c5f5a2563dd8d8ec0b4f1a Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Tue, 10 Dec 2019 15:41:52 +0200 Subject: [PATCH 14/16] agent: fix dict iteration bug caused by moving to python3 --- talker_agent/talker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/talker_agent/talker.py b/talker_agent/talker.py index 157cf9a..da0ae09 100644 --- a/talker_agent/talker.py +++ b/talker_agent/talker.py @@ -640,7 +640,7 @@ def stop_for_reboot(self, requested_by): requested_by.log("Some jobs not yet finished, setting exit code to 'reboot' and proceeding") with self.pipeline() as pipeline: - for job_id, job in self.current_processes.items(): + for job_id, job in list(self.current_processes.items()): if job_id == requested_by.job_id: continue job.set_result('reboot') From 4ad8029aa6439e20bd0b57018d09652ac6e8526d Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Sun, 22 Dec 2019 13:35:49 +0200 Subject: [PATCH 15/16] agent: add thread safe test --- tests/uts/test_agent.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/uts/test_agent.py b/tests/uts/test_agent.py index 445dd6c..65f300a 100644 --- a/tests/uts/test_agent.py +++ b/tests/uts/test_agent.py @@ -33,14 +33,24 @@ def raise_file_not_found(*args, **kwargs): raise OSError(2, 'No such file or directory') +class RebootMockException(Exception): + pass + + +def reboot_mock_exception(*args): + raise RebootMockException("This is reboot mock exception") + + JOBS_DIR = '/tmp/talker/jobs' EXCEPTION_FILENAME = '/tmp/talker/last_exception' JOBS_SEEN = os.path.join(JOBS_DIR, 'eos.json') +REBOOT_FILENAME = '/tmp/talker/reboot.id' @patch('talker_agent.talker.JOBS_DIR', JOBS_DIR) @patch('talker_agent.talker.JOBS_SEEN', JOBS_SEEN) @patch('talker_agent.talker.EXCEPTION_FILENAME', EXCEPTION_FILENAME) +@patch('talker_agent.talker.REBOOT_FILENAME', REBOOT_FILENAME) class TestAgent(unittest.TestCase): def setUp(self): @@ -167,3 +177,8 @@ def test_max_output_per_channel(self): res = get_stdout(self.agent.redis, job_id) expected_val = val.replace('\\n', '\n') * val_repeats self.assertEqual(res, expected_val) + + @patch('talker_agent.talker.RebootJob.reboot_host', reboot_mock_exception) + def test_safe_thread(self): + _ = self.run_cmd_on_agent('reboot', force=True) + self.assert_agent_exception(RebootMockException) From b26b5f1a2cb072f7e4a80151c5e52069858de025 Mon Sep 17 00:00:00 2001 From: Assaf Giladi Date: Sun, 22 Dec 2019 13:58:10 +0200 Subject: [PATCH 16/16] changelog : update --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bcc95b..5f45c85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- make all agent threads thread-safe +- fix dict items iteration when doing reboot due to python3 changes ## [1.8.7] - 2019-11-21 ### Changed