3 changes: 2 additions & 1 deletion ofscraper/commands/metadata/metadata.py
@@ -42,6 +42,7 @@
from ofscraper.commands.metadata.desc import desc
from ofscraper.data.posts.scrape_paid import scrape_paid_all
from ofscraper.managers.postcollection import PostCollection
from ofscraper.utils.gather import gather_and_raise

log = logging.getLogger("shared")

@@ -248,7 +249,7 @@ async def process_dicts(username, model_id, medialist):
asyncio.create_task(consumer(aws, task1, medialist, lock))
for _ in range(concurrency_limit)
]
await asyncio.gather(*consumers)
await gather_and_raise(consumers)
except Exception as E:
with exit.DelayedKeyboardInterrupt():
raise E
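Note: the new gather_and_raise helper imported above from ofscraper.utils.gather is not shown in this diff. A minimal sketch of what such a helper might look like, assuming it fails fast on the first consumer error and cancels the remaining tasks (the signature and cancellation behavior are assumptions, not the PR's actual implementation):

import asyncio


async def gather_and_raise(tasks):
    # Run the already-created tasks concurrently; stop waiting at the first failure.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    errors = [t.exception() for t in done if not t.cancelled() and t.exception()]
    if errors:
        # Cancel whatever is still running, let the cancellations settle, then surface the error.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        raise errors[0]
    return [t.result() for t in done]

Unlike a bare await asyncio.gather(*consumers), which propagates the first exception but leaves the other consumer tasks running in the background, a helper along these lines also cancels the surviving tasks before re-raising.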
3 changes: 2 additions & 1 deletion ofscraper/commands/scraper/actions/download/download.py
@@ -27,6 +27,7 @@
from ofscraper.commands.scraper.actions.download.utils.text import textDownloader
import ofscraper.utils.settings as settings
import ofscraper.managers.manager as manager
from ofscraper.utils.gather import gather_and_raise


async def downloader(username=None, model_id=None, posts=None, media=None, **kwargs):
@@ -104,7 +105,7 @@ async def process_dicts(username, model_id, medialist, posts):
asyncio.create_task(consumer(aws, task1, medialist, lock))
for _ in range(concurrency_limit)
]
await asyncio.gather(*consumers)
await gather_and_raise(consumers)
except Exception as E:
with exit.DelayedKeyboardInterrupt():
raise E
(file path not shown)
@@ -52,9 +52,9 @@
class AltDownloadManager(DownloadManager):

async def alt_download(self, c, ele: Media, username, model_id):
# Acquire semaphore at the very beginning of the process
await common_globals.sem.acquire()
try:
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots before: {common_globals.sem._value}")
async with common_globals.sem:
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots after acquire: {common_globals.sem._value}")
common_globals.log.debug(
f"{common_logs.get_medialog(ele)} Downloading with protected media downloader"
)
@@ -90,8 +90,6 @@ async def alt_download(self, c, ele: Media, username, model_id):
return await self._handle_result_alt(
sharedPlaceholderObj, ele, audio, video, username, model_id
)
finally:
common_globals.sem.release()

async def _alt_download_downloader(self, item, c, ele):
self._downloadspace()
@@ -237,40 +235,32 @@ async def _download_fileobject_writer_reader(self, ele, total, res, placeholderO
task1 = await self._add_download_job_task(
ele, total=total, placeholderObj=placeholderObj
)
fileobject = await aiofiles.open(placeholderObj.tempfilepath, "ab").__aenter__()
try:
await fileobject.write(await res.read_())
except Exception as E:
raise E
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Opening file: {placeholderObj.tempfilepath}")
async with aiofiles.open(placeholderObj.tempfilepath, "ab") as fileobject:
await fileobject.write(await res.read_())
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] File closed: {placeholderObj.tempfilepath}")
finally:
try:
await fileobject.close()
except Exception as E:
raise E
try:
await self._remove_download_job_task(task1, ele)
except Exception as E:
raise E
await self._remove_download_job_task(task1, ele)

async def _download_fileobject_writer_streamer(
self, ele, total, res, placeholderObj
):
task1 = await self._add_download_job_task(ele, total, placeholderObj)
fileobject = None
try:
fileobject = await aiofiles.open(
placeholderObj.tempfilepath, "ab"
).__aenter__()
chunk_iter = res.iter_chunked(get_chunk_size())
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Opening file for streaming: {placeholderObj.tempfilepath}")
async with aiofiles.open(placeholderObj.tempfilepath, "ab") as fileobject:
chunk_iter = res.iter_chunked(get_chunk_size())

while True:
try:
chunk = await chunk_iter.__anext__()
await fileobject.write(chunk)
send_chunk_msg(ele, total, placeholderObj)
except StopAsyncIteration:
break # Exit loop when no more chunks
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] File closed after streaming: {placeholderObj.tempfilepath}")

while True:
try:
chunk = await chunk_iter.__anext__()
await fileobject.write(chunk)
send_chunk_msg(ele, total, placeholderObj)
except StopAsyncIteration:
break # Exit loop when no more chunks

# Catch native aiohttp socket read timeouts
except (asyncio.TimeoutError, aiohttp.ServerTimeoutError) as E:
common_globals.log.info(
@@ -281,19 +271,7 @@ async def _download_fileobject_writer_streamer(
common_globals.log.info(f"An error occurred during download for {ele}: {E}")
raise E
finally:
if fileobject:
try:
await fileobject.close()
except Exception as E:
common_globals.log.debug(f"Error closing file for {ele}: {E}")
raise E
try:
await self._remove_download_job_task(task1, ele)
except Exception as E:
common_globals.log.debug(
f"Error removing download job task for {ele}: {E}"
)
raise E
await self._remove_download_job_task(task1, ele)

async def _handle_result_alt(
self, sharedPlaceholderObj, ele, audio, video, username, model_id
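The changes in this file swap the manual sem.acquire()/sem.release() and aiofiles __aenter__()/close() bookkeeping for async with blocks, so the semaphore slot and the temp-file handle are released even when the body raises or the task is cancelled. A minimal, self-contained sketch of the pattern (the function and parameter names are illustrative, not taken from the PR):

import asyncio

import aiofiles


async def write_chunks(sem: asyncio.Semaphore, path: str, chunks) -> None:
    # Both the semaphore slot and the file handle are released automatically,
    # even if a write raises or the task is cancelled mid-loop.
    async with sem:
        async with aiofiles.open(path, "ab") as fileobject:
            async for chunk in chunks:
                await fileobject.write(chunk)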
(file path not shown)
@@ -40,8 +40,9 @@
class MainDownloadManager(DownloadManager):

async def main_download(self, c, ele: Media, username, model_id):
await common_globals.sem.acquire()
try:
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots before: {common_globals.sem._value}")
async with common_globals.sem:
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots after acquire: {common_globals.sem._value}")
common_globals.log.debug(
f"{common_logs.get_medialog(ele)} Downloading with normal downloader"
)
@@ -64,9 +65,7 @@ async def main_download(self, c, ele: Media, username, model_id):
return ele.mediatype.capitalize(), 0

return await self._handle_result_main(result, ele, username, model_id)

finally:
common_globals.sem.release()
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Semaphore slots after release: {common_globals.sem._value}")

async def _main_download_downloader(self, c, ele):
self._downloadspace()
@@ -214,42 +213,34 @@ async def _download_fileobject_writer_reader(
ele, total=total, tempholderObj=tempholderObj, placeholderObj=placeholderObj
)

fileobject = await aiofiles.open(tempholderObj.tempfilepath, "ab").__aenter__()
try:
await fileobject.write(await r.read_())
except Exception as E:
raise E
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Opening file: {tempholderObj.tempfilepath}")
async with aiofiles.open(tempholderObj.tempfilepath, "ab") as fileobject:
await fileobject.write(await r.read_())
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] File closed: {tempholderObj.tempfilepath}")
finally:
try:
await fileobject.close()
except Exception:
pass
try:
await self._remove_download_job_task(task1, ele)
except Exception:
pass
await self._remove_download_job_task(task1, ele)

async def _download_fileobject_writer_streamer(
self, res, ele, tempholderObj, placeholderObj, total
):
task1 = await self._add_download_job_task(
ele, total=total, tempholderObj=tempholderObj, placeholderObj=placeholderObj
)
fileobject = None
try:
fileobject = await aiofiles.open(
tempholderObj.tempfilepath, "ab"
).__aenter__()
chunk_iter = res.iter_chunked(get_chunk_size())

while True:
try:
chunk = await chunk_iter.__anext__()
await fileobject.write(chunk)
send_chunk_msg(ele, total, tempholderObj)
except StopAsyncIteration:
break
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] Opening file for streaming: {tempholderObj.tempfilepath}")
async with aiofiles.open(tempholderObj.tempfilepath, "ab") as fileobject:
chunk_iter = res.iter_chunked(get_chunk_size())

while True:
try:
chunk = await chunk_iter.__anext__()
await fileobject.write(chunk)
send_chunk_msg(ele, total, tempholderObj)
except StopAsyncIteration:
break # Exit loop when no more chunks
common_globals.log.trace(f"{get_medialog(ele)} [RESOURCE] File closed after streaming: {tempholderObj.tempfilepath}")

# Catch native aiohttp socket read timeouts
except (asyncio.TimeoutError, aiohttp.ServerTimeoutError) as E:
common_globals.log.info(
@@ -260,19 +251,7 @@ async def _download_fileobject_writer_streamer(
common_globals.log.info(f"An error occurred during download for {ele}: {E}")
raise E
finally:
if fileobject:
try:
await fileobject.close()
except Exception as E:
common_globals.log.debug(f"Error closing file for {ele}: {E}")
raise E
try:
await self._remove_download_job_task(task1, ele)
except Exception as E:
common_globals.log.debug(
f"Error removing download job task for {ele}: {E}"
)
raise E
await self._remove_download_job_task(task1, ele)

async def _handle_result_main(self, result, ele, username, model_id):
total, temp, placeholderObj = result
9 changes: 9 additions & 0 deletions ofscraper/commands/scraper/actions/utils/globals.py
@@ -1,8 +1,10 @@
import asyncio
import contextvars
import os
from concurrent.futures import ThreadPoolExecutor

import aioprocessing
import psutil
import ofscraper.utils.console as console_
import ofscraper.utils.settings as settings
import ofscraper.utils.logs.logger as logger
@@ -63,6 +65,13 @@ def main_globals():
global log
log = logger.get_shared_logger(name="ofscraper_download")

try:
process = psutil.Process(os.getpid())
num_fds = len(process.open_files())
log.trace(f"[RESOURCE] Download globals initialized | Open file descriptors: {num_fds}")
except Exception:
pass


def mainProcessVariableInit():
set_up_contexvars()
8 changes: 6 additions & 2 deletions ofscraper/commands/scraper/scraper.py
@@ -231,23 +231,27 @@ def daemon_process():
try:
with exit.DelayedKeyboardInterrupt():
schedule.clear()
if worker_thread:
if worker_thread and worker_thread.is_alive():
worker_thread.join(timeout=5)
raise KeyboardInterrupt
except KeyboardInterrupt:
with exit.DelayedKeyboardInterrupt():
schedule.clear()
if worker_thread and worker_thread.is_alive():
worker_thread.join(timeout=5)
raise E
except Exception as E:
try:
with exit.DelayedKeyboardInterrupt():
schedule.clear()
if worker_thread:
if worker_thread and worker_thread.is_alive():
worker_thread.join(timeout=5)
raise E
except KeyboardInterrupt:
with exit.DelayedKeyboardInterrupt():
schedule.clear()
if worker_thread and worker_thread.is_alive():
worker_thread.join(timeout=5)
raise E


14 changes: 6 additions & 8 deletions ofscraper/data/posts/post.py
@@ -494,14 +494,12 @@ async def process_areas(ele, model_id, username, c=None):
def process_single_task(func):
async def inner(sem=None):
data = None
await sem.acquire()
try:
data = await func()
except Exception as E:
log.traceback_(E)
log.traceback_(traceback.format_exc())
finally:
sem.release()
async with sem:
try:
data = await func()
except Exception as E:
log.traceback_(E)
log.traceback_(traceback.format_exc())
return data

return inner
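For reference, a hedged sketch of how the rewritten process_single_task decorator might be driven: a shared semaphore bounds concurrency while each wrapped call logs and swallows its own errors. The run_areas name and the gather call are illustrative; the actual call sites in post.py are not part of this diff.

import asyncio


async def run_areas(area_funcs, limit=5):
    # area_funcs stands in for the per-area coroutines that post.py wraps.
    sem = asyncio.Semaphore(limit)
    wrapped = [process_single_task(func) for func in area_funcs]
    # Each inner() returns its data, or None if the wrapped call raised and was logged.
    return await asyncio.gather(*(inner(sem=sem) for inner in wrapped))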
27 changes: 27 additions & 0 deletions ofscraper/main/close/exit.py
@@ -1,22 +1,31 @@
import time
import os
import logging

import psutil
import ofscraper.utils.logs.close as close_log
import ofscraper.utils.manager as manager
import ofscraper.utils.cache.cache as cache

log = logging.getLogger("shared")


def shutdown():
time.sleep(3)
close_log.gracefulClose()
manager.shutdown()
closeThreadExecutor()
closeCache()
closeThreadExecutor()
logResourceCleanup()


def forcedShutDown():
time.sleep(3)
manager.shutdown()
closeThreadExecutor()
closeCache()
closeThreadExecutor()


def closeThreadExecutor():
@@ -37,3 +46,21 @@ def closeCache():
cache.close()
except Exception:
pass


def logResourceCleanup():
"""Log resource state at shutdown for leak detection."""
try:
process = psutil.Process(os.getpid())
num_fds = len(process.open_files())
num_threads = process.num_threads()

log.trace(f"[RESOURCE] Shutdown verification | Open file descriptors: {num_fds} | Threads: {num_threads}")

if num_fds > 50:
log.warning(f"[RESOURCE] ⚠️ Possible file descriptor leak: {num_fds} files still open at shutdown")
open_files = process.open_files()
sample = [f.path for f in open_files[:10]]
log.warning(f"[RESOURCE] Sample of open files: {sample}")
except Exception:
pass
9 changes: 1 addition & 8 deletions ofscraper/managers/sessionmanager/sessionmanager.py
@@ -635,8 +635,7 @@ async def requests_async(
),
):
with _:
await self._sem.acquire()
try:
async with self._sem:
if await self._rate_limit_sleeper.async_do_sleep():
pass
else:
@@ -681,13 +680,7 @@ async def requests_async(
raise SystemExit("OnlyFans Maintenance detected.")
r.raise_for_status()

self._sem.release()
yield r
return
except Exception as E:
await self._async_handle_error(E, exceptions)
self._sem.release()
raise E

@property
def sleep(self):