Skip to content
This repository was archived by the owner on Feb 22, 2022. It is now read-only.
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions funcx_forwarder/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,11 @@ def forward_task_to_endpoint(self):
logger.warning(
"Putting back REDIS message for unconnected endpoint: %s:%s",
dest_endpoint,
task,
task_id,
extra={
"log_type": "forwarder_redis_task_put",
"endpoint_id": dest_endpoint,
"task_id": task_id
},
)
self.redis_pubsub.put(dest_endpoint, task)
Expand All @@ -462,8 +463,7 @@ def forward_task_to_endpoint(self):
except TypeError:
# A TypeError is raised when the Task object can't be recomposed from
# redis due to missing values during high-workload events.
logger.exception(f"Unable to access task {task_id} from redis")
logger.debug(
logger.exception(
f"Task:{task_id} is now LOST",
extra={
"log_type": "task_lost",
Expand All @@ -476,13 +476,24 @@ def forward_task_to_endpoint(self):
try:
self.tasks_q.put(dest_endpoint.encode("utf-8"), zmq_task.pack())
except (zmq.error.ZMQError, zmq.Again):
logger.exception(f"Endpoint:{dest_endpoint} is unreachable")
logger.exception(
f"Endpoint:{dest_endpoint} is unreachable",
extra={
"log_type": "endpoint_disconnect",
"endpoint_id": dest_endpoint
}
)
# put task back in redis since it was not sent to endpoint
self.redis_pubsub.put(dest_endpoint, task)
self.disconnect_endpoint(dest_endpoint)
except Exception:
logger.exception(
f"Caught error while sending {task_id} to {dest_endpoint}"
f"Caught error while sending {task_id} to {dest_endpoint}",
extra={
"log_type": "task_dispatch_fail",
"endpoint_id": dest_endpoint,
"task_id": task_id
}
)
# put task back in redis since it was not sent to endpoint
self.redis_pubsub.put(dest_endpoint, task)
Expand Down Expand Up @@ -550,15 +561,18 @@ def handle_results(self):
except Exception:
logger.error(
"Caught error while trying to push endpoint status data "
"into redis"
"into redis",
extra={
"log_type": "EPStatus_update_failed",
"endpoint_id": endpoint_id,
}
)

# Update task status from endpoint
task_status_delta = message.task_statuses
for task_id, status_code in task_status_delta.items():
status = status_code_convert(status_code)

logger.debug(f"Updating Task({task_id}) to status={status}")
log_task_transition(task, status)
task = RedisTask(self.redis_client, task_id)
task.status = status
return
Expand Down Expand Up @@ -628,6 +642,7 @@ def handle_results(self):
task.exception = message["exception"]
task.completion_time = time.time()
task.set_expire(self.result_ttl)
self.log_task_transition(task, "result_received")

# this critical section is where the task ID is sent over RabbitMQ,
# and the task result will not be acked if this fails
Expand Down