Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ matrix:
env:
- TEST_SUITE="tests/uts/test_client.py"
- stage: agent unit tests
python: 2.7
python: 3.5
env:
- TEST_SUITE="tests/uts/test_agent.py"
- stage: integration tests
Expand Down Expand Up @@ -45,9 +45,10 @@ install:
- pip install .[test]
deploy:
provider: pypi
user: wekaio-pypi
user: __token__
password:
secure: oJATvgDncF6BWrVwQERAkwzNwm5UYmLbEUdMMWEAReGhKjNC/FpmBVwtzvObWjkfXXKSWyCm+gc7bYX8GRDDtNLfO5PkOFaMIDS31cht9W7tbVbc8FHVDNZ0Z09ZQZzaEBUnSZQAJKTTa4pc0BcdJNzVABn5B5CxSPDz9S6rO5vFOQtQNpGqivUVF0BKSNce8Je5ZUvb5WwMuvTM2WeNDyPlwdcCsuwqSjgeprBWNd1+62bjoG9dKQz1R/yylzZ2aoQ8x4+SkTLbP9zewW5eNtVomz5PFXG3NLFJLU2aExpib1n8lbK6EmUCp+gDwR+sHAA49jbo6cm92N6scJ+TI6rsuPCRLHCIpxXrhp5/t/kETFq+y8FJgFDH0k9RLIwefjHUZuqxfbRs32v3gpZIUqwXB4jrCeY8IClfWW2WvPyfGaxoMMNKd8cr3s6ZzNrxhq091JcJHlBNGHX2ev6tyTSLy2eNqIo2ueZyrDmsVWYWT+VoeHWo/0q/F5SxxnuuwgraDvez/Fpontr2qIBKShwZoFpSEcs/m57nekVhtEiJEwtbYUapk5iLlC3iMELxnjvsEfCT/df/HWQvEuBAwX90nIxWRrGC8B7bfdKXBmUjJ/qBGsOrL0F+Gj8iQRKXl87rCN4GoGLG7XXDM5f/juNtcrhlU5zEDMslYgmw1Wg=
secure: IbmdvH5PFfU74iQjRgtx6/dMbQIv+kSQhxOmPsB3KA7NGWWSeSn2Vjm8kb457x5Tau/rZwG8z6Q2lbbkgylAZ2H9OWUMw60ZtPJK351EGdljLqSFoaFxg60U7a8+dwSjhX91bRgqNYLF03oviRgVloYc/8/1b84tGFVelAJ6khZE+Ll6ndsqEtsCh9zSkd+PMlOWYTOhxUZu5hYUfpnwN1acnfhuNjGBlqvqTIA9ErTOxt50g0EBUT0Mzlkq0Wb70RaT6NCAI5NHIz/ZyVnk8hANyldoiuCpMsxSCq/DcQClHRiSDBo3sf9Glkeqk/tN0L1ZCT+ADxOs529Sj0got8VtlLY/qXRnUD+KqDXCEvtoLoGQantaG/HsFvESEHCdGo9EqHtPjLvhRQDZC/DxwUmCV2qKjtlrv/UflDLEg3cBKlDjLFzUFiliwjcJ3/Z9ljX5nm7k8/v4e014kncH6lBH2/wj7VhSBJ+6RM4mcocQt6wGdnWoc+66CO8PHKsjdTvasEp4V0D1WJqZN5n4W21L5vnQnUgSJ27eS8OZxWfnFZpzUnc4o67N32IOVKoZgnSY2TV/KXkZj4usEevDDtimH9fXAqImCZPyXc7Um/hP7YOiINGci18fOUFniDpJEIUIpm9K0Eu3s0JoFU2t4YxT68D7oThWwwY8PVJi+JI=
skip_cleanup: true
on:
tags: true
branch: master
Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Fixed
- make all agent threads thread-safe
- fix dict items iteration when doing reboot due to python3 changes

## [1.8.7] - 2019-11-21
### Changed
- upgrade agent to run with python3
- upgrade - redis==3.3.11
- use main redis project (not forked) for agent
- remove version dependency behavior

## [1.8.6] - 2019-11-05
### Added
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.8.6
1.8.7
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
url="https://github.com/weka-io/talker",
license='BSD',
install_requires=[
'redis==3.3.7',
'redis==3.3.11',
'weka-easypy==0.3.1'
],
extras_require={
Expand Down
5 changes: 1 addition & 4 deletions talker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
REDIS_SOCKET_TIMEOUT, REDIS_SOCKET_CONNECT_TIMEOUT, REDIS_RETRY_ON_TIMEOUT,
REDIS_HEALTH_CHECK_INTERVAL, _logger, _verbose_logger
)
from talker.semver import SemVer


@locking_cache
Expand Down Expand Up @@ -57,7 +56,6 @@ def __init__(self, host, password, port, agent_version, name):
self._name = name or "anon"
self._journal_saved = False
self.reactor = TalkerReactor(self)
self.agent_version = SemVer.loads_fuzzy(agent_version or "0.") # '0.' for when the agent is not-yet installed
_logger.debug("%s: initialized", self)

@property
Expand Down Expand Up @@ -172,7 +170,7 @@ def poll(self, cmds):
res_idx_to_i = {}
res_idx = 0
for cmd in cmds:
if cmd.ack_supported and cmd.ack is None:
if cmd.ack is None:
p.lpop(cmd._ack_key)
res_idx_to_i[res_idx] = cmd, 'ack'
res_idx += 1
Expand Down Expand Up @@ -289,7 +287,6 @@ def _poll():
_logger.info("Waiting on %s command(s) on %s host(s): %s", len(pending), len(hosts), ", ".join(hosts))
for cmd in pending:
since_started = (
"no-ack" if not cmd.ack_supported else
"acked {:ago}".format(cmd.since_started) if cmd.ack else
"not-started")
_logger.debug(" job-id: %s (%s)", cmd.job_id, since_started)
Expand Down
37 changes: 10 additions & 27 deletions talker/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from easypy.properties import safe_property
from easypy.timing import Timer

from talker.semver import SMV
from talker.config import (
_logger, _verbose_logger, get_logger,
MAX_OUTPUT_PER_CHANNEL, JOB_PID_TIMEOUT, AGENT_SEND_TIMEOUT, TALKER_CONTEXT,
Expand All @@ -24,10 +23,6 @@
)


V1_3_1 = SMV("1.3.1") # min version for ack and kwargs_resilient
V1_6_0 = SMV("1.6.0") # min version for get_pid


class AbortedBy:
timeout = 'timeout'
reboot = 'reboot'
Expand Down Expand Up @@ -95,15 +90,8 @@ def __init__(self,
self._line_timeout_synced = False
self.attempts = 0 # in case of TalkerCommandLost

# this is the version from which the agent acknowledges receipt of command
self.ack_supported = self.talker.agent_version >= V1_3_1
# this is the version from which the agent is ignores params it does not support
self.kwargs_resilient = self.talker.agent_version >= V1_3_1
# this is the version from which the agent is ignores params it does not support
self.pid_supported = self.talker.agent_version >= V1_6_0

def __repr__(self):
return "%s(<%s>%s)" % (self.__class__.__name__, self.job_id, "!" if (self.ack_supported and not self.ack) else "")
return "%s(<%s>%s)" % (self.__class__.__name__, self.job_id, "!" if (not self.ack) else "")

@property
def _result_key(self):
Expand Down Expand Up @@ -161,9 +149,6 @@ def get_pid(self, timeout=JOB_PID_TIMEOUT, wait_for_ack=True):
:returns: The command PID
:rtype: int
"""
if not self.pid_supported:
raise NotImplementedError(
'This version (%s) does not support get_pid - need %s or better' % (self.talker.agent_version, V1_6_0))
if wait_for_ack:
self.wait(for_ack=True)

Expand Down Expand Up @@ -203,13 +188,12 @@ def send(self):
timeout = self.timeout if self.server_timeout else None

extras = {}
if self.kwargs_resilient:
extras.update(
max_output_per_channel=self.max_output_per_channel,
set_new_logpath=self.set_new_logpath,
line_timeout=self.line_timeout,
log_file=self.log_file,
)
extras.update(
max_output_per_channel=self.max_output_per_channel,
set_new_logpath=self.set_new_logpath,
line_timeout=self.line_timeout,
log_file=self.log_file,
)

self.put_command(id=self.job_id, cmd=args, timeout=timeout, job_repr=compact(command_string, 100), **extras)

Expand Down Expand Up @@ -446,7 +430,7 @@ def check_client_timeout(self):
# client/reactor did not send command yet
if self.handling_timer.expired:
self.raise_exception(exception_cls=TalkerClientSendTimeout, timeout=self.handling_timer.elapsed)
elif self.ack_supported and not self.ack:
elif not self.ack:
is_talker_alive_name = 'is_talker_alive'

# client did not receive command - check all timers
Expand Down Expand Up @@ -563,7 +547,7 @@ def poll(self, check_client_timeout=True):
if self.retcode is not None:
return self.retcode

if self.ack_supported and not self.ack:
if not self.ack:
ack = self.talker.reactor.lpop(self._ack_key)
if ack is None:
if check_client_timeout:
Expand Down Expand Up @@ -614,7 +598,6 @@ def wait(self, for_ack=False):
while True:

if for_ack:
assert self.ack_supported, "ack is not supported"
if self.ack is not None:
return self.ack

Expand All @@ -624,7 +607,7 @@ def wait(self, for_ack=False):
# we don't want to wait too long, cause we want to raise timeout exceptions promptly
blpop_timeout = (
1 if not self.is_sent
else self.ack_timer.expiration // 10 if (self.ack_supported and not self.ack)
else self.ack_timer.expiration // 10 if (not self.ack)
else self.client_timer.expiration // 10)

result = self.talker.reactor.blpop([self._exit_code_key, self._ack_key], timeout=blpop_timeout)
Expand Down
122 changes: 0 additions & 122 deletions talker/semver.py

This file was deleted.

2 changes: 1 addition & 1 deletion talker_agent/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:2.7-slim-buster
FROM python:3.5-slim-buster

#args from docker-compose.yaml
ARG TALKER_AGENT_VERSION
Expand Down
8 changes: 4 additions & 4 deletions talker_agent/install_agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fi
echo "$TALKER_AGENT_VERSION" > version

# Add redis py dependency
REDIS_COMMIT_ID=ad84781ea260be0a1ca4bf6768959b50e8835a6b
curl -fL https://github.com/weka-io/redis-py/archive/"$REDIS_COMMIT_ID".tar.gz | tar -xz
mv redis-py-"$REDIS_COMMIT_ID"/redis .
rm -rf redis-py-"$REDIS_COMMIT_ID"
TAG=3.3.11
curl -fL https://github.com/andymccurdy/redis-py/archive/"$TAG".tar.gz | tar -xz
mv redis-py-"$TAG"/redis .
rm -rf redis-py-"$TAG"
Loading