From e1faf8f8acb6288fe148983ff44d1e669ebfc460 Mon Sep 17 00:00:00 2001
From: Ch3ri0ur
Date: Sun, 10 Aug 2025 17:47:06 +0200
Subject: [PATCH 1/3] Update _read_parallel to use a queue

---
 sphinx/builders/__init__.py | 53 ++++++++++++++++++++++++++-----------
 1 file changed, 37 insertions(+), 16 deletions(-)

diff --git a/sphinx/builders/__init__.py b/sphinx/builders/__init__.py
index 2dd972ecfe0..250cef05a98 100644
--- a/sphinx/builders/__init__.py
+++ b/sphinx/builders/__init__.py
@@ -590,39 +590,60 @@ def _read_serial(self, docnames: list[str]) -> None:
             self.read_doc(docname)
 
     def _read_parallel(self, docnames: list[str], nproc: int) -> None:
-        chunks = make_chunks(docnames, nproc)
+        # The status iterator is currently not very helpful; it only notifies when workers join back.
+        # In theory there could be a status update queue that runs in the main thread.
 
         # create a status_iterator to step progressbar after reading a document
         # (see: ``merge()`` function)
         progress = status_iterator(
-            chunks,
-            __('reading sources... '),
+            range(nproc),
+            __('Worker returned... '),
             'purple',
-            len(chunks),
+            nproc,
             self.config.verbosity,
         )
 
-        # clear all outdated docs at once
+        # clear all outdated docs at once so that forked environments are cleared
         for docname in docnames:
             self.events.emit('env-purge-doc', self.env, docname)
             self.env.clear_doc(docname)
 
-        def read_process(docs: list[str]) -> bytes:
-            self.env._app = self._app
-            for docname in docs:
-                self.read_doc(docname, _cache=False)
-            # allow pickling self to send it back
-            return pickle.dumps(self.env, pickle.HIGHEST_PROTOCOL)
+        work_queue: multiprocessing.Queue[str] = multiprocessing.Queue()
+
+        # load up all docs to be processed
+        for doc in docnames:
+            work_queue.put(doc)
 
-        def merge(docs: list[str], otherenv: bytes) -> None:
-            env = pickle.loads(otherenv)
-            self.env.merge_info_from(docs, env, self._app)
+        # Add sentinel values to stop workers after all tasks are done
+        # Not sure if there is some edge case where this is not reliable enough
+        for _ in range(nproc):
+            work_queue.put(None)
+        def read_worker(work_queue: multiprocessing.Queue[str]) -> bytes:
+            self.env._app = self._app
+            processed_docs = []
+            while True:
+                try:
+                    task = work_queue.get(timeout=1)  # timeout to allow graceful exit
+                except queue.Empty:
+                    break
+                if task is None:
+                    break
+                self.read_doc(task, _cache=False)
+                processed_docs.append(task)
+            # Return both processed docs and environment
+            return pickle.dumps((processed_docs, self.env), pickle.HIGHEST_PROTOCOL)
+
+        def merge(work_queue: multiprocessing.Queue[str], results: bytes) -> None:
+            processed_docs, env = pickle.loads(results)
+            # Only merge the documents that this worker actually processed
+            self.env.merge_info_from(processed_docs, env, self._app)
 
             next(progress)
 
+        # Spawn nproc workers to work through the queue
         tasks = ParallelTasks(nproc)
-        for chunk in chunks:
-            tasks.add_task(read_process, chunk, merge)
+        for _ in range(nproc):
+            tasks.add_task(read_worker, work_queue, merge)
 
         # make sure all threads have finished
         tasks.join()

From e5b886759eea196b0957bbc9e42e417dca6e2b88 Mon Sep 17 00:00:00 2001
From: Ch3ri0ur
Date: Sun, 10 Aug 2025 17:55:17 +0200
Subject: [PATCH 2/3] Add the imports back in.

---
 sphinx/builders/__init__.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sphinx/builders/__init__.py b/sphinx/builders/__init__.py
index 250cef05a98..112afb73cf8 100644
--- a/sphinx/builders/__init__.py
+++ b/sphinx/builders/__init__.py
@@ -3,7 +3,9 @@
 from __future__ import annotations
 
 import codecs
+import multiprocessing
 import pickle
+import queue
 import re
 import time
 from contextlib import nullcontext

From ae3ced28b3bc011b9ad290b6071b81fa64b89318 Mon Sep 17 00:00:00 2001
From: Ch3ri0ur
Date: Sun, 17 Aug 2025 12:23:40 +0200
Subject: [PATCH 3/3] Fix typing error

---
 sphinx/builders/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sphinx/builders/__init__.py b/sphinx/builders/__init__.py
index 112afb73cf8..e58242ed9db 100644
--- a/sphinx/builders/__init__.py
+++ b/sphinx/builders/__init__.py
@@ -610,7 +610,7 @@ def _read_parallel(self, docnames: list[str], nproc: int) -> None:
             self.events.emit('env-purge-doc', self.env, docname)
             self.env.clear_doc(docname)
 
-        work_queue: multiprocessing.Queue[str] = multiprocessing.Queue()
+        work_queue: multiprocessing.Queue[str | None] = multiprocessing.Queue()
 
         # load up all docs to be processed
         for doc in docnames:
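
Note (not part of the patches): below is a minimal, standalone sketch of the queue-plus-sentinel pattern that PATCH 1/3 introduces. The parent process fills a shared multiprocessing.Queue with work items, adds one None sentinel per worker so every worker can shut down cleanly, and each worker drains tasks until it hits a sentinel, then hands its batch back to be merged. All names here (read_worker, result_queue, the dummy docnames) are illustrative only; in the actual patch each worker returns a pickled (processed_docs, env) tuple through Sphinx's ParallelTasks machinery rather than a result queue.

from __future__ import annotations

import multiprocessing
import queue


def read_worker(work_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue) -> None:
    # Pull work items off the shared queue until a None sentinel (or a long idle period) arrives.
    processed: list[str] = []
    while True:
        try:
            task = work_queue.get(timeout=1)  # timeout so a dead producer cannot hang the worker
        except queue.Empty:
            break
        if task is None:  # sentinel: no more work
            break
        processed.append(task.upper())  # stand-in for the real "read this document" step
    result_queue.put(processed)  # hand this worker's batch back to the parent


def main() -> None:
    docnames = [f'doc{i}' for i in range(10)]
    nproc = 3

    work_queue: multiprocessing.Queue = multiprocessing.Queue()
    result_queue: multiprocessing.Queue = multiprocessing.Queue()

    for doc in docnames:  # load up all work items
        work_queue.put(doc)
    for _ in range(nproc):  # one sentinel per worker
        work_queue.put(None)

    workers = [
        multiprocessing.Process(target=read_worker, args=(work_queue, result_queue))
        for _ in range(nproc)
    ]
    for w in workers:
        w.start()

    # Merge step: collect each worker's batch; draining the result queue before
    # join() avoids blocking on workers that still hold buffered queue data.
    merged: list[str] = []
    for _ in range(nproc):
        merged.extend(result_queue.get())

    for w in workers:
        w.join()

    print(sorted(merged))


if __name__ == '__main__':
    main()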