From 19b87f011d7001d23e71fe1c5bdcba06b8eedc4d Mon Sep 17 00:00:00 2001 From: thismanyboyfriends2 Date: Fri, 2 Jan 2026 23:30:32 +0000 Subject: [PATCH 1/5] fix: memory leaks and misuse of semaphores --- ofscraper/commands/metadata/metadata.py | 16 +++++++++++++++- .../scraper/actions/download/download.py | 16 +++++++++++++++- ofscraper/commands/scraper/scraper.py | 8 ++++++-- ofscraper/main/close/exit.py | 2 ++ .../managers/sessionmanager/sessionmanager.py | 5 ++--- ofscraper/utils/context/run_async.py | 13 +++++-------- 6 files changed, 45 insertions(+), 15 deletions(-) diff --git a/ofscraper/commands/metadata/metadata.py b/ofscraper/commands/metadata/metadata.py index 7ab628ea9..f2eda334f 100755 --- a/ofscraper/commands/metadata/metadata.py +++ b/ofscraper/commands/metadata/metadata.py @@ -46,6 +46,17 @@ log = logging.getLogger("shared") +def _handle_consumer_exception(task): + """Handle exceptions from consumer tasks.""" + try: + if task.exception(): + logging.getLogger("shared").debug( + f"Consumer task failed with exception: {task.exception()}" + ) + except (asyncio.CancelledError, asyncio.InvalidStateError): + pass + + class MetadataCommandManager(CommandManager): def __init__(self): super().__init__() @@ -248,7 +259,10 @@ async def process_dicts(username, model_id, medialist): asyncio.create_task(consumer(aws, task1, medialist, lock)) for _ in range(concurrency_limit) ] - await asyncio.gather(*consumers) + # Add exception callback to detect any unhandled exceptions + for task in consumers: + task.add_done_callback(_handle_consumer_exception) + await asyncio.gather(*consumers, return_exceptions=True) except Exception as E: with exit.DelayedKeyboardInterrupt(): raise E diff --git a/ofscraper/commands/scraper/actions/download/download.py b/ofscraper/commands/scraper/actions/download/download.py index fcc618b2d..b6da30023 100755 --- a/ofscraper/commands/scraper/actions/download/download.py +++ b/ofscraper/commands/scraper/actions/download/download.py @@ -29,6 +29,17 @@ import ofscraper.managers.manager as manager +def _handle_consumer_exception(task): + """Handle exceptions from consumer tasks.""" + try: + if task.exception(): + logging.getLogger("shared").debug( + f"Consumer task failed with exception: {task.exception()}" + ) + except (asyncio.CancelledError, asyncio.InvalidStateError): + pass + + async def downloader(username=None, model_id=None, posts=None, media=None, **kwargs): download_str = download_activity_str.format(username=username) path_str = format_safe( @@ -104,7 +115,10 @@ async def process_dicts(username, model_id, medialist, posts): asyncio.create_task(consumer(aws, task1, medialist, lock)) for _ in range(concurrency_limit) ] - await asyncio.gather(*consumers) + # Add exception callback to detect any unhandled exceptions + for task in consumers: + task.add_done_callback(_handle_consumer_exception) + await asyncio.gather(*consumers, return_exceptions=True) except Exception as E: with exit.DelayedKeyboardInterrupt(): raise E diff --git a/ofscraper/commands/scraper/scraper.py b/ofscraper/commands/scraper/scraper.py index d558befd5..15d2f8be7 100755 --- a/ofscraper/commands/scraper/scraper.py +++ b/ofscraper/commands/scraper/scraper.py @@ -231,23 +231,27 @@ def daemon_process(): try: with exit.DelayedKeyboardInterrupt(): schedule.clear() - if worker_thread: + if worker_thread and worker_thread.is_alive(): worker_thread.join(timeout=5) raise KeyboardInterrupt except KeyboardInterrupt: with exit.DelayedKeyboardInterrupt(): schedule.clear() + if worker_thread and worker_thread.is_alive(): + worker_thread.join(timeout=5) raise E except Exception as E: try: with exit.DelayedKeyboardInterrupt(): schedule.clear() - if worker_thread: + if worker_thread and worker_thread.is_alive(): worker_thread.join(timeout=5) raise E except KeyboardInterrupt: with exit.DelayedKeyboardInterrupt(): schedule.clear() + if worker_thread and worker_thread.is_alive(): + worker_thread.join(timeout=5) raise E diff --git a/ofscraper/main/close/exit.py b/ofscraper/main/close/exit.py index 58c7b6a2a..047e7c035 100755 --- a/ofscraper/main/close/exit.py +++ b/ofscraper/main/close/exit.py @@ -10,6 +10,7 @@ def shutdown(): manager.shutdown() closeThreadExecutor() closeCache() + closeThreadExecutor() def forcedShutDown(): @@ -17,6 +18,7 @@ def forcedShutDown(): manager.shutdown() closeThreadExecutor() closeCache() + closeThreadExecutor() def closeThreadExecutor(): diff --git a/ofscraper/managers/sessionmanager/sessionmanager.py b/ofscraper/managers/sessionmanager/sessionmanager.py index 1739e7b17..d02530ea2 100755 --- a/ofscraper/managers/sessionmanager/sessionmanager.py +++ b/ofscraper/managers/sessionmanager/sessionmanager.py @@ -681,13 +681,12 @@ async def requests_async( raise SystemExit("OnlyFans Maintenance detected.") r.raise_for_status() - self._sem.release() yield r - return except Exception as E: await self._async_handle_error(E, exceptions) - self._sem.release() raise E + finally: + self._sem.release() @property def sleep(self): diff --git a/ofscraper/utils/context/run_async.py b/ofscraper/utils/context/run_async.py index 63565261b..4eb2179dd 100755 --- a/ofscraper/utils/context/run_async.py +++ b/ofscraper/utils/context/run_async.py @@ -2,11 +2,6 @@ import ofscraper.utils.context.exit as exit -# utils/context/run_async.py - -import asyncio -import ofscraper.utils.context.exit as exit - def run(coro): def inner(*args, **kwargs): @@ -57,15 +52,17 @@ def inner(*args, **kwargs): loop.run_forever() tasks.exception() except Exception: - None + pass raise E except Exception as E: raise E finally: try: loop.close() - except: - None + except RuntimeError: + pass # Loop already closed + except Exception: + pass # Ignore other exceptions during cleanup asyncio.set_event_loop(None) return coro(*args, **kwargs) From bf80e69e5b12331166f76502fc205b5a91ed55c3 Mon Sep 17 00:00:00 2001 From: thismanyboyfriends2 Date: Mon, 5 Jan 2026 16:52:54 +0000 Subject: [PATCH 2/5] fix: use context managers for semaphore and lock operations --- .../actions/download/managers/alt_download.py | 6 +--- .../download/managers/main_download.py | 6 +--- ofscraper/data/posts/post.py | 14 ++++----- .../managers/sessionmanager/sessionmanager.py | 8 +---- ofscraper/utils/cache/cache.py | 29 +++---------------- 5 files changed, 13 insertions(+), 50 deletions(-) diff --git a/ofscraper/commands/scraper/actions/download/managers/alt_download.py b/ofscraper/commands/scraper/actions/download/managers/alt_download.py index 00ea973f7..704719ebe 100755 --- a/ofscraper/commands/scraper/actions/download/managers/alt_download.py +++ b/ofscraper/commands/scraper/actions/download/managers/alt_download.py @@ -52,9 +52,7 @@ class AltDownloadManager(DownloadManager): async def alt_download(self, c, ele: Media, username, model_id): - # Acquire semaphore at the very beginning of the process - await common_globals.sem.acquire() - try: + async with common_globals.sem: common_globals.log.debug( f"{common_logs.get_medialog(ele)} Downloading with protected media downloader" ) @@ -90,8 +88,6 @@ async def alt_download(self, c, ele: Media, username, model_id): return await self._handle_result_alt( sharedPlaceholderObj, ele, audio, video, username, model_id ) - finally: - common_globals.sem.release() async def _alt_download_downloader(self, item, c, ele): self._downloadspace() diff --git a/ofscraper/commands/scraper/actions/download/managers/main_download.py b/ofscraper/commands/scraper/actions/download/managers/main_download.py index fb052efa8..b9e5ba04a 100755 --- a/ofscraper/commands/scraper/actions/download/managers/main_download.py +++ b/ofscraper/commands/scraper/actions/download/managers/main_download.py @@ -40,8 +40,7 @@ class MainDownloadManager(DownloadManager): async def main_download(self, c, ele: Media, username, model_id): - await common_globals.sem.acquire() - try: + async with common_globals.sem: common_globals.log.debug( f"{common_logs.get_medialog(ele)} Downloading with normal downloader" ) @@ -65,9 +64,6 @@ async def main_download(self, c, ele: Media, username, model_id): return await self._handle_result_main(result, ele, username, model_id) - finally: - common_globals.sem.release() - async def _main_download_downloader(self, c, ele): self._downloadspace() tempholderObj = await placeholder.tempFilePlaceholder( diff --git a/ofscraper/data/posts/post.py b/ofscraper/data/posts/post.py index 9f2051f9b..31097ab40 100755 --- a/ofscraper/data/posts/post.py +++ b/ofscraper/data/posts/post.py @@ -494,14 +494,12 @@ async def process_areas(ele, model_id, username, c=None): def process_single_task(func): async def inner(sem=None): data = None - await sem.acquire() - try: - data = await func() - except Exception as E: - log.traceback_(E) - log.traceback_(traceback.format_exc()) - finally: - sem.release() + async with sem: + try: + data = await func() + except Exception as E: + log.traceback_(E) + log.traceback_(traceback.format_exc()) return data return inner diff --git a/ofscraper/managers/sessionmanager/sessionmanager.py b/ofscraper/managers/sessionmanager/sessionmanager.py index d02530ea2..08c61bb7e 100755 --- a/ofscraper/managers/sessionmanager/sessionmanager.py +++ b/ofscraper/managers/sessionmanager/sessionmanager.py @@ -635,8 +635,7 @@ async def requests_async( ), ): with _: - await self._sem.acquire() - try: + async with self._sem: if await self._rate_limit_sleeper.async_do_sleep(): pass else: @@ -682,11 +681,6 @@ async def requests_async( r.raise_for_status() yield r - except Exception as E: - await self._async_handle_error(E, exceptions) - raise E - finally: - self._sem.release() @property def sleep(self): diff --git a/ofscraper/utils/cache/cache.py b/ofscraper/utils/cache/cache.py index 3418f4b2f..320490746 100755 --- a/ofscraper/utils/cache/cache.py +++ b/ofscraper/utils/cache/cache.py @@ -12,66 +12,45 @@ def get(*args, **kwargs): global lock - lock.acquire() - try: + with lock: if settings.get_settings().cached_disabled: return kwargs.get("default") global cache if cache is None: cache = Cache(common_paths.getcachepath(), disk=data.get_cache_mode()) return cache.get(*args, **kwargs) - except Exception as E: - raise E - finally: - lock.release() def set(*args, auto_close=True, **kwargs): global lock - lock.acquire() - try: + with lock: if settings.get_settings().cached_disabled: return global cache if cache is None: cache = Cache(common_paths.getcachepath(), disk=data.get_cache_mode()) cache.set(*args, **kwargs) - except Exception as E: - raise E - finally: - lock.release() if auto_close: close() def close(*args, **kwargs): global lock - lock.acquire() - try: + with lock: if settings.get_settings().cached_disabled: return None global cache if cache is None: cache = Cache(common_paths.getcachepath(), disk=data.get_cache_mode()) cache.close(*args, **kwargs) - except Exception as E: - raise E - finally: - lock.release() def touch(*args, **kwargs): global lock - lock.acquire() - try: + with lock: if settings.get_settings().cached_disabled: return None global cache if cache is None: cache = Cache(common_paths.getcachepath(), disk=data.get_cache_mode()) cache.touch(*args, **kwargs) - - except Exception as E: - raise E - finally: - lock.release() From af9459ed72619fe03cfbf679e1b1133973b418d3 Mon Sep 17 00:00:00 2001 From: thismanyboyfriends2 Date: Mon, 5 Jan 2026 17:07:25 +0000 Subject: [PATCH 3/5] fix: use context managers for async file operations --- .../actions/download/managers/alt_download.py | 52 +++++------------- .../download/managers/main_download.py | 54 ++++++------------- 2 files changed, 29 insertions(+), 77 deletions(-) diff --git a/ofscraper/commands/scraper/actions/download/managers/alt_download.py b/ofscraper/commands/scraper/actions/download/managers/alt_download.py index 704719ebe..36b7cf8b1 100755 --- a/ofscraper/commands/scraper/actions/download/managers/alt_download.py +++ b/ofscraper/commands/scraper/actions/download/managers/alt_download.py @@ -233,40 +233,28 @@ async def _download_fileobject_writer_reader(self, ele, total, res, placeholderO task1 = await self._add_download_job_task( ele, total=total, placeholderObj=placeholderObj ) - fileobject = await aiofiles.open(placeholderObj.tempfilepath, "ab").__aenter__() try: - await fileobject.write(await res.read_()) - except Exception as E: - raise E + async with aiofiles.open(placeholderObj.tempfilepath, "ab") as fileobject: + await fileobject.write(await res.read_()) finally: - try: - await fileobject.close() - except Exception as E: - raise E - try: - await self._remove_download_job_task(task1, ele) - except Exception as E: - raise E + await self._remove_download_job_task(task1, ele) async def _download_fileobject_writer_streamer( self, ele, total, res, placeholderObj ): task1 = await self._add_download_job_task(ele, total, placeholderObj) - fileobject = None try: - fileobject = await aiofiles.open( - placeholderObj.tempfilepath, "ab" - ).__aenter__() - chunk_iter = res.iter_chunked(get_chunk_size()) + async with aiofiles.open(placeholderObj.tempfilepath, "ab") as fileobject: + chunk_iter = res.iter_chunked(get_chunk_size()) + + while True: + try: + chunk = await chunk_iter.__anext__() + await fileobject.write(chunk) + send_chunk_msg(ele, total, placeholderObj) + except StopAsyncIteration: + break # Exit loop when no more chunks - while True: - try: - chunk = await chunk_iter.__anext__() - await fileobject.write(chunk) - send_chunk_msg(ele, total, placeholderObj) - except StopAsyncIteration: - break # Exit loop when no more chunks - # Catch native aiohttp socket read timeouts except (asyncio.TimeoutError, aiohttp.ServerTimeoutError) as E: common_globals.log.info( @@ -277,19 +265,7 @@ async def _download_fileobject_writer_streamer( common_globals.log.info(f"An error occurred during download for {ele}: {E}") raise E finally: - if fileobject: - try: - await fileobject.close() - except Exception as E: - common_globals.log.debug(f"Error closing file for {ele}: {E}") - raise E - try: - await self._remove_download_job_task(task1, ele) - except Exception as E: - common_globals.log.debug( - f"Error removing download job task for {ele}: {E}" - ) - raise E + await self._remove_download_job_task(task1, ele) async def _handle_result_alt( self, sharedPlaceholderObj, ele, audio, video, username, model_id diff --git a/ofscraper/commands/scraper/actions/download/managers/main_download.py b/ofscraper/commands/scraper/actions/download/managers/main_download.py index b9e5ba04a..30d2a7250 100755 --- a/ofscraper/commands/scraper/actions/download/managers/main_download.py +++ b/ofscraper/commands/scraper/actions/download/managers/main_download.py @@ -210,20 +210,11 @@ async def _download_fileobject_writer_reader( ele, total=total, tempholderObj=tempholderObj, placeholderObj=placeholderObj ) - fileobject = await aiofiles.open(tempholderObj.tempfilepath, "ab").__aenter__() try: - await fileobject.write(await r.read_()) - except Exception as E: - raise E + async with aiofiles.open(tempholderObj.tempfilepath, "ab") as fileobject: + await fileobject.write(await r.read_()) finally: - try: - await fileobject.close() - except Exception: - pass - try: - await self._remove_download_job_task(task1, ele) - except Exception: - pass + await self._remove_download_job_task(task1, ele) async def _download_fileobject_writer_streamer( self, res, ele, tempholderObj, placeholderObj, total @@ -231,21 +222,18 @@ async def _download_fileobject_writer_streamer( task1 = await self._add_download_job_task( ele, total=total, tempholderObj=tempholderObj, placeholderObj=placeholderObj ) - fileobject = None try: - fileobject = await aiofiles.open( - tempholderObj.tempfilepath, "ab" - ).__aenter__() - chunk_iter = res.iter_chunked(get_chunk_size()) - - while True: - try: - chunk = await chunk_iter.__anext__() - await fileobject.write(chunk) - send_chunk_msg(ele, total, tempholderObj) - except StopAsyncIteration: - break - + async with aiofiles.open(tempholderObj.tempfilepath, "ab") as fileobject: + chunk_iter = res.iter_chunked(get_chunk_size()) + + while True: + try: + chunk = await chunk_iter.__anext__() + await fileobject.write(chunk) + send_chunk_msg(ele, total, tempholderObj) + except StopAsyncIteration: + break # Exit loop when no more chunks + # Catch native aiohttp socket read timeouts except (asyncio.TimeoutError, aiohttp.ServerTimeoutError) as E: common_globals.log.info( @@ -256,19 +244,7 @@ async def _download_fileobject_writer_streamer( common_globals.log.info(f"An error occurred during download for {ele}: {E}") raise E finally: - if fileobject: - try: - await fileobject.close() - except Exception as E: - common_globals.log.debug(f"Error closing file for {ele}: {E}") - raise E - try: - await self._remove_download_job_task(task1, ele) - except Exception as E: - common_globals.log.debug( - f"Error removing download job task for {ele}: {E}" - ) - raise E + await self._remove_download_job_task(task1, ele) async def _handle_result_main(self, result, ele, username, model_id): total, temp, placeholderObj = result From 228934e470f63951bdf4dab62cbb986eaca83d6b Mon Sep 17 00:00:00 2001 From: thismanyboyfriends2 Date: Mon, 5 Jan 2026 17:15:56 +0000 Subject: [PATCH 4/5] chore: adding logging to check semaphore and file handler usage in test runs --- .../actions/download/managers/alt_download.py | 7 ++++++ .../download/managers/main_download.py | 7 ++++++ .../commands/scraper/actions/utils/globals.py | 9 +++++++ ofscraper/main/close/exit.py | 25 +++++++++++++++++++ 4 files changed, 48 insertions(+) diff --git a/ofscraper/commands/scraper/actions/download/managers/alt_download.py b/ofscraper/commands/scraper/actions/download/managers/alt_download.py index 36b7cf8b1..e89df65c2 100755 --- a/ofscraper/commands/scraper/actions/download/managers/alt_download.py +++ b/ofscraper/commands/scraper/actions/download/managers/alt_download.py @@ -52,7 +52,9 @@ class AltDownloadManager(DownloadManager): async def alt_download(self, c, ele: Media, username, model_id): + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots before: {common_globals.sem._value}") async with common_globals.sem: + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots after acquire: {common_globals.sem._value}") common_globals.log.debug( f"{common_logs.get_medialog(ele)} Downloading with protected media downloader" ) @@ -88,6 +90,7 @@ async def alt_download(self, c, ele: Media, username, model_id): return await self._handle_result_alt( sharedPlaceholderObj, ele, audio, video, username, model_id ) + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots after release: {common_globals.sem._value}") async def _alt_download_downloader(self, item, c, ele): self._downloadspace() @@ -234,8 +237,10 @@ async def _download_fileobject_writer_reader(self, ele, total, res, placeholderO ele, total=total, placeholderObj=placeholderObj ) try: + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Opening file: {placeholderObj.tempfilepath}") async with aiofiles.open(placeholderObj.tempfilepath, "ab") as fileobject: await fileobject.write(await res.read_()) + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] File closed: {placeholderObj.tempfilepath}") finally: await self._remove_download_job_task(task1, ele) @@ -244,6 +249,7 @@ async def _download_fileobject_writer_streamer( ): task1 = await self._add_download_job_task(ele, total, placeholderObj) try: + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Opening file for streaming: {placeholderObj.tempfilepath}") async with aiofiles.open(placeholderObj.tempfilepath, "ab") as fileobject: chunk_iter = res.iter_chunked(get_chunk_size()) @@ -254,6 +260,7 @@ async def _download_fileobject_writer_streamer( send_chunk_msg(ele, total, placeholderObj) except StopAsyncIteration: break # Exit loop when no more chunks + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] File closed after streaming: {placeholderObj.tempfilepath}") # Catch native aiohttp socket read timeouts except (asyncio.TimeoutError, aiohttp.ServerTimeoutError) as E: diff --git a/ofscraper/commands/scraper/actions/download/managers/main_download.py b/ofscraper/commands/scraper/actions/download/managers/main_download.py index 30d2a7250..1d8b1647c 100755 --- a/ofscraper/commands/scraper/actions/download/managers/main_download.py +++ b/ofscraper/commands/scraper/actions/download/managers/main_download.py @@ -40,7 +40,9 @@ class MainDownloadManager(DownloadManager): async def main_download(self, c, ele: Media, username, model_id): + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots before: {common_globals.sem._value}") async with common_globals.sem: + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots after acquire: {common_globals.sem._value}") common_globals.log.debug( f"{common_logs.get_medialog(ele)} Downloading with normal downloader" ) @@ -63,6 +65,7 @@ async def main_download(self, c, ele: Media, username, model_id): return ele.mediatype.capitalize(), 0 return await self._handle_result_main(result, ele, username, model_id) + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots after release: {common_globals.sem._value}") async def _main_download_downloader(self, c, ele): self._downloadspace() @@ -211,8 +214,10 @@ async def _download_fileobject_writer_reader( ) try: + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Opening file: {tempholderObj.tempfilepath}") async with aiofiles.open(tempholderObj.tempfilepath, "ab") as fileobject: await fileobject.write(await r.read_()) + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] File closed: {tempholderObj.tempfilepath}") finally: await self._remove_download_job_task(task1, ele) @@ -223,6 +228,7 @@ async def _download_fileobject_writer_streamer( ele, total=total, tempholderObj=tempholderObj, placeholderObj=placeholderObj ) try: + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Opening file for streaming: {tempholderObj.tempfilepath}") async with aiofiles.open(tempholderObj.tempfilepath, "ab") as fileobject: chunk_iter = res.iter_chunked(get_chunk_size()) @@ -233,6 +239,7 @@ async def _download_fileobject_writer_streamer( send_chunk_msg(ele, total, tempholderObj) except StopAsyncIteration: break # Exit loop when no more chunks + common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] File closed after streaming: {tempholderObj.tempfilepath}") # Catch native aiohttp socket read timeouts except (asyncio.TimeoutError, aiohttp.ServerTimeoutError) as E: diff --git a/ofscraper/commands/scraper/actions/utils/globals.py b/ofscraper/commands/scraper/actions/utils/globals.py index eacae75aa..0e3dcfb78 100755 --- a/ofscraper/commands/scraper/actions/utils/globals.py +++ b/ofscraper/commands/scraper/actions/utils/globals.py @@ -1,8 +1,10 @@ import asyncio import contextvars +import os from concurrent.futures import ThreadPoolExecutor import aioprocessing +import psutil import ofscraper.utils.console as console_ import ofscraper.utils.settings as settings import ofscraper.utils.logs.logger as logger @@ -63,6 +65,13 @@ def main_globals(): global log log = logger.get_shared_logger(name="ofscraper_download") + try: + process = psutil.Process(os.getpid()) + num_fds = len(process.open_files()) + log.trace(f"[RESOURCE] Download globals initialized | Open file descriptors: {num_fds}") + except Exception: + pass + def mainProcessVariableInit(): set_up_contexvars() diff --git a/ofscraper/main/close/exit.py b/ofscraper/main/close/exit.py index 047e7c035..c890d51e0 100755 --- a/ofscraper/main/close/exit.py +++ b/ofscraper/main/close/exit.py @@ -1,8 +1,14 @@ import time +import os +import logging + +import psutil import ofscraper.utils.logs.close as close_log import ofscraper.utils.manager as manager import ofscraper.utils.cache.cache as cache +log = logging.getLogger("shared") + def shutdown(): time.sleep(3) @@ -11,6 +17,7 @@ def shutdown(): closeThreadExecutor() closeCache() closeThreadExecutor() + logResourceCleanup() def forcedShutDown(): @@ -39,3 +46,21 @@ def closeCache(): cache.close() except Exception: pass + + +def logResourceCleanup(): + """Log resource state at shutdown for leak detection.""" + try: + process = psutil.Process(os.getpid()) + num_fds = len(process.open_files()) + num_threads = process.num_threads() + + log.trace(f"[RESOURCE] Shutdown verification | Open file descriptors: {num_fds} | Threads: {num_threads}") + + if num_fds > 50: + log.warning(f"[RESOURCE] ⚠️ Possible file descriptor leak: {num_fds} files still open at shutdown") + open_files = process.open_files() + sample = [f.path for f in open_files[:10]] + log.warning(f"[RESOURCE] Sample of open files: {sample}") + except Exception: + pass From 133ba17cd39550309bf1ad3f03c12f8c647292ca Mon Sep 17 00:00:00 2001 From: thismanyboyfriends2 Date: Tue, 10 Mar 2026 23:49:20 +0000 Subject: [PATCH 5/5] fix: gather exceptions so they don't get swallowed --- ofscraper/commands/metadata/metadata.py | 17 +--- .../scraper/actions/download/download.py | 17 +--- .../actions/download/managers/alt_download.py | 1 - ofscraper/utils/gather.py | 13 +++ test/conftest.py | 5 ++ test/general/gather_exception_test.py | 81 +++++++++++++++++++ 6 files changed, 103 insertions(+), 31 deletions(-) create mode 100644 ofscraper/utils/gather.py create mode 100644 test/conftest.py create mode 100644 test/general/gather_exception_test.py diff --git a/ofscraper/commands/metadata/metadata.py b/ofscraper/commands/metadata/metadata.py index f2eda334f..530882581 100755 --- a/ofscraper/commands/metadata/metadata.py +++ b/ofscraper/commands/metadata/metadata.py @@ -42,21 +42,11 @@ from ofscraper.commands.metadata.desc import desc from ofscraper.data.posts.scrape_paid import scrape_paid_all from ofscraper.managers.postcollection import PostCollection +from ofscraper.utils.gather import gather_and_raise log = logging.getLogger("shared") -def _handle_consumer_exception(task): - """Handle exceptions from consumer tasks.""" - try: - if task.exception(): - logging.getLogger("shared").debug( - f"Consumer task failed with exception: {task.exception()}" - ) - except (asyncio.CancelledError, asyncio.InvalidStateError): - pass - - class MetadataCommandManager(CommandManager): def __init__(self): super().__init__() @@ -259,10 +249,7 @@ async def process_dicts(username, model_id, medialist): asyncio.create_task(consumer(aws, task1, medialist, lock)) for _ in range(concurrency_limit) ] - # Add exception callback to detect any unhandled exceptions - for task in consumers: - task.add_done_callback(_handle_consumer_exception) - await asyncio.gather(*consumers, return_exceptions=True) + await gather_and_raise(consumers) except Exception as E: with exit.DelayedKeyboardInterrupt(): raise E diff --git a/ofscraper/commands/scraper/actions/download/download.py b/ofscraper/commands/scraper/actions/download/download.py index b6da30023..be3ceb414 100755 --- a/ofscraper/commands/scraper/actions/download/download.py +++ b/ofscraper/commands/scraper/actions/download/download.py @@ -27,17 +27,7 @@ from ofscraper.commands.scraper.actions.download.utils.text import textDownloader import ofscraper.utils.settings as settings import ofscraper.managers.manager as manager - - -def _handle_consumer_exception(task): - """Handle exceptions from consumer tasks.""" - try: - if task.exception(): - logging.getLogger("shared").debug( - f"Consumer task failed with exception: {task.exception()}" - ) - except (asyncio.CancelledError, asyncio.InvalidStateError): - pass +from ofscraper.utils.gather import gather_and_raise async def downloader(username=None, model_id=None, posts=None, media=None, **kwargs): @@ -115,10 +105,7 @@ async def process_dicts(username, model_id, medialist, posts): asyncio.create_task(consumer(aws, task1, medialist, lock)) for _ in range(concurrency_limit) ] - # Add exception callback to detect any unhandled exceptions - for task in consumers: - task.add_done_callback(_handle_consumer_exception) - await asyncio.gather(*consumers, return_exceptions=True) + await gather_and_raise(consumers) except Exception as E: with exit.DelayedKeyboardInterrupt(): raise E diff --git a/ofscraper/commands/scraper/actions/download/managers/alt_download.py b/ofscraper/commands/scraper/actions/download/managers/alt_download.py index e89df65c2..00b4a53f1 100755 --- a/ofscraper/commands/scraper/actions/download/managers/alt_download.py +++ b/ofscraper/commands/scraper/actions/download/managers/alt_download.py @@ -90,7 +90,6 @@ async def alt_download(self, c, ele: Media, username, model_id): return await self._handle_result_alt( sharedPlaceholderObj, ele, audio, video, username, model_id ) - common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots after release: {common_globals.sem._value}") async def _alt_download_downloader(self, item, c, ele): self._downloadspace() diff --git a/ofscraper/utils/gather.py b/ofscraper/utils/gather.py new file mode 100644 index 000000000..ebb8e2b1a --- /dev/null +++ b/ofscraper/utils/gather.py @@ -0,0 +1,13 @@ +import asyncio + + +async def gather_and_raise(tasks): + """Await all tasks and re-raise the first exception, if any. + + Uses return_exceptions=True so all tasks run to completion before + raising, rather than cancelling sibling tasks on first failure. + """ + results = await asyncio.gather(*tasks, return_exceptions=True) + exc = next((r for r in results if isinstance(r, BaseException)), None) + if exc: + raise exc diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 000000000..6caf64d85 --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,5 @@ +import sys +import pathlib + +# Add project root to path so tests can import ofscraper +sys.path.insert(0, str(pathlib.Path(__file__).parent.parent)) diff --git a/test/general/gather_exception_test.py b/test/general/gather_exception_test.py new file mode 100644 index 000000000..6c53c8a35 --- /dev/null +++ b/test/general/gather_exception_test.py @@ -0,0 +1,81 @@ +""" +Regression tests for asyncio.gather exception propagation. + +Previously, gather was called with return_exceptions=True but the result +was never checked, causing all consumer task exceptions to be silently swallowed. +""" +import asyncio +import pytest + +from ofscraper.utils.gather import gather_and_raise + + +def test_gather_and_raise_propagates_first_exception(): + """Consumer exceptions must propagate, not be silently swallowed.""" + async def run(): + async def failing(): + raise RuntimeError("consumer failed") + + tasks = [asyncio.create_task(failing())] + await gather_and_raise(tasks) + + with pytest.raises(RuntimeError, match="consumer failed"): + asyncio.run(run()) + + +def test_gather_and_raise_all_consumers_run_before_raise(): + """All consumers should complete before the exception is raised.""" + completed = [] + + async def run(): + async def failing(): + raise RuntimeError("fail") + + async def slow_succeeding(): + await asyncio.sleep(0) + completed.append("done") + + tasks = [ + asyncio.create_task(failing()), + asyncio.create_task(slow_succeeding()), + ] + await gather_and_raise(tasks) + + with pytest.raises(RuntimeError): + asyncio.run(run()) + + assert "done" in completed, "slow consumer should have completed before exception raised" + + +def test_gather_and_raise_does_not_raise_when_all_succeed(): + """No exception raised when all consumers succeed.""" + results = [] + + async def run(): + async def succeeding(): + results.append("ok") + + tasks = [asyncio.create_task(succeeding()) for _ in range(3)] + await gather_and_raise(tasks) + + asyncio.run(run()) + assert results == ["ok", "ok", "ok"] + + +def test_gather_and_raise_multiple_exceptions_raises_first(): + """When multiple consumers fail, the first exception is raised.""" + async def run(): + async def failing_a(): + raise ValueError("error A") + + async def failing_b(): + raise TypeError("error B") + + tasks = [ + asyncio.create_task(failing_a()), + asyncio.create_task(failing_b()), + ] + await gather_and_raise(tasks) + + with pytest.raises((ValueError, TypeError)): + asyncio.run(run())