From 6c1b344b0d495a48daa873a01d0a368eec234169 Mon Sep 17 00:00:00 2001 From: hzarka Date: Tue, 15 Feb 2022 16:53:37 +0400 Subject: [PATCH 1/3] upgrade grpcio --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 87bb9a2..10e44f0 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ install_requires=[ "betterproto-for-temporal-python-sdk==1.2.5", "dataclasses-json==0.3.8", - "grpcio==1.30.0", + "grpcio==1.43.0", "grpclib==0.3.2", "h2==3.2.0", "more-itertools==7.0.0", From 6e3112ab42ce5e1df41ae05a92b45eccb7b231f3 Mon Sep 17 00:00:00 2001 From: Osama Maharmeh <44961698+NullOsama@users.noreply.github.com> Date: Thu, 17 Feb 2022 20:19:44 +0400 Subject: [PATCH 2/3] add activity loop retry --- temporal/activity_loop.py | 3 ++- temporal/retry.py | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/temporal/activity_loop.py b/temporal/activity_loop.py index e32dbb3..618394c 100644 --- a/temporal/activity_loop.py +++ b/temporal/activity_loop.py @@ -9,7 +9,7 @@ from temporal.activity import ActivityContext, ActivityTask, complete_exceptionally, complete from temporal.api.taskqueue.v1 import TaskQueue, TaskQueueMetadata from temporal.converter import get_fn_args_type_hints -from temporal.retry import retry +from temporal.retry import RetryException, retry from temporal.service_helpers import get_identity from temporal.worker import Worker, StopRequestedException from temporal.api.workflowservice.v1 import WorkflowServiceStub as WorkflowService, PollActivityTaskQueueRequest, \ @@ -93,3 +93,4 @@ async def activity_task_loop_func(worker: Worker): finally: worker.notify_thread_stopped() logger.info("Activity loop ended") + raise RetryException('sleep') diff --git a/temporal/retry.py b/temporal/retry.py index 87fb2b3..367f779 100644 --- a/temporal/retry.py +++ b/temporal/retry.py @@ -7,6 +7,8 @@ MAX_DELAY_SECONDS = 5 * 60 RESET_DELAY_AFTER_SECONDS = 10 * 60 +class RetryException(Exception): + pass def retry(logger=None): def wrapper(fp): @@ -17,6 +19,9 @@ async def retry_loop(*args, **kwargs): await fp(*args, **kwargs) logger.debug("@retry decorated function %s exited, ending retry loop", fp.__name__) break + except RetryException: + logger.info('sleeping...') + await asyncio.sleep(INITIAL_DELAY_SECONDS) except Exception as ex: now = calendar.timegm(time.gmtime()) if last_failed_time == -1 or (now - last_failed_time) > RESET_DELAY_AFTER_SECONDS: From aab27d8f10435e006c016c361e8874ed5a22301f Mon Sep 17 00:00:00 2001 From: malkaed <88436201+malkaed@users.noreply.github.com> Date: Tue, 31 May 2022 13:08:30 +0300 Subject: [PATCH 3/3] Py310 fix (#6) * Revert "add activity loop retry" This reverts commit 6e3112ab42ce5e1df41ae05a92b45eccb7b231f3. * added asyncio.CancelledError to retry exception handling * sepereated CancelledError from normal Exception in retry decorator * minor syntax update * catch gaierror exception as well Co-authored-by: Osama Maharmeh <44961698+NullOsama@users.noreply.github.com> --- temporal/activity_loop.py | 3 +-- temporal/retry.py | 7 +++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/temporal/activity_loop.py b/temporal/activity_loop.py index 618394c..e32dbb3 100644 --- a/temporal/activity_loop.py +++ b/temporal/activity_loop.py @@ -9,7 +9,7 @@ from temporal.activity import ActivityContext, ActivityTask, complete_exceptionally, complete from temporal.api.taskqueue.v1 import TaskQueue, TaskQueueMetadata from temporal.converter import get_fn_args_type_hints -from temporal.retry import RetryException, retry +from temporal.retry import retry from temporal.service_helpers import get_identity from temporal.worker import Worker, StopRequestedException from temporal.api.workflowservice.v1 import WorkflowServiceStub as WorkflowService, PollActivityTaskQueueRequest, \ @@ -93,4 +93,3 @@ async def activity_task_loop_func(worker: Worker): finally: worker.notify_thread_stopped() logger.info("Activity loop ended") - raise RetryException('sleep') diff --git a/temporal/retry.py b/temporal/retry.py index 367f779..0a9a656 100644 --- a/temporal/retry.py +++ b/temporal/retry.py @@ -1,5 +1,6 @@ import asyncio import calendar +from socket import gaierror import time INITIAL_DELAY_SECONDS = 3 @@ -7,8 +8,6 @@ MAX_DELAY_SECONDS = 5 * 60 RESET_DELAY_AFTER_SECONDS = 10 * 60 -class RetryException(Exception): - pass def retry(logger=None): def wrapper(fp): @@ -19,8 +18,8 @@ async def retry_loop(*args, **kwargs): await fp(*args, **kwargs) logger.debug("@retry decorated function %s exited, ending retry loop", fp.__name__) break - except RetryException: - logger.info('sleeping...') + except (asyncio.CancelledError, gaierror) as err: + logger.info(f"{fp.__name__} raised {err}, retrying...") await asyncio.sleep(INITIAL_DELAY_SECONDS) except Exception as ex: now = calendar.timegm(time.gmtime())