diff --git a/funcx_forwarder/forwarder.py b/funcx_forwarder/forwarder.py index 0dcafeb..c4a4e08 100644 --- a/funcx_forwarder/forwarder.py +++ b/funcx_forwarder/forwarder.py @@ -446,10 +446,11 @@ def forward_task_to_endpoint(self): logger.warning( "Putting back REDIS message for unconnected endpoint: %s:%s", dest_endpoint, - task, + task_id, extra={ "log_type": "forwarder_redis_task_put", "endpoint_id": dest_endpoint, + "task_id": task_id }, ) self.redis_pubsub.put(dest_endpoint, task) @@ -462,8 +463,7 @@ def forward_task_to_endpoint(self): except TypeError: # A TypeError is raised when the Task object can't be recomposed from # redis due to missing values during high-workload events. - logger.exception(f"Unable to access task {task_id} from redis") - logger.debug( + logger.exception( f"Task:{task_id} is now LOST", extra={ "log_type": "task_lost", @@ -476,13 +476,24 @@ def forward_task_to_endpoint(self): try: self.tasks_q.put(dest_endpoint.encode("utf-8"), zmq_task.pack()) except (zmq.error.ZMQError, zmq.Again): - logger.exception(f"Endpoint:{dest_endpoint} is unreachable") + logger.exception( + f"Endpoint:{dest_endpoint} is unreachable", + extra={ + "log_type": "endpoint_disconnect", + "endpoint_id": dest_endpoint + } + ) # put task back in redis since it was not sent to endpoint self.redis_pubsub.put(dest_endpoint, task) self.disconnect_endpoint(dest_endpoint) except Exception: logger.exception( - f"Caught error while sending {task_id} to {dest_endpoint}" + f"Caught error while sending {task_id} to {dest_endpoint}", + extra={ + "log_type": "task_dispatch_fail", + "endpoint_id": dest_endpoint, + "task_id": task_id + } ) # put task back in redis since it was not sent to endpoint self.redis_pubsub.put(dest_endpoint, task) @@ -550,15 +561,18 @@ def handle_results(self): except Exception: logger.error( "Caught error while trying to push endpoint status data " - "into redis" + "into redis", + extra={ + "log_type": "EPStatus_update_failed", + "endpoint_id": endpoint_id, + } ) # Update task status from endpoint task_status_delta = message.task_statuses for task_id, status_code in task_status_delta.items(): status = status_code_convert(status_code) - - logger.debug(f"Updating Task({task_id}) to status={status}") + log_task_transition(task, status) task = RedisTask(self.redis_client, task_id) task.status = status return @@ -628,6 +642,7 @@ def handle_results(self): task.exception = message["exception"] task.completion_time = time.time() task.set_expire(self.result_ttl) + self.log_task_transition(task, "result_received") # this critical section is where the task ID is sent over RabbitMQ, # and the task result will not be acked if this fails