diff --git a/pulsar/managers/base/external.py b/pulsar/managers/base/external.py index 35cf6c84..2e54899a 100644 --- a/pulsar/managers/base/external.py +++ b/pulsar/managers/base/external.py @@ -1,5 +1,9 @@ import logging from string import Template +from typing import ( + Dict, + Any, +) from pulsar.managers import status from .directory import DirectoryBaseManager @@ -18,7 +22,7 @@ class ExternalBaseManager(DirectoryBaseManager): def __init__(self, name, app, **kwds): super().__init__(name, app, **kwds) - self._external_ids = {} + self._external_ids: Dict[str, Any] = {} self.job_name_template = kwds.get('job_name_template', DEFAULT_JOB_NAME_TEMPLATE) def clean(self, job_id): @@ -46,11 +50,11 @@ def _register_external_id(self, job_id, external_id): if isinstance(external_id, bytes): external_id = external_id.decode("utf-8") self._job_directory(job_id).store_metadata(JOB_FILE_EXTERNAL_ID, external_id) - self._external_ids[job_id] = external_id + self._external_ids[str(job_id)] = external_id return external_id def _external_id(self, job_id): - return self._external_ids.get(job_id, None) + return self._external_ids.get(str(job_id), None) def _job_name(self, job_id): env = self._job_template_env(job_id) @@ -59,9 +63,9 @@ def _job_name(self, job_id): def _recover_active_job(self, job_id): external_id = self._job_directory(job_id).load_metadata(JOB_FILE_EXTERNAL_ID, FAILED_TO_LOAD_EXTERNAL_ID) if external_id and external_id is not FAILED_TO_LOAD_EXTERNAL_ID: - self._external_ids[job_id] = external_id + self._external_ids[str(job_id)] = external_id else: raise Exception("Could not determine external ID for job_id [%s]" % job_id) def _deactivate_job(self, job_id): - del self._external_ids[job_id] + del self._external_ids[str(job_id)]