From 629d1a8c286871e9742528b05769cf3bcbd83347 Mon Sep 17 00:00:00 2001 From: OkktaDan Date: Wed, 24 Sep 2025 09:37:33 +0800 Subject: [PATCH 1/4] feat(heartbeat_timer): add heartbeat timer to send worker info to ingress router --- src/worker_node_ui/timer/heartbeat_timer.py | 56 +++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 src/worker_node_ui/timer/heartbeat_timer.py diff --git a/src/worker_node_ui/timer/heartbeat_timer.py b/src/worker_node_ui/timer/heartbeat_timer.py new file mode 100644 index 0000000..5ef3d8b --- /dev/null +++ b/src/worker_node_ui/timer/heartbeat_timer.py @@ -0,0 +1,56 @@ +from PySide6.QtCore import QTimer +import uuid +import httpx + +class HeartbeatTimer: + _instance = None + def __new__(cls, url:str, interval_ms:int = 1000): + if cls._instance is None: + cls._instance = super(HeartbeatTimer, cls).__new__(cls) + return cls._instance + + def __init__(self, url:str, interval_ms:int = 1000): + if hasattr(self, "_init") and self._init: + return + self.websocket_url = url + self.timer = QTimer() + self.timer.timeout.connect(self.send_heartbeat) + self.timer.setInterval(interval_ms) + self._init = True + + def send_heartbeat(self): + # hardcode daw muna sabi ni emil + worker = { + "worker_id": str(uuid.uuid4()), + "master_id": str(uuid.uuid4()), + "cpu": 2, + "memory": 250, + "job_slot": 1, + "status": "STARTED", + "code_runtime": 1.0, + "latency": 0.18, + "job_cache": [] + } + + try: + httpx.post(self.websocket_url, json=worker, timeout=10) + except Exception as e: + raise Exception(f"Error occured when sending heartbeat: {e}") + + def start_timer(self): + try: + if not self.timer.isActive(): + self.timer.start() + else: + raise RuntimeError("Start timer is already running") + except Exception as e: + raise Exception(f"Something went wrong when running the timer {e}") + + def stop_timer(self): + try: + if self.timer.isActive(): + self.timer.stop() + else: + raise RuntimeError("Start timer has already stopped") + except Exception as e: + raise Exception(f"Something went wrong when stopping the timer {e}") \ No newline at end of file From 680936016cd02f271a938723c605ccdcf0b04f2d Mon Sep 17 00:00:00 2001 From: Yisaaaa Date: Thu, 25 Sep 2025 08:38:50 +0800 Subject: [PATCH 2/4] Connecting heartbeat -- in progress --- .gitignore | 1 + main.py | 13 +++++ src/worker_node/config.py | 4 +- src/worker_node/models/master_node.py | 28 +++++----- .../services/websocket_client_service.py | 56 ++++++++++--------- .../providers/heartbeat_timer_provider.py | 6 ++ .../providers/job_executor_provider.py | 9 +++ .../providers/websocket_client_provider.py | 11 ++++ src/worker_node_ui/timer/heartbeat_timer.py | 15 +++-- test/test_job_executor.py | 56 +++++++++++++++---- worker_ws_client_runner.py | 8 +-- 11 files changed, 142 insertions(+), 65 deletions(-) create mode 100644 src/worker_node_ui/providers/heartbeat_timer_provider.py create mode 100644 src/worker_node_ui/providers/job_executor_provider.py create mode 100644 src/worker_node_ui/providers/websocket_client_provider.py diff --git a/.gitignore b/.gitignore index 90a6e7c..681ad31 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ alpine/ shared/ test/path +test/images # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/main.py b/main.py index 07add87..b8ceb32 100644 --- a/main.py +++ b/main.py @@ -1,8 +1,21 @@ import sys from PySide6.QtWidgets import QApplication from src.worker_node_ui.screens.app_controller import AppController +from worker_ws_client_runner import main +import asyncio +from uuid import uuid4 +from src.worker_node_ui.providers.heartbeat_timer_provider import get_heartbeat_timer if __name__ == "__main__": app = QApplication(sys.argv) controller = AppController() sys.exit(app.exec()) + + # asyncio.run(main()) + + # heartbeat_timer = get_heartbeat_timer(worker_id=uuid4(), master_id=uuid4()) + # # await heartbeat_timer.send_heartbeat() + # heartbeat_timer.start_timer() + # print("reachl") + # print("breakpoint") + diff --git a/src/worker_node/config.py b/src/worker_node/config.py index a7160d6..a4826df 100644 --- a/src/worker_node/config.py +++ b/src/worker_node/config.py @@ -26,6 +26,6 @@ if CACHE_SIZE_ALLOCATED <= 0: raise ValueError("CACHE_SIZE_ALLOCATED cannot be zero or lower") +VM_IMG_INSTANCES_PATH=os.getenv("VM_IMG_PATH") CORE_API_URI=os.getenv("CORE_API_URI") - -VM_IMG_INSTANCES_PATH=os.getenv("VM_IMG_PATH") \ No newline at end of file +INGRESS_ROUTER_URI: str = str(os.getenv("INGRESS_ROUTER_URI")) diff --git a/src/worker_node/models/master_node.py b/src/worker_node/models/master_node.py index 54f4da1..904cd21 100644 --- a/src/worker_node/models/master_node.py +++ b/src/worker_node/models/master_node.py @@ -11,24 +11,24 @@ class MasterNode(BaseModel): master_id: UUID4 = Field(...) master_address: str = Field(...) - @field_validator('master_address') - @classmethod - def valid_ip_address(cls, v: str): - v = v.strip() + # @field_validator('master_address') + # @classmethod + # def valid_ip_address(cls, v: str): + # v = v.strip() - if not v: - raise ValueError("master_address cannot be empty") + # if not v: + # raise ValueError("master_address cannot be empty") - try: - ipaddress.ip_address(v) - return v - except ValueError: - ... + # try: + # ipaddress.ip_address(v) + # return v + # except ValueError: + # ... - if domain_pattern.match(v): - return v + # if domain_pattern.match(v): + # return v - raise ValueError("master_address should contain a valid IP Address or domain name") + # raise ValueError("master_address should contain a valid IP Address or domain name") class UpdateMasterNode(BaseModel): master_address: Optional[str] = None \ No newline at end of file diff --git a/src/worker_node/services/websocket_client_service.py b/src/worker_node/services/websocket_client_service.py index 80aeb6e..928111f 100644 --- a/src/worker_node/services/websocket_client_service.py +++ b/src/worker_node/services/websocket_client_service.py @@ -31,6 +31,7 @@ def __init__(self, self.max_reconnect_attempts = max_reconnect_attempts self.current_websocket_url = None self.executor = JobExecutor() + self.master_id = None async def connect( self, @@ -39,9 +40,9 @@ async def connect( websocket_url: Optional[str] = None ) -> None: if not websocket_url: - websocket_url = await self.discover_master_node() + websocket_url, master_id = await self.discover_master_node() - current_address = websocket_url + current_address: str = websocket_url rediscoveries = 0 while rediscoveries <= max_rediscoveries: @@ -50,13 +51,15 @@ async def connect( try: self.websocket = await websockets.connect(current_address) self.current_websocket_url = current_address + self.master_id = master_id return except ConnectionClosed: if attempt == self.max_reconnect_attempts: - new_address = await self.discover_master_node() + new_address, new_master_id = await self.discover_master_node() if new_address != current_address: current_address = new_address + master_id = new_master_id break @@ -152,31 +155,30 @@ def is_connected(self) -> bool: return self.websocket is not None - async def discover_master_node(self) -> str: - # async with httpx.AsyncClient() as client: - # try: - # response = await client.get(f"{CORE_API_URI}/master-node/discover") - # if response.status_code == 404: - # raise MasterNodeNotFound("Master node discovery endpoint returned 404 Not Found") - # elif 500 <= response.status_code < 600: - # raise MasterNodeServerError(f"Master node discovery failed with status {response.status_code}") - # response.raise_for_status() - # master_node_data = response.json() - # try: - # master_node = MasterNode(**master_node_data) - # except Exception as e: - # raise MasterNodeInvalidResponse(f"Invalid master node data: {e}") + async def discover_master_node(self) -> tuple[str, UUID]: + async with httpx.AsyncClient() as client: + try: + response = await client.get(f"{CORE_API_URI}/master-node/discover") + if response.status_code == 404: + raise MasterNodeNotFound("Master node discovery endpoint returned 404 Not Found") + elif 500 <= response.status_code < 600: + raise MasterNodeServerError(f"Master node discovery failed with status {response.status_code}") + response.raise_for_status() + master_node_data = response.json() + try: + master_node = MasterNode(**master_node_data) + except Exception as e: + raise MasterNodeInvalidResponse(f"Invalid master node data: {e}") - # master_address = str(master_node.master_address) - # except httpx.RequestError as e: - # raise MasterNodeDiscoveryError(f"HTTP request failed: {e}") from e + master_address = str(master_node.master_address) + except httpx.RequestError as e: + raise MasterNodeDiscoveryError(f"HTTP request failed: {e}") from e - # websocket_url = f"ws://{master_address}/ws/connect/{self.worker_id}" - # print(f"Discovered master node websocket at: {websocket_url}") - # return websocket_url - - websocket_url = f"ws://0.0.0.0:8020/ws/connect/{self.worker_id}" + websocket_url = f"ws://{master_address}/ws/connect/{self.worker_id}" print(f"Discovered master node websocket at: {websocket_url}") - return websocket_url + return websocket_url, master_node.master_id + + # websocket_url = f"ws://0.0.0.0:8020/ws/connect/{self.worker_id}" + # print(f"Discovered master node websocket at: {websocket_url}") + # return websocket_url -worker_ws_client = WebsocketClientService(UUID("3fa85f64-5717-4562-b3fc-2c963f66afa6")) \ No newline at end of file diff --git a/src/worker_node_ui/providers/heartbeat_timer_provider.py b/src/worker_node_ui/providers/heartbeat_timer_provider.py new file mode 100644 index 0000000..bcccc4e --- /dev/null +++ b/src/worker_node_ui/providers/heartbeat_timer_provider.py @@ -0,0 +1,6 @@ +from ..timer.heartbeat_timer import HeartbeatTimer +from ...worker_node.config import INGRESS_ROUTER_URI +from uuid import UUID + +def get_heartbeat_timer(worker_id: UUID, master_id: UUID) -> HeartbeatTimer: + return HeartbeatTimer(url=INGRESS_ROUTER_URI, worker_id=worker_id, master_id=master_id,) \ No newline at end of file diff --git a/src/worker_node_ui/providers/job_executor_provider.py b/src/worker_node_ui/providers/job_executor_provider.py new file mode 100644 index 0000000..1ee0a46 --- /dev/null +++ b/src/worker_node_ui/providers/job_executor_provider.py @@ -0,0 +1,9 @@ +from ...worker_node.core.job_executor import JobExecutor + + +job_executor: JobExecutor | None = None +def get_job_executor() -> JobExecutor: + global job_executor + if not job_executor: + job_executor = JobExecutor() + return job_executor \ No newline at end of file diff --git a/src/worker_node_ui/providers/websocket_client_provider.py b/src/worker_node_ui/providers/websocket_client_provider.py new file mode 100644 index 0000000..c2d425d --- /dev/null +++ b/src/worker_node_ui/providers/websocket_client_provider.py @@ -0,0 +1,11 @@ +# worker_ws_client = WebsocketClientService(UUID("3fa85f64-5717-4562-b3fc-2c963f66afa6")) + +from ...worker_node.services.websocket_client_service import WebsocketClientService +from uuid import UUID + +websocket_client_service: WebsocketClientService | None = None +def get_websocket_client_service(worker_id: UUID) -> WebsocketClientService: + global websocket_client_service + if not websocket_client_service: + websocket_client_service = WebsocketClientService(worker_id=worker_id) + return websocket_client_service \ No newline at end of file diff --git a/src/worker_node_ui/timer/heartbeat_timer.py b/src/worker_node_ui/timer/heartbeat_timer.py index 5ef3d8b..bfb94ae 100644 --- a/src/worker_node_ui/timer/heartbeat_timer.py +++ b/src/worker_node_ui/timer/heartbeat_timer.py @@ -1,15 +1,15 @@ from PySide6.QtCore import QTimer -import uuid +from uuid import UUID import httpx class HeartbeatTimer: _instance = None - def __new__(cls, url:str, interval_ms:int = 1000): + def __new__(cls, url:str, worker_id: UUID, master_id: UUID, interval_ms:int = 3000): if cls._instance is None: cls._instance = super(HeartbeatTimer, cls).__new__(cls) return cls._instance - def __init__(self, url:str, interval_ms:int = 1000): + def __init__(self, url:str, worker_id: UUID, master_id: UUID, interval_ms:int = 3000, ): if hasattr(self, "_init") and self._init: return self.websocket_url = url @@ -17,12 +17,14 @@ def __init__(self, url:str, interval_ms:int = 1000): self.timer.timeout.connect(self.send_heartbeat) self.timer.setInterval(interval_ms) self._init = True + self.worker_id = str(worker_id) + self.master_id = str(master_id) def send_heartbeat(self): # hardcode daw muna sabi ni emil worker = { - "worker_id": str(uuid.uuid4()), - "master_id": str(uuid.uuid4()), + "worker_id": self.worker_id, + "master_id": self.master_id, "cpu": 2, "memory": 250, "job_slot": 1, @@ -33,7 +35,8 @@ def send_heartbeat(self): } try: - httpx.post(self.websocket_url, json=worker, timeout=10) + httpx.post(f"{self.websocket_url}/heartbeat/", json=worker, timeout=10) + print("sent") except Exception as e: raise Exception(f"Error occured when sending heartbeat: {e}") diff --git a/test/test_job_executor.py b/test/test_job_executor.py index 3209d64..44dc3d6 100644 --- a/test/test_job_executor.py +++ b/test/test_job_executor.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Any from unittest.mock import MagicMock, patch -from uuid import uuid4 +from uuid import uuid4, UUID from pydantic import HttpUrl import yaml @@ -15,6 +15,13 @@ from src.worker_node.core.job_executor import JobExecutor, JobConfiguration, JobRequestPayload from src.worker_node.models.payloads import MethodEnum +from src.worker_node_ui.providers.websocket_client_provider import get_websocket_client_service +from src.worker_node_ui.providers.heartbeat_timer_provider import get_heartbeat_timer +# import worker_ws_client_runner +import asyncio +from qasync import QEventLoop +from PySide6.QtWidgets import QApplication + @pytest.fixture @@ -23,11 +30,11 @@ def executor() -> JobExecutor: exec = JobExecutor() return exec -# def get_config(path: Path | str): -# with open(path) as file: -# config_dict = yaml.safe_load(file) -# config = JobConfiguration(**config_dict) -# return config +def get_config(path: Path | str): + with open(path) as file: + config_dict = yaml.safe_load(file) + config = JobConfiguration(**config_dict) + return config @pytest.fixture @@ -166,11 +173,11 @@ def test_run_job_integration(sample_job_request: JobRequestPayload): created_at=datetime.now(), updated_at=datetime.now()) -# executor.local_job_cache.fetch_job_information = lambda job_id: sample_job + executor.local_job_cache.fetch_job_information = lambda job_id: sample_job -# assert sample_job_request.params -# assert sample_job_request.params["qty"] -# assert sample_job_request.body + assert sample_job_request.params + assert sample_job_request.params["qty"] + assert sample_job_request.body result = sample_job_request.body + "\n" for _ in range(int(sample_job_request.params["qty"])): @@ -178,7 +185,9 @@ def test_run_job_integration(sample_job_request: JobRequestPayload): expected: dict[Any, Any] = {"input": sample_job_request.body + "\n", "result": result} -# output = executor.run_job(sample_job_request) + output = executor.run_job(sample_job_request) + + assert expected == output @pytest.mark.integration @@ -203,4 +212,27 @@ def test_run_http_job_integration(sample_http_job_request: JobRequestPayload): assert output.status_code == 200 assert output.body == "healthy" - assert ast.literal_eval(output.body) == expected + # assert ast.literal_eval(output.body) == expected + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_heartbeat(): + app = QApplication.instance() or QApplication([]) + loop = QEventLoop(app) + asyncio.set_event_loop(loop) + worker_id = uuid4() + + # executor = JobExecutor() + # websocket_client_service = get_websocket_client_service(worker_id=worker_id) + # await websocket_client_service.connect(3) + # asyncio.create_task(websocket_client_service.listen_for_messages()) + # heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=UUID(websocket_client_service.worker_id)) + heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) + # await heartbeat_timer.send_heartbeat() + heartbeat_timer.start_timer() + + while True: + await asyncio.sleep(3) + print("reachl") + print("breakpoint") + \ No newline at end of file diff --git a/worker_ws_client_runner.py b/worker_ws_client_runner.py index f2207be..1948781 100644 --- a/worker_ws_client_runner.py +++ b/worker_ws_client_runner.py @@ -1,10 +1,10 @@ import asyncio import src.worker_node.core.qemu_pool # type: ignore instantiate qemu pools -from src.worker_node.services.websocket_client_service import worker_ws_client +from src.worker_node.services.websocket_client_service import WebsocketClientService -async def main(): +async def main(worker_ws_client: WebsocketClientService): await worker_ws_client.connect(3) await worker_ws_client.listen_for_messages() -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file +# if __name__ == "__main__": +# # asyncio.run(main()) \ No newline at end of file From e188a2d4a5c3713197a47450b6f5d29aa58d7cfb Mon Sep 17 00:00:00 2001 From: OkktaDan Date: Thu, 25 Sep 2025 11:58:43 +0800 Subject: [PATCH 3/4] test(tesst_job_executor): add test for heartbeat send --- test/test_job_executor.py | 43 +++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/test/test_job_executor.py b/test/test_job_executor.py index 44dc3d6..0c79608 100644 --- a/test/test_job_executor.py +++ b/test/test_job_executor.py @@ -214,25 +214,38 @@ def test_run_http_job_integration(sample_http_job_request: JobRequestPayload): # assert ast.literal_eval(output.body) == expected +# -qtbot @pytest.mark.integration @pytest.mark.asyncio -async def test_heartbeat(): - app = QApplication.instance() or QApplication([]) - loop = QEventLoop(app) - asyncio.set_event_loop(loop) +async def test_heartbeat(qtbot): + # app = QApplication.instance() or QApplication([]) + # loop = QEventLoop(app) + # asyncio.set_event_loop(loop) worker_id = uuid4() + heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) + with patch("httpx.post") as mock_post: + heartbeat_timer.start_timer() + qtbot.wait(heartbeat_timer.timer.interval() + 50) + heartbeat_timer.stop_timer() + assert mock_post.called + +# - anyio +# - pytest-asyncio +# - pytest-tornasync +# - pytest-trio +# - pytest-twisted + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_heartbeat_with_async_no_qasync(): + qapp = QApplication.instance() or QApplication([]) - # executor = JobExecutor() - # websocket_client_service = get_websocket_client_service(worker_id=worker_id) - # await websocket_client_service.connect(3) - # asyncio.create_task(websocket_client_service.listen_for_messages()) - # heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=UUID(websocket_client_service.worker_id)) + worker_id = uuid4() heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) - # await heartbeat_timer.send_heartbeat() + heartbeat_timer.start_timer() + qapp.exec() - while True: - await asyncio.sleep(3) - print("reachl") - print("breakpoint") - \ No newline at end of file + # if i sstop process + # heartbeat_timer.stop_timer() + # qapp.quit() From 5963c676ae5c7517385790d2127c33281a88e514 Mon Sep 17 00:00:00 2001 From: OkktaDan Date: Fri, 26 Sep 2025 06:07:43 +0800 Subject: [PATCH 4/4] test(test_heartbeat_timer): add test for heartbeat --- src/worker_node_ui/timer/heartbeat_timer.py | 11 +++-- test/test_heartbeat_timer.py | 45 +++++++++++++++++++++ test/test_job_executor.py | 36 ----------------- 3 files changed, 50 insertions(+), 42 deletions(-) create mode 100644 test/test_heartbeat_timer.py diff --git a/src/worker_node_ui/timer/heartbeat_timer.py b/src/worker_node_ui/timer/heartbeat_timer.py index bfb94ae..a5f2e54 100644 --- a/src/worker_node_ui/timer/heartbeat_timer.py +++ b/src/worker_node_ui/timer/heartbeat_timer.py @@ -38,22 +38,21 @@ def send_heartbeat(self): httpx.post(f"{self.websocket_url}/heartbeat/", json=worker, timeout=10) print("sent") except Exception as e: - raise Exception(f"Error occured when sending heartbeat: {e}") + print(f"Error in sending heartbeat{e}") def start_timer(self): try: if not self.timer.isActive(): self.timer.start() else: - raise RuntimeError("Start timer is already running") + print("heartbeat already running") except Exception as e: - raise Exception(f"Something went wrong when running the timer {e}") - + print(f"an error occured when trying to start heartbeat timer: {e}") def stop_timer(self): try: if self.timer.isActive(): self.timer.stop() else: - raise RuntimeError("Start timer has already stopped") + print("Start timer has already stopped") except Exception as e: - raise Exception(f"Something went wrong when stopping the timer {e}") \ No newline at end of file + print(f"Something went wrong when stopping the timer {e}") \ No newline at end of file diff --git a/test/test_heartbeat_timer.py b/test/test_heartbeat_timer.py new file mode 100644 index 0000000..9c2ca5e --- /dev/null +++ b/test/test_heartbeat_timer.py @@ -0,0 +1,45 @@ +from unittest.mock import patch +from uuid import uuid4 +import pytest +from src.worker_node_ui.providers.heartbeat_timer_provider import get_heartbeat_timer +from PySide6.QtWidgets import QApplication +from PySide6.QtCore import QTimer + +# -pip install pytest-qt +async def test_heartbeat_with_mock(qtbot): + worker_id = uuid4() + heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) + with patch("httpx.post") as mock_post: + heartbeat_timer.start_timer() + qtbot.wait(heartbeat_timer.timer.interval() + 50) + heartbeat_timer.stop_timer() + assert mock_post.called + +# run this in debugger to see start timer does start +@pytest.mark.integration +async def test_heartbeat_integration(): + qapp = QApplication.instance() or QApplication([]) + + worker_id = uuid4() + heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) + + heartbeat_timer.start_timer() + qapp.exec() + + # if i want to stop process do + # heartbeat_timer.stop_timer() + # qapp.quit() + +@pytest.mark.integration +async def test_heartbeat_stop_start(): + qapp = QApplication.instance() or QApplication([]) + + worker_id = uuid4() + heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) + + heartbeat_timer.start_timer() + QTimer.singleShot(2000,lambda: (heartbeat_timer.stop_timer(), qapp.quit())) # suppoesd to quit after 2000ms + qapp.exec() + + + \ No newline at end of file diff --git a/test/test_job_executor.py b/test/test_job_executor.py index 0c79608..a8008af 100644 --- a/test/test_job_executor.py +++ b/test/test_job_executor.py @@ -213,39 +213,3 @@ def test_run_http_job_integration(sample_http_job_request: JobRequestPayload): assert output.body == "healthy" # assert ast.literal_eval(output.body) == expected - -# -qtbot -@pytest.mark.integration -@pytest.mark.asyncio -async def test_heartbeat(qtbot): - # app = QApplication.instance() or QApplication([]) - # loop = QEventLoop(app) - # asyncio.set_event_loop(loop) - worker_id = uuid4() - heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) - with patch("httpx.post") as mock_post: - heartbeat_timer.start_timer() - qtbot.wait(heartbeat_timer.timer.interval() + 50) - heartbeat_timer.stop_timer() - assert mock_post.called - -# - anyio -# - pytest-asyncio -# - pytest-tornasync -# - pytest-trio -# - pytest-twisted - -@pytest.mark.integration -@pytest.mark.asyncio -async def test_heartbeat_with_async_no_qasync(): - qapp = QApplication.instance() or QApplication([]) - - worker_id = uuid4() - heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) - - heartbeat_timer.start_timer() - qapp.exec() - - # if i sstop process - # heartbeat_timer.stop_timer() - # qapp.quit()