diff --git a/services/ui_backend_service/api/card.py b/services/ui_backend_service/api/card.py
index 3095043c..87466ee6 100644
--- a/services/ui_backend_service/api/card.py
+++ b/services/ui_backend_service/api/card.py
@@ -153,11 +153,12 @@ async def get_card_content_by_hash(self, request):
             task,
             hash,
         )
+        html_reload_script = ""
         if cards is None:
             return web.Response(
                 content_type="text/html",
                 status=404,
-                body="Card not found for task. Possibly still being processed. Please refresh page to check again.",
+                body=html_reload_script,
             )
 
         if cards and hash in cards:
@@ -275,7 +276,7 @@ async def get_card_data_for_task_async(
         step_name=task.get("step_name"),
         task_id=task.get("task_name") or task.get("task_id"),
     )
-    await cache_client.cache_manager.register(pathspec)
+    await cache_client.cache_manager.register(pathspec, lock_timeout=0.2)
     _local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
     if not _local_cache.read_ready():
         # Since this is a data update call we can return a 404 and the client
diff --git a/services/ui_backend_service/data/cache/card_cache_manager.py b/services/ui_backend_service/data/cache/card_cache_manager.py
index 39173f40..45dd438f 100644
--- a/services/ui_backend_service/data/cache/card_cache_manager.py
+++ b/services/ui_backend_service/data/cache/card_cache_manager.py
@@ -6,7 +6,7 @@
 import uuid
 from asyncio.subprocess import Process
 import asyncio
-from .card_cache_service import CardCache, cleanup_non_running_caches
+from .card_cache_service import CardCache, safe_wipe_dir
 import contextlib
 
 
@@ -26,15 +26,15 @@
     "CARD_CACHE_PROCESS_MAX_UPTIME", 3 * 60  # 3 minutes
 )
 CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME = os.environ.get(
-    "CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME", 4  # 4 seconds
+    "CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME", 20  # 20 seconds
 )
-DEFAULT_CACHE_STORAGE_PATH = "/tmp"
-CACHE_STORAGE_PATH = os.environ.get(
-    "CARD_CACHE_STORAGE_PATH", DEFAULT_CACHE_STORAGE_PATH
+DEFAULT_CACHE_STORAGE_PATH_ROOT = "/tmp"
+CACHE_STORAGE_PATH_ROOT = os.environ.get(
+    "CARD_CACHE_STORAGE_PATH_ROOT", DEFAULT_CACHE_STORAGE_PATH_ROOT
 )
 CACHE_SERVICE_LOG_STORAGE_ROOT = os.environ.get("CACHE_SERVICE_LOG_STORAGE_ROOT", None)
-CARD_API_HTML_WAIT_TIME = float(os.environ.get("CARD_API_HTML_WAIT_TIME", 5))
+CARD_API_HTML_WAIT_TIME = float(os.environ.get("CARD_API_HTML_WAIT_TIME", 3))
 
 
 async def _get_latest_return_code(process: Process):
@@ -57,179 +57,216 @@ async def process_is_running(process: Process):
     return status == "running"
 
 
-class AsyncProcessManager:
+class AsyncCardCacheProcessManager:
     processes = {
-        # "procid": {
-        #     "proc": asyncio.subprocess.Process,
-        #     "started": time.time()
+        # "<context-id>": {
+        #     "processes": {
+        #         "<proc-id>": {
+        #             "proc": asyncio.subprocess.Process,
+        #             "started": time.time()
+        #         }
+        #     },
+        #     "write_directory": "<write-directory>"
        # }
    }
 
+    def get_context_dict(self, context):
+        _x = self.processes.get(context, None)
+        if _x is None:
+            return _x
+        return _x.copy()
+
+    @property
+    def current_process_dict(self):
+        return self.processes[self._current_context]["processes"]
+
+    @property
+    def current_write_directory(self):
+        return self.processes[self._current_context]["write_directory"]
+
     def __init__(self, logger) -> None:
-        self.lock = asyncio.Lock()
+        self.context_lock = asyncio.Lock()
         self.logger = logger
+        self._current_context = None
+        self.update_context()
+
+    async def set_new_context(self):
+        async with self.context_lock:
+            old_context = self._current_context
+            self.update_context()
+            return old_context
+
+    async def remove_context(self, context):
+        async with self.context_lock:
+            if context in self.processes:
+                del self.processes[context]
+
+    def update_context(self):
+        _ctx_dict, _ctx = self.create_context_dict()
+        self.processes.update(_ctx_dict)
+        self._current_context = _ctx
+
+    def create_context_dict(self):
+        _ctx = uuid.uuid4().hex[:8]
+        return {
+            _ctx: {
+                "processes": {},
+                "write_directory": os.path.join(CACHE_STORAGE_PATH_ROOT, _ctx),
+            }
+        }, _ctx
 
     def _register_process(self, procid, proc):
-        self.processes[procid] = {
+        self.current_process_dict[procid] = {
             "proc": proc,
             "started": time.time(),
         }
+        asyncio.create_task(proc.wait())
+
+    def _make_command(self, pathspec):
+        return [
+            str(i)
+            for i in [
+                sys.executable,
+                PATH_TO_CACHE_SERVICE,
+                "task-updates",
+                pathspec,
+                "--uptime-seconds",
+                CARD_CACHE_PROCESS_MAX_UPTIME,
+                "--list-frequency",
+                CARD_LIST_POLLING_FREQUENCY,
+                "--data-update-frequency",
+                DATA_UPDATE_POLLING_FREQUENCY,
+                "--html-update-frequency",
+                CARD_UPDATE_POLLING_FREQUENCY,
+                "--cache-path",
+                self.current_write_directory,
+                "--max-no-card-wait-time",
+                CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME,
+            ]
+        ]
 
-    async def add(self, procid, cmd, logs_file_path=CACHE_SERVICE_LOG_STORAGE_ROOT):
-        running_proc = await self.is_running(procid)
-        if running_proc:
-            return procid, "running"
+    async def add(self, procid, pathspec, lock_timeout=0.5):
         # The lock helps to ensure that the processes only get added one at a time
         # This is important because the processes are added to a shared dictionary
-        async with self.lock:
+        procid, status = procid, None
+        _acquired_lock = False
+        try:
+            await asyncio.wait_for(self.context_lock.acquire(), timeout=lock_timeout)
+            _acquired_lock = True
+            running_proc = await self.is_running(procid)
+            if running_proc:
+                return procid, "running"
             proc, started_on = self.get(procid)
             if proc is not None:
-                await self.remove(procid, delete_item=True)
+                self.remove_current(procid, delete_item=True)
 
-            logs_file = None
-            if logs_file_path is not None:
-                logs_file = open(
-                    os.path.join(
-                        logs_file_path,
-                        "card_cache_service_%s.log" % (procid),
-                    ),
-                    "w",
-                )
-
-            await self.spawn(procid, cmd, logs_file, logs_file)
-            return procid, "started"
+            cmd = self._make_command(pathspec)
+            await self.spawn(procid, cmd)
+            status = "started"
+        except asyncio.TimeoutError:
+            status = "add-timeout"
+        except Exception as e:
+            status = "add-exception"
+        finally:
+            if _acquired_lock and self.context_lock.locked():
+                self.context_lock.release()
+        return procid, status
 
     def get(self, procid):
-        proc_dict = self.processes.get(procid, None)
+        proc_dict = self.current_process_dict.get(procid, None)
         if proc_dict is not None:
             return proc_dict["proc"], proc_dict["started"]
         return None, None
 
-    async def spawn(self, procid, cmd, stdout_file, std_err_file=None):
+    async def spawn(self, procid, cmd,):
         proc = await asyncio.create_subprocess_exec(
             *cmd,
-            stdout=stdout_file,
-            stderr=std_err_file,
             shell=False,
         )
-        self._register_process(procid, proc)
+        self._register_process(procid, proc,)
 
-    async def remove(self, procid, delete_item=True):
-        if procid not in self.processes:
+    def remove_current(self, procid, delete_item=True):
+        if procid not in self.current_process_dict:
             return
-        self.logger.info("Removing process: %s" % procid)
-        await self.processes[procid]["proc"].wait()
-        self.logger.info("Process removed: %s" % procid)
-        if self.processes[procid]["proc"].stdout is not None:
-            self.processes[procid]["proc"].stdout.close()
         if delete_item:
-            del self.processes[procid]
+            del self.current_process_dict[procid]
 
     async def cleanup(self):
-        # The lock ensures that when the dictionary is being modified,
-        # no other process can modify it at the same time.
-        async with self.lock:
-            removal_keys = []
-            for procid in self.processes:
-                running_proc = await self.is_running(procid)
-                if running_proc:
-                    continue
-                removal_keys.append(procid)
-                await self.remove(procid, delete_item=False)
-            for procid in removal_keys:
-                del self.processes[procid]
-            return removal_keys
+        old_context = await self.set_new_context()
+        _ctx_dict = self.get_context_dict(old_context)
+        if _ctx_dict is None:
+            return []
+        # Two things to remove (old keys and old directories)
+        wait_keys = []
+        for pid in _ctx_dict["processes"]:
+            status = await process_status(_ctx_dict["processes"][pid]["proc"])
+            if status not in ["completed", "failed"]:
+                wait_keys.append(pid)
+
+        if len(wait_keys) > 0:
+            # Sleep for CARD_CACHE_PROCESS_MAX_UPTIME so that all processes still writing have finished
+            # AND any upstream request accessing the cache-dir has been attended to.
+            await asyncio.sleep(CARD_CACHE_PROCESS_MAX_UPTIME)
+
+        _write_dir = _ctx_dict["write_directory"]
+        safe_wipe_dir(_write_dir)  # TODO: Make this happen in a different thread.
+        await self.remove_context(old_context)
 
     async def is_running(self, procid):
-        if procid not in self.processes:
+        if procid not in self.current_process_dict:
             return False
-        return await process_is_running(self.processes[procid]["proc"])
+        return await process_is_running(self.current_process_dict[procid]["proc"])
 
     async def get_status(self, procid):
-        if procid not in self.processes:
+        if procid not in self.current_process_dict:
             return None
-        return await process_status(self.processes[procid]["proc"])
+        return await process_status(self.current_process_dict[procid]["proc"])
 
     async def running_processes(self):
-        return [procid for procid in self.processes if await self.is_running(procid)]
+        return [
+            procid
+            for procid in self.current_process_dict
+            if await self.is_running(procid)
+        ]
 
 
 class CardCacheManager:
     def __init__(self) -> None:
         self.logger = logging.getLogger("CardCacheManager")
-        self._process_manager = AsyncProcessManager(self.logger)
+        self._process_manager = AsyncCardCacheProcessManager(self.logger)
         self._manager_id = uuid.uuid4().hex
         self.logger.info("CardCacheManager initialized")
 
-    def _make_task_command(self, pathspec):
-        return [
-            str(i)
-            for i in [
-                sys.executable,
-                PATH_TO_CACHE_SERVICE,
-                "task-updates",
-                pathspec,
-                "--uptime-seconds",
-                CARD_CACHE_PROCESS_MAX_UPTIME,
-                "--list-frequency",
-                CARD_LIST_POLLING_FREQUENCY,
-                "--data-update-frequency",
-                DATA_UPDATE_POLLING_FREQUENCY,
-                "--html-update-frequency",
-                CARD_UPDATE_POLLING_FREQUENCY,
-                "--cache-path",
-                CACHE_STORAGE_PATH,
-                "--max-no-card-wait-time",
-                CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME
-            ]
-        ]
-
     def get_local_cache(self, pathspec, card_hash):
         cache = CardCache.load_from_disk(
-            pathspec, card_hash, CACHE_STORAGE_PATH,
+            pathspec,
+            card_hash,
+            self._process_manager.current_write_directory,
         )
         return cache
 
-    async def register(self, pathspec):
+    async def register(self, pathspec, lock_timeout=0.5):
         proc_id = pathspec
         is_running = await self._process_manager.is_running(proc_id)
         if is_running:
             return proc_id, "running"
-
-        cmd = self._make_task_command(pathspec)
-        self.logger.info(
-            "Registering task [%s]" % (pathspec)
+        self.logger.info("Registering task [%s]" % (pathspec))
+        _id, status = await self._process_manager.add(
+            proc_id, pathspec, lock_timeout=lock_timeout
         )
-        _id, status = await self._process_manager.add(proc_id, cmd, logs_file_path=CACHE_SERVICE_LOG_STORAGE_ROOT)
         return _id, status
 
     async def get_status(self, pathspec):
         return await self._process_manager.get_status(pathspec)
 
-    async def start_process_cleanup_routine(self, interval=60):
-        try:
-            while True:
-                cleanup_keys = await self._process_manager.cleanup()  # Perform the cleanup
-                if len(cleanup_keys) > 0:
-                    self.logger.info(
-                        "Cleaned up processes: %s" % ", ".join(cleanup_keys)
-                    )
-                await asyncio.sleep(interval)  # Wait for a specified interval before the next cleanup
-        except asyncio.CancelledError:
-            self.logger.info("Process cleanup routine cancelled")
-
-    async def cleanup_disk_routine(self, interval=60 * 60 * 4):
+    async def regular_cleanup_routine(self, interval=60 * 60 * 24 * 5):
         try:
             while True:
                 await asyncio.sleep(interval)
-                # The lock ensure that new processes are not getting created or
-                # processes are not getting removed when the disk cleanup is happening.
-                async with self._process_manager.lock:
-                    running_proc_ids = await self._process_manager.running_processes()
-                    cleanup_non_running_caches(CACHE_STORAGE_PATH, CardCache.CACHE_DIR, running_proc_ids)
+                await self._process_manager.cleanup()
         except asyncio.CancelledError:
-            self.logger.info("Disk cleanup routine cancelled")
+            self.logger.info("Process/Directory cleanup routine cancelled")
 
 
 async def verify_process_has_crashed(cache_manager: CardCacheManager, pathspec):
@@ -254,17 +291,22 @@ def _get_html_or_refresh(local_cache: CardCache):
 
 async def wait_until_card_is_ready(
-    cache_manager: CardCacheManager, local_cache: CardCache, max_wait_time=3, frequency=0.1
+    cache_manager: CardCacheManager,
+    local_cache: CardCache,
+    max_wait_time=3,
+    frequency=0.1,
 ):
     html = None
     start_time = time.time()
-    await cache_manager.register(local_cache.pathspec)
+    await cache_manager.register(local_cache.pathspec, lock_timeout=min(0.5, max_wait_time - 1))
     # At this point in the function the process should aleady be running
     while time.time() - start_time < max_wait_time:
         html = _get_html_or_refresh(local_cache)
         if html is not None:
             break
-        process_failed = await verify_process_has_crashed(cache_manager, local_cache.pathspec)
+        process_failed = await verify_process_has_crashed(
+            cache_manager, local_cache.pathspec
+        )
         if process_failed:
             cache_manager.logger.error(
                 f"Card {local_cache.card_hash} has crashed for {local_cache.pathspec}"
diff --git a/services/ui_backend_service/data/cache/card_cache_service.py b/services/ui_backend_service/data/cache/card_cache_service.py
index 20472404..0df3b823 100644
--- a/services/ui_backend_service/data/cache/card_cache_service.py
+++ b/services/ui_backend_service/data/cache/card_cache_service.py
@@ -57,13 +57,13 @@ def _make_hash(_str):
     return hashlib.md5(_str.encode()).hexdigest()
 
 
-def cleanup_non_running_caches(cache_path, cache_dir, pathspecs):
-    task_dirs = os.listdir(os.path.join(cache_path, cache_dir))
-    task_dir_names = [_make_hash(p) for p in pathspecs]
-    for task_dir in task_dirs:
-        if task_dir in task_dir_names:
-            continue
-        shutil.rmtree(os.path.join(cache_path, cache_dir, task_dir), ignore_errors=True)
+def safe_wipe_dir(path):
+    try:
+        if os.path.exists(path):
+            shutil.rmtree(path, ignore_errors=True)
+        return None
+    except Exception as e:
+        return e
 
 
 class CardCache:
@@ -312,17 +312,30 @@ def _get_cards_safely(self) -> ResolvedCards:
             return ResolvedCards([], True,)  # On other errors fail away too!
 
     def refresh_loop(self):
-        timings = {"data": None, "html": None, "list": None}
+        timings = {"card_info": {}, "list": None}
         start_time = time.time()
         self.logger.info("Starting cache refresh loop for %s" % self._task_pathspec)
         cards_are_unresolvable = False
         _sleep_time = 0.25
+
+        def _update_timings(card_hash, update_type):
+            if card_hash not in timings["card_info"]:
+                timings["card_info"][card_hash] = {
+                    "html": None,
+                    "data": None,
+                }
+            timings["card_info"][card_hash][update_type] = time.time()
+
+        def _get_timings(card_hash):
+            return timings["card_info"].get(card_hash, {"html": None, "data": None})
+
         while True:
             if time.time() - start_time > self._uptime_seconds:
                 # exit condition
                 break
             if _eligible_for_refresh(timings["list"], self.LIST_FREQUENCY_SECONDS):
                 list_status, cards_are_unresolvable = self.load_all_cards()
                 if list_status:
+                    timings["list"] = time.time()
                     self.write_available_cards()
 
             cache_is_empty = len(self._cache) == 0
@@ -338,11 +351,13 @@ def refresh_loop(self):
                     break
 
             for card_hash in self._cache:
-                if _eligible_for_refresh(timings["html"], self.HTML_UPDATE_FREQUENCY):
+                if _eligible_for_refresh(_get_timings(card_hash).get("html", None), self.HTML_UPDATE_FREQUENCY):
                     self.update_card_cache(card_hash, "html")
+                    _update_timings(card_hash, "html")
 
-                if _eligible_for_refresh(timings["data"], self.DATA_UPDATE_FREQUENCY):
+                if _eligible_for_refresh(_get_timings(card_hash).get("data", None), self.DATA_UPDATE_FREQUENCY):
                     self.update_card_cache(card_hash, "data")
+                    _update_timings(card_hash, "data")
 
             time.sleep(_sleep_time)
diff --git a/services/ui_backend_service/data/cache/store.py b/services/ui_backend_service/data/cache/store.py
index 037dc894..25166f52 100644
--- a/services/ui_backend_service/data/cache/store.py
+++ b/services/ui_backend_service/data/cache/store.py
@@ -37,7 +37,9 @@
 CACHE_DAG_STORAGE_LIMIT = int(os.environ.get("CACHE_DAG_STORAGE_LIMIT", DISK_SIZE // 4))
 CACHE_LOG_MAX_ACTIONS = int(os.environ.get("CACHE_LOG_MAX_ACTIONS", 8))
 CACHE_LOG_STORAGE_LIMIT = int(os.environ.get("CACHE_LOG_STORAGE_LIMIT", DISK_SIZE // 5))
-CARD_CACHE_DISK_CLEANUP_INTERVAL = int(os.environ.get("CARD_CACHE_DISK_CLEANUP_INTERVAL", 60 * 60 * 4))
+CARD_CACHE_REGULAR_CLEANUP_INTERVAL = int(
+    os.environ.get("CARD_CACHE_PROCESS_CLEANUP_INTERVAL", 60 * 60 * 24 * 5)
+)  # 5 Days
 
 
 class CacheStore(object):
@@ -123,18 +125,15 @@ def __init__(self, event_emitter):
         self.cache_manager = CardCacheManager()
 
     async def start_cache(self):
-        self._cleanup_coroutine = asyncio.create_task(
-            self.cache_manager.start_process_cleanup_routine(120)
-        )
-        self._disk_cleanup_coroutine = asyncio.create_task(
-            self.cache_manager.cleanup_disk_routine(CARD_CACHE_DISK_CLEANUP_INTERVAL)
+        self._regular_cleanup_routine = asyncio.create_task(
+            self.cache_manager.regular_cleanup_routine(
+                CARD_CACHE_REGULAR_CLEANUP_INTERVAL
+            )
        )
 
     async def stop_cache(self):
-        self._cleanup_coroutine.cancel()
-        await self._cleanup_coroutine
-        self._disk_cleanup_coroutine.cancel()
-        await self._disk_cleanup_coroutine
+        self._regular_cleanup_routine.cancel()
+        await self._regular_cleanup_routine
 
 
 class ArtifactCacheStore(object):
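Note: the cleanup flow in this patch rotates the process manager onto a fresh per-context write directory, waits for processes registered under the old context to wind down, and only then wipes the old directory with safe_wipe_dir. The standalone sketch below illustrates that rotate-then-wipe pattern in isolation; ContextRotator, the /tmp/card-cache-demo root, and grace_seconds are names invented for this example and are not part of the patch.

import asyncio
import os
import shutil
import uuid


class ContextRotator:
    """Minimal sketch of the rotate-then-wipe cleanup pattern described above."""

    def __init__(self, root="/tmp/card-cache-demo"):
        self.root = root
        self.contexts = {}  # context id -> write directory
        self.current = None
        self._rotate()  # start with a fresh context

    def _rotate(self):
        ctx = uuid.uuid4().hex[:8]
        self.contexts[ctx] = os.path.join(self.root, ctx)
        os.makedirs(self.contexts[ctx], exist_ok=True)
        self.current = ctx
        return ctx

    async def cleanup(self, grace_seconds=1.0):
        # Rotate first so new writers land in a fresh directory, then wait out
        # a grace period before wiping the directory the old context wrote to.
        old = self.current
        self._rotate()
        await asyncio.sleep(grace_seconds)
        shutil.rmtree(self.contexts.pop(old), ignore_errors=True)


async def main():
    rotator = ContextRotator()
    # Simulate a card file written under the current context.
    open(os.path.join(rotator.contexts[rotator.current], "card.html"), "w").close()
    await rotator.cleanup(grace_seconds=0.1)
    print("remaining contexts:", list(rotator.contexts))


if __name__ == "__main__":
    asyncio.run(main())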