Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ matrix:
env:
- TEST_SUITE="tests/uts/test_client.py"
- stage: agent unit tests
python: 2.7
python: 3.5
env:
- TEST_SUITE="tests/uts/test_agent.py"
- stage: integration tests
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- upgrade agent to run with python3

## [1.8.6] - 2019-11-05
### Added
Expand Down
2 changes: 1 addition & 1 deletion talker_agent/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:2.7-slim-buster
FROM python:3.5-slim-buster

#args from docker-compose.yaml
ARG TALKER_AGENT_VERSION
Expand Down
98 changes: 22 additions & 76 deletions talker_agent/talker.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,4 @@
#!/usr/bin/python


"""
Talker Agent (Server-Side)
==========================

* Important:
- keep this free of dependencies (there's only a redis dependency)
- keep this compatible with python2.6+ (no dict comprehension)

* Packaging:
- update the 'TALKER' version in version_info
- ./teka pack talker

* Testing:
- See ./wepy/devops/talker.py (client-side)

"""
#!/usr/local/bin/python3

import fcntl
import json
Expand All @@ -34,48 +16,22 @@
import glob
import atexit
import random
from textwrap import dedent
from contextlib import contextmanager
from logging import getLogger
from logging.handlers import RotatingFileHandler
try:
from configparser import ConfigParser
except: # python 2.7
from ConfigParser import ConfigParser


PY3 = sys.version_info[0] == 3

# ===========================================================================================
# Define a python2/3 compatible 'reraise' function for re-raising exceptions properly
# Since the syntax is different and would not compile between versions, we need to using 'exec'

if PY3:
def reraise(tp, value, tb=None):
if value is None:
value = tp()
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value
else:
def exec_(_code_, _globs_=None, _locs_=None):
"""Execute code in a namespace."""
if _globs_ is None:
frame = sys._getframe(1)
_globs_ = frame.f_globals
if _locs_ is None:
_locs_ = frame.f_locals
del frame
elif _locs_ is None:
_locs_ = _globs_
exec("""exec _code_ in _globs_, _locs_""")

exec_(dedent("""
def reraise(tp, value, tb=None):
raise tp, value, tb
"""))

# ===========================================================================================
from configparser import ConfigParser

import redis
from redis import TimeoutError


def reraise(tp, value, tb=None):
if value is None:
value = tp()
if value.__traceback__ is not tb:
raise value.with_traceback(tb)
raise value


CONFIG_FILENAME = '/root/talker/config.ini'
REBOOT_FILENAME = '/root/talker/reboot.id'
Expand Down Expand Up @@ -304,7 +260,7 @@ def start(self):
time.sleep(random.random() * 2)
continue

self.stderr.chunks.append(e.strerror)
self.stderr.chunks.append(e.strerror.encode('utf-8'))
self.set_result(e.errno)
self.finalize()
return
Expand All @@ -313,13 +269,7 @@ def start(self):

self.job_fn = "%s/job.%s.%s" % (JOBS_DIR, self.job_id, self.popen.pid)
with open(self.job_fn, "w") as f:
try:
f.write(repr(self.cmd))
except IOError:
# to help with WEKAPP-74054
os.system("df")
os.system("df -i")
raise
f.write(repr(self.cmd))

self.agent.current_processes[self.job_id] = self
for channel in self.channels:
Expand Down Expand Up @@ -585,7 +535,11 @@ def fetch_new_jobs(self):
jobs_key = 'commands-%s' % self.host_id
while not self.stop_fetching.is_set():
new_jobs = []
ret = self.redis.blpop([jobs_key], timeout=1)
try:
ret = self.redis.blpop([jobs_key], timeout=1)
except TimeoutError:
logger.exception("Failed fetching new jobs")
ret = None
if not ret:
now = time.time()
self.scrub_seen_jobs(now=now)
Expand Down Expand Up @@ -841,7 +795,6 @@ def setup(self):
health_check_interval = config.parser.getfloat('redis', 'health_check_interval')

logger.info("Connecting to redis %s:%s", host, port)
import redis # deferring so that importing talker (for ut) doesn't immediately fail if package not available
self.redis = redis.StrictRedis(
host=host, port=port, db=0, password=password,
socket_timeout=socket_timeout, socket_connect_timeout=socket_connect_timeout,
Expand Down Expand Up @@ -940,10 +893,6 @@ def main(*args):
config = Config()
set_logging_to_file(config.parser.get('logging', 'logpath'))

# to help with WEKAPP-74054
os.system("df")
os.system("df -i")

open("/var/run/talker.pid", "w").write(str(os.getpid()))
atexit.register(os.unlink, "/var/run/talker.pid")

Expand Down Expand Up @@ -975,7 +924,4 @@ def main(*args):

if __name__ == '__main__':
args = sys.argv[1:]
if "--ut" in args:
print("Talker don't need no UT")
else:
sys.exit(main(*args))
sys.exit(main(*args))
7 changes: 6 additions & 1 deletion tests/integration/test_sanity.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest

from talker_agent.talker import Config
from talker.errors import CommandAbortedByOverflow
from talker.errors import CommandAbortedByOverflow, CommandExecutionError
from tests.utils import get_talker_client, get_retcode, get_stdout


Expand Down Expand Up @@ -38,3 +38,8 @@ def test_max_output_per_channel_set(self):
if expected_ret == 0:
res = get_stdout(self.client.redis, cmd.job_id)
self.assertEqual(res, val)

def test_bad_command(self):
cmd = self.client.run(self.host_id, 'foo')
with self.assertRaises(CommandExecutionError):
cmd.result()