From 17cc5e7eb4534531b33d688fa73ac6c77e40f1a3 Mon Sep 17 00:00:00 2001 From: nitrobass24 Date: Mon, 23 Mar 2026 20:28:53 -0500 Subject: [PATCH 1/2] Fix pre-existing code quality bugs from Ruff review (#289) - Fix busy-wait CPU spike on shutdown in AppProcess.terminate() - Fix race condition on queue internals in ExtractDispatch - Fix user=None producing "None@host" in Sshcp with _remote_address helper - Fix wrong boolean operator in test_dispatch wait loop (and -> or) - Fix unclosed urlopen response in WebhookNotifier - Fix leaked Popen/fnull in integration test setup (use subprocess.run) Co-Authored-By: Claude Opus 4.6 --- src/python/common/app_process.py | 3 ++- src/python/controller/extract/dispatch.py | 23 ++++++++++++------- src/python/controller/notifier.py | 3 ++- src/python/ssh/sshcp.py | 12 +++++++--- .../test_controller/test_controller.py | 3 +-- .../test_extract/test_extract.py | 15 +++++++----- .../test_extract/test_dispatch.py | 2 +- .../test_ssh/test_sshcp_remote_address.py | 19 +++++++++++++++ 8 files changed, 58 insertions(+), 22 deletions(-) create mode 100644 src/python/tests/unittests/test_ssh/test_sshcp_remote_address.py diff --git a/src/python/common/app_process.py b/src/python/common/app_process.py index 49f001bc..71e0cf7a 100644 --- a/src/python/common/app_process.py +++ b/src/python/common/app_process.py @@ -5,6 +5,7 @@ import signal import sys import threading +import time from abc import abstractmethod from datetime import datetime from multiprocessing import Event, Process, Queue @@ -114,7 +115,7 @@ def elapsed_ms(start): timestamp_start = datetime.now() while self.is_alive() and elapsed_ms(timestamp_start) < AppProcess.__DEFAULT_TERMINATE_TIMEOUT_MS: - pass + time.sleep(0.05) super().terminate() diff --git a/src/python/controller/extract/dispatch.py b/src/python/controller/extract/dispatch.py index 1d36907c..5cfb7cd6 100644 --- a/src/python/controller/extract/dispatch.py +++ b/src/python/controller/extract/dispatch.py @@ -102,7 +102,8 @@ def add_listener(self, listener: ExtractListener): self.__listeners_lock.release() def status(self) -> list[ExtractStatus]: - tasks = list(self.__task_queue.queue) + with self.__task_queue.mutex: + tasks = list(self.__task_queue.queue) statuses = [] for task in tasks: status = ExtractStatus( @@ -130,10 +131,11 @@ def extract(self, req: ExtractRequest): model_file = req.model_file self.logger.debug("Received extract for {}".format(model_file.name)) - for task in self.__task_queue.queue: - if task.root_name == model_file.name and task.pair_id == req.pair_id: - self.logger.info("Ignoring extract for {}, already exists".format(model_file.name)) - return + with self.__task_queue.mutex: + for task in self.__task_queue.queue: + if task.root_name == model_file.name and task.pair_id == req.pair_id: + self.logger.info("Ignoring extract for {}, already exists".format(model_file.name)) + return # noinspection PyProtectedMember task = ExtractDispatch._Task(model_file.name, model_file.is_dir, req.pair_id) @@ -183,9 +185,11 @@ def __worker(self): while not self.__worker_shutdown.is_set(): # Try to grab next task # Do another check for shutdown - while len(self.__task_queue.queue) > 0 and not self.__worker_shutdown.is_set(): - # peek the task - task = self.__task_queue.queue[0] + with self.__task_queue.mutex: + has_tasks = len(self.__task_queue.queue) > 0 + while has_tasks and not self.__worker_shutdown.is_set(): + with self.__task_queue.mutex: + task = self.__task_queue.queue[0] # We have a task, extract archives one by one completed = True @@ -217,6 +221,9 @@ def __worker(self): listener.extract_failed(task.root_name, task.root_is_dir, task.pair_id) self.__listeners_lock.release() + with self.__task_queue.mutex: + has_tasks = len(self.__task_queue.queue) > 0 + time.sleep(ExtractDispatch.__WORKER_SLEEP_INTERVAL_IN_SECS) self.logger.debug("Stopped worker thread") diff --git a/src/python/controller/notifier.py b/src/python/controller/notifier.py index 555c4dda..3c5933fd 100644 --- a/src/python/controller/notifier.py +++ b/src/python/controller/notifier.py @@ -112,7 +112,8 @@ def _send_post(self, url: str, payload: dict): try: data = json.dumps(payload).encode("utf-8") req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}, method="POST") - urllib.request.urlopen(req, timeout=5) + with urllib.request.urlopen(req, timeout=5): + pass self._logger.debug("Webhook sent: %s %s", payload["event_type"], payload["filename"]) except Exception as e: self._logger.warning("Webhook failed: %s", str(e)) diff --git a/src/python/ssh/sshcp.py b/src/python/ssh/sshcp.py index 94f20ba7..236ac329 100644 --- a/src/python/ssh/sshcp.py +++ b/src/python/ssh/sshcp.py @@ -39,6 +39,12 @@ def __init__(self, host: str, port: int, user: str | None = None, password: str self.__shell_detected: bool = False self.logger = logging.getLogger(self.__class__.__name__) + def _remote_address(self) -> str: + """Return 'user@host' when user is set, or just 'host' when None.""" + if self.__user is not None: + return "{}@{}".format(self.__user, self.__host) + return self.__host + def set_base_logger(self, base_logger: logging.Logger): self.logger = base_logger.getChild(self.__class__.__name__) @@ -126,7 +132,7 @@ def _run_shell_command(self, command: str) -> bytes: "-p", str(self.__port), ] - args = ["{}@{}".format(self.__user, self.__host), quoted] + args = [self._remote_address(), quoted] return self.__run_command(command="ssh", flags=" ".join(flags), args=" ".join(args)) def _check_remote_shells_via_sftp(self) -> list[str]: @@ -162,7 +168,7 @@ def _sftp_stat(self, remote_path: str): args = [ "-b", "-", # read commands from stdin - "{}@{}".format(self.__user, self.__host), + self._remote_address(), ] command_args = ["sftp"] @@ -411,7 +417,7 @@ def shell(self, command: str) -> bytes: "-p", str(self.__port), # port ] - args = ["{}@{}".format(self.__user, self.__host), command] + args = [self._remote_address(), command] return self.__run_command(command="ssh", flags=" ".join(flags), args=" ".join(args)) def copy(self, local_path: str, remote_path: str): diff --git a/src/python/tests/integration/test_controller/test_controller.py b/src/python/tests/integration/test_controller/test_controller.py index cc8f9c5b..02be45d0 100644 --- a/src/python/tests/integration/test_controller/test_controller.py +++ b/src/python/tests/integration/test_controller/test_controller.py @@ -83,8 +83,7 @@ def create_archive(*args): zf.write(temp_file_path, os.path.basename(temp_file_path)) zf.close() elif ext == "rar": - fnull = open(os.devnull, "w") - subprocess.Popen(["rar", "a", "-ep", path, temp_file_path], stdout=fnull).communicate() + subprocess.run(["rar", "a", "-ep", path, temp_file_path], stdout=subprocess.DEVNULL, check=True) else: raise ValueError("Unsupported archive format: {}".format(os.path.basename(path))) return os.path.getsize(path) diff --git a/src/python/tests/integration/test_controller/test_extract/test_extract.py b/src/python/tests/integration/test_controller/test_extract/test_extract.py index 46193859..375944ce 100644 --- a/src/python/tests/integration/test_controller/test_extract/test_extract.py +++ b/src/python/tests/integration/test_controller/test_extract/test_extract.py @@ -46,21 +46,24 @@ def setUpClass(cls): zf.close() # rar - fnull = open(os.devnull, "w") TestExtract.ar_rar = os.path.join(archive_dir, "file.rar") - subprocess.Popen(["rar", "a", "-ep", TestExtract.ar_rar, temp_file], stdout=fnull) + subprocess.run(["rar", "a", "-ep", TestExtract.ar_rar, temp_file], stdout=subprocess.DEVNULL, check=True) # rar split - subprocess.Popen( - ["rar", "a", "-ep", "-m0", "-v50k", os.path.join(archive_dir, "file.split.rar"), temp_file], stdout=fnull + subprocess.run( + ["rar", "a", "-ep", "-m0", "-v50k", os.path.join(archive_dir, "file.split.rar"), temp_file], + stdout=subprocess.DEVNULL, + check=True, ) TestExtract.ar_rar_split_p1 = os.path.join(archive_dir, "file.split.part1.rar") TestExtract.ar_rar_split_p2 = os.path.join(archive_dir, "file.split.part2.rar") # tar.gz TestExtract.ar_tar_gz = os.path.join(archive_dir, "file.tar.gz") - subprocess.Popen( - ["tar", "czvf", TestExtract.ar_tar_gz, "-C", os.path.dirname(temp_file), os.path.basename(temp_file)] + subprocess.run( + ["tar", "czvf", TestExtract.ar_tar_gz, "-C", os.path.dirname(temp_file), os.path.basename(temp_file)], + stdout=subprocess.DEVNULL, + check=True, ) @classmethod diff --git a/src/python/tests/unittests/test_controller/test_extract/test_dispatch.py b/src/python/tests/unittests/test_controller/test_extract/test_dispatch.py index fa287543..3f5280eb 100644 --- a/src/python/tests/unittests/test_controller/test_extract/test_dispatch.py +++ b/src/python/tests/unittests/test_controller/test_extract/test_dispatch.py @@ -701,7 +701,7 @@ def _extract_archive(**kwargs): time.sleep(0.1) - while self.mock_extract_archive.call_count < 1 and self.listener.extract_completed.call_count < 1: + while self.mock_extract_archive.call_count < 1 or self.listener.extract_completed.call_count < 1: pass time.sleep(0.1) self.listener.extract_completed.assert_called_once_with("a", False, None) diff --git a/src/python/tests/unittests/test_ssh/test_sshcp_remote_address.py b/src/python/tests/unittests/test_ssh/test_sshcp_remote_address.py new file mode 100644 index 00000000..81a2450c --- /dev/null +++ b/src/python/tests/unittests/test_ssh/test_sshcp_remote_address.py @@ -0,0 +1,19 @@ +import unittest + +from ssh import Sshcp + + +class TestSshcpRemoteAddress(unittest.TestCase): + """Unit tests for Sshcp._remote_address helper.""" + + def test_remote_address_with_user(self): + sshcp = Sshcp(host="example.com", port=22, user="alice") + self.assertEqual("alice@example.com", sshcp._remote_address()) + + def test_remote_address_without_user(self): + sshcp = Sshcp(host="example.com", port=22, user=None) + self.assertEqual("example.com", sshcp._remote_address()) + + def test_remote_address_default_user(self): + sshcp = Sshcp(host="example.com", port=22) + self.assertEqual("example.com", sshcp._remote_address()) From 38d205254dd3d61f2bf2672c4e87bc7db5aa016b Mon Sep 17 00:00:00 2001 From: nitrobass24 Date: Mon, 23 Mar 2026 20:38:39 -0500 Subject: [PATCH 2/2] Fix missed user@host format in Sshcp.copy() to use _remote_address() The copy() method still had a direct "{}@{}".format(user, host) that would produce "None@host" when user is None. Co-Authored-By: Claude Opus 4.6 --- src/python/ssh/sshcp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/ssh/sshcp.py b/src/python/ssh/sshcp.py index 236ac329..1352e964 100644 --- a/src/python/ssh/sshcp.py +++ b/src/python/ssh/sshcp.py @@ -437,5 +437,5 @@ def copy(self, local_path: str, remote_path: str): "-P", str(self.__port), # port ] - args = [local_path, "{}@{}:{}".format(self.__user, self.__host, remote_path)] + args = [local_path, "{}:{}".format(self._remote_address(), remote_path)] self.__run_command(command="scp", flags=" ".join(flags), args=" ".join(args))