From c897710013eaefdc448e86e657090b64a94047e1 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Fri, 21 Jan 2022 15:44:33 -0600 Subject: [PATCH 1/3] Skip loglines when possible. Adding task_transition logs when reasonable --- funcx_forwarder/forwarder.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/funcx_forwarder/forwarder.py b/funcx_forwarder/forwarder.py index 0dcafeb..c4a4e08 100644 --- a/funcx_forwarder/forwarder.py +++ b/funcx_forwarder/forwarder.py @@ -446,10 +446,11 @@ def forward_task_to_endpoint(self): logger.warning( "Putting back REDIS message for unconnected endpoint: %s:%s", dest_endpoint, - task, + task_id, extra={ "log_type": "forwarder_redis_task_put", "endpoint_id": dest_endpoint, + "task_id": task_id }, ) self.redis_pubsub.put(dest_endpoint, task) @@ -462,8 +463,7 @@ def forward_task_to_endpoint(self): except TypeError: # A TypeError is raised when the Task object can't be recomposed from # redis due to missing values during high-workload events. - logger.exception(f"Unable to access task {task_id} from redis") - logger.debug( + logger.exception( f"Task:{task_id} is now LOST", extra={ "log_type": "task_lost", @@ -476,13 +476,24 @@ def forward_task_to_endpoint(self): try: self.tasks_q.put(dest_endpoint.encode("utf-8"), zmq_task.pack()) except (zmq.error.ZMQError, zmq.Again): - logger.exception(f"Endpoint:{dest_endpoint} is unreachable") + logger.exception( + f"Endpoint:{dest_endpoint} is unreachable", + extra={ + "log_type": "endpoint_disconnect", + "endpoint_id": dest_endpoint + } + ) # put task back in redis since it was not sent to endpoint self.redis_pubsub.put(dest_endpoint, task) self.disconnect_endpoint(dest_endpoint) except Exception: logger.exception( - f"Caught error while sending {task_id} to {dest_endpoint}" + f"Caught error while sending {task_id} to {dest_endpoint}", + extra={ + "log_type": "task_dispatch_fail", + "endpoint_id": dest_endpoint, + "task_id": task_id + } ) # put task back in redis since it was not sent to endpoint self.redis_pubsub.put(dest_endpoint, task) @@ -550,15 +561,18 @@ def handle_results(self): except Exception: logger.error( "Caught error while trying to push endpoint status data " - "into redis" + "into redis", + extra={ + "log_type": "EPStatus_update_failed", + "endpoint_id": endpoint_id, + } ) # Update task status from endpoint task_status_delta = message.task_statuses for task_id, status_code in task_status_delta.items(): status = status_code_convert(status_code) - - logger.debug(f"Updating Task({task_id}) to status={status}") + log_task_transition(task, status) task = RedisTask(self.redis_client, task_id) task.status = status return @@ -628,6 +642,7 @@ def handle_results(self): task.exception = message["exception"] task.completion_time = time.time() task.set_expire(self.result_ttl) + self.log_task_transition(task, "result_received") # this critical section is where the task ID is sent over RabbitMQ, # and the task result will not be acked if this fails From 0c905938d251540184d977d4a05a2ed84e12e156 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Fri, 21 Jan 2022 15:48:21 -0600 Subject: [PATCH 2/3] Revert "Skip loglines when possible. Adding task_transition logs when reasonable" This reverts commit c897710013eaefdc448e86e657090b64a94047e1. --- funcx_forwarder/forwarder.py | 31 ++++++++----------------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/funcx_forwarder/forwarder.py b/funcx_forwarder/forwarder.py index c4a4e08..0dcafeb 100644 --- a/funcx_forwarder/forwarder.py +++ b/funcx_forwarder/forwarder.py @@ -446,11 +446,10 @@ def forward_task_to_endpoint(self): logger.warning( "Putting back REDIS message for unconnected endpoint: %s:%s", dest_endpoint, - task_id, + task, extra={ "log_type": "forwarder_redis_task_put", "endpoint_id": dest_endpoint, - "task_id": task_id }, ) self.redis_pubsub.put(dest_endpoint, task) @@ -463,7 +462,8 @@ def forward_task_to_endpoint(self): except TypeError: # A TypeError is raised when the Task object can't be recomposed from # redis due to missing values during high-workload events. - logger.exception( + logger.exception(f"Unable to access task {task_id} from redis") + logger.debug( f"Task:{task_id} is now LOST", extra={ "log_type": "task_lost", @@ -476,24 +476,13 @@ def forward_task_to_endpoint(self): try: self.tasks_q.put(dest_endpoint.encode("utf-8"), zmq_task.pack()) except (zmq.error.ZMQError, zmq.Again): - logger.exception( - f"Endpoint:{dest_endpoint} is unreachable", - extra={ - "log_type": "endpoint_disconnect", - "endpoint_id": dest_endpoint - } - ) + logger.exception(f"Endpoint:{dest_endpoint} is unreachable") # put task back in redis since it was not sent to endpoint self.redis_pubsub.put(dest_endpoint, task) self.disconnect_endpoint(dest_endpoint) except Exception: logger.exception( - f"Caught error while sending {task_id} to {dest_endpoint}", - extra={ - "log_type": "task_dispatch_fail", - "endpoint_id": dest_endpoint, - "task_id": task_id - } + f"Caught error while sending {task_id} to {dest_endpoint}" ) # put task back in redis since it was not sent to endpoint self.redis_pubsub.put(dest_endpoint, task) @@ -561,18 +550,15 @@ def handle_results(self): except Exception: logger.error( "Caught error while trying to push endpoint status data " - "into redis", - extra={ - "log_type": "EPStatus_update_failed", - "endpoint_id": endpoint_id, - } + "into redis" ) # Update task status from endpoint task_status_delta = message.task_statuses for task_id, status_code in task_status_delta.items(): status = status_code_convert(status_code) - log_task_transition(task, status) + + logger.debug(f"Updating Task({task_id}) to status={status}") task = RedisTask(self.redis_client, task_id) task.status = status return @@ -642,7 +628,6 @@ def handle_results(self): task.exception = message["exception"] task.completion_time = time.time() task.set_expire(self.result_ttl) - self.log_task_transition(task, "result_received") # this critical section is where the task ID is sent over RabbitMQ, # and the task result will not be acked if this fails From 26ef9a8f21be32d43f998190df9a95d77a2cad2c Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Fri, 21 Jan 2022 15:49:57 -0600 Subject: [PATCH 3/3] Log cleanup --- funcx_forwarder/forwarder.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/funcx_forwarder/forwarder.py b/funcx_forwarder/forwarder.py index 0dcafeb..c4a4e08 100644 --- a/funcx_forwarder/forwarder.py +++ b/funcx_forwarder/forwarder.py @@ -446,10 +446,11 @@ def forward_task_to_endpoint(self): logger.warning( "Putting back REDIS message for unconnected endpoint: %s:%s", dest_endpoint, - task, + task_id, extra={ "log_type": "forwarder_redis_task_put", "endpoint_id": dest_endpoint, + "task_id": task_id }, ) self.redis_pubsub.put(dest_endpoint, task) @@ -462,8 +463,7 @@ def forward_task_to_endpoint(self): except TypeError: # A TypeError is raised when the Task object can't be recomposed from # redis due to missing values during high-workload events. - logger.exception(f"Unable to access task {task_id} from redis") - logger.debug( + logger.exception( f"Task:{task_id} is now LOST", extra={ "log_type": "task_lost", @@ -476,13 +476,24 @@ def forward_task_to_endpoint(self): try: self.tasks_q.put(dest_endpoint.encode("utf-8"), zmq_task.pack()) except (zmq.error.ZMQError, zmq.Again): - logger.exception(f"Endpoint:{dest_endpoint} is unreachable") + logger.exception( + f"Endpoint:{dest_endpoint} is unreachable", + extra={ + "log_type": "endpoint_disconnect", + "endpoint_id": dest_endpoint + } + ) # put task back in redis since it was not sent to endpoint self.redis_pubsub.put(dest_endpoint, task) self.disconnect_endpoint(dest_endpoint) except Exception: logger.exception( - f"Caught error while sending {task_id} to {dest_endpoint}" + f"Caught error while sending {task_id} to {dest_endpoint}", + extra={ + "log_type": "task_dispatch_fail", + "endpoint_id": dest_endpoint, + "task_id": task_id + } ) # put task back in redis since it was not sent to endpoint self.redis_pubsub.put(dest_endpoint, task) @@ -550,15 +561,18 @@ def handle_results(self): except Exception: logger.error( "Caught error while trying to push endpoint status data " - "into redis" + "into redis", + extra={ + "log_type": "EPStatus_update_failed", + "endpoint_id": endpoint_id, + } ) # Update task status from endpoint task_status_delta = message.task_statuses for task_id, status_code in task_status_delta.items(): status = status_code_convert(status_code) - - logger.debug(f"Updating Task({task_id}) to status={status}") + log_task_transition(task, status) task = RedisTask(self.redis_client, task_id) task.status = status return @@ -628,6 +642,7 @@ def handle_results(self): task.exception = message["exception"] task.completion_time = time.time() task.set_expire(self.result_ttl) + self.log_task_transition(task, "result_received") # this critical section is where the task ID is sent over RabbitMQ, # and the task result will not be acked if this fails