diff --git a/.gitignore b/.gitignore index 90a6e7c..681ad31 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ alpine/ shared/ test/path +test/images # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/main.py b/main.py index 07add87..b8ceb32 100644 --- a/main.py +++ b/main.py @@ -1,8 +1,21 @@ import sys from PySide6.QtWidgets import QApplication from src.worker_node_ui.screens.app_controller import AppController +from worker_ws_client_runner import main +import asyncio +from uuid import uuid4 +from src.worker_node_ui.providers.heartbeat_timer_provider import get_heartbeat_timer if __name__ == "__main__": app = QApplication(sys.argv) controller = AppController() sys.exit(app.exec()) + + # asyncio.run(main()) + + # heartbeat_timer = get_heartbeat_timer(worker_id=uuid4(), master_id=uuid4()) + # # await heartbeat_timer.send_heartbeat() + # heartbeat_timer.start_timer() + # print("reachl") + # print("breakpoint") + diff --git a/src/worker_node/config.py b/src/worker_node/config.py index a7160d6..a4826df 100644 --- a/src/worker_node/config.py +++ b/src/worker_node/config.py @@ -26,6 +26,6 @@ if CACHE_SIZE_ALLOCATED <= 0: raise ValueError("CACHE_SIZE_ALLOCATED cannot be zero or lower") +VM_IMG_INSTANCES_PATH=os.getenv("VM_IMG_PATH") CORE_API_URI=os.getenv("CORE_API_URI") - -VM_IMG_INSTANCES_PATH=os.getenv("VM_IMG_PATH") \ No newline at end of file +INGRESS_ROUTER_URI: str = str(os.getenv("INGRESS_ROUTER_URI")) diff --git a/src/worker_node/models/master_node.py b/src/worker_node/models/master_node.py index 54f4da1..904cd21 100644 --- a/src/worker_node/models/master_node.py +++ b/src/worker_node/models/master_node.py @@ -11,24 +11,24 @@ class MasterNode(BaseModel): master_id: UUID4 = Field(...) master_address: str = Field(...) - @field_validator('master_address') - @classmethod - def valid_ip_address(cls, v: str): - v = v.strip() + # @field_validator('master_address') + # @classmethod + # def valid_ip_address(cls, v: str): + # v = v.strip() - if not v: - raise ValueError("master_address cannot be empty") + # if not v: + # raise ValueError("master_address cannot be empty") - try: - ipaddress.ip_address(v) - return v - except ValueError: - ... + # try: + # ipaddress.ip_address(v) + # return v + # except ValueError: + # ... - if domain_pattern.match(v): - return v + # if domain_pattern.match(v): + # return v - raise ValueError("master_address should contain a valid IP Address or domain name") + # raise ValueError("master_address should contain a valid IP Address or domain name") class UpdateMasterNode(BaseModel): master_address: Optional[str] = None \ No newline at end of file diff --git a/src/worker_node/services/websocket_client_service.py b/src/worker_node/services/websocket_client_service.py index 80aeb6e..928111f 100644 --- a/src/worker_node/services/websocket_client_service.py +++ b/src/worker_node/services/websocket_client_service.py @@ -31,6 +31,7 @@ def __init__(self, self.max_reconnect_attempts = max_reconnect_attempts self.current_websocket_url = None self.executor = JobExecutor() + self.master_id = None async def connect( self, @@ -39,9 +40,9 @@ async def connect( websocket_url: Optional[str] = None ) -> None: if not websocket_url: - websocket_url = await self.discover_master_node() + websocket_url, master_id = await self.discover_master_node() - current_address = websocket_url + current_address: str = websocket_url rediscoveries = 0 while rediscoveries <= max_rediscoveries: @@ -50,13 +51,15 @@ async def connect( try: self.websocket = await websockets.connect(current_address) self.current_websocket_url = current_address + self.master_id = master_id return except ConnectionClosed: if attempt == self.max_reconnect_attempts: - new_address = await self.discover_master_node() + new_address, new_master_id = await self.discover_master_node() if new_address != current_address: current_address = new_address + master_id = new_master_id break @@ -152,31 +155,30 @@ def is_connected(self) -> bool: return self.websocket is not None - async def discover_master_node(self) -> str: - # async with httpx.AsyncClient() as client: - # try: - # response = await client.get(f"{CORE_API_URI}/master-node/discover") - # if response.status_code == 404: - # raise MasterNodeNotFound("Master node discovery endpoint returned 404 Not Found") - # elif 500 <= response.status_code < 600: - # raise MasterNodeServerError(f"Master node discovery failed with status {response.status_code}") - # response.raise_for_status() - # master_node_data = response.json() - # try: - # master_node = MasterNode(**master_node_data) - # except Exception as e: - # raise MasterNodeInvalidResponse(f"Invalid master node data: {e}") + async def discover_master_node(self) -> tuple[str, UUID]: + async with httpx.AsyncClient() as client: + try: + response = await client.get(f"{CORE_API_URI}/master-node/discover") + if response.status_code == 404: + raise MasterNodeNotFound("Master node discovery endpoint returned 404 Not Found") + elif 500 <= response.status_code < 600: + raise MasterNodeServerError(f"Master node discovery failed with status {response.status_code}") + response.raise_for_status() + master_node_data = response.json() + try: + master_node = MasterNode(**master_node_data) + except Exception as e: + raise MasterNodeInvalidResponse(f"Invalid master node data: {e}") - # master_address = str(master_node.master_address) - # except httpx.RequestError as e: - # raise MasterNodeDiscoveryError(f"HTTP request failed: {e}") from e + master_address = str(master_node.master_address) + except httpx.RequestError as e: + raise MasterNodeDiscoveryError(f"HTTP request failed: {e}") from e - # websocket_url = f"ws://{master_address}/ws/connect/{self.worker_id}" - # print(f"Discovered master node websocket at: {websocket_url}") - # return websocket_url - - websocket_url = f"ws://0.0.0.0:8020/ws/connect/{self.worker_id}" + websocket_url = f"ws://{master_address}/ws/connect/{self.worker_id}" print(f"Discovered master node websocket at: {websocket_url}") - return websocket_url + return websocket_url, master_node.master_id + + # websocket_url = f"ws://0.0.0.0:8020/ws/connect/{self.worker_id}" + # print(f"Discovered master node websocket at: {websocket_url}") + # return websocket_url -worker_ws_client = WebsocketClientService(UUID("3fa85f64-5717-4562-b3fc-2c963f66afa6")) \ No newline at end of file diff --git a/src/worker_node_ui/providers/heartbeat_timer_provider.py b/src/worker_node_ui/providers/heartbeat_timer_provider.py new file mode 100644 index 0000000..bcccc4e --- /dev/null +++ b/src/worker_node_ui/providers/heartbeat_timer_provider.py @@ -0,0 +1,6 @@ +from ..timer.heartbeat_timer import HeartbeatTimer +from ...worker_node.config import INGRESS_ROUTER_URI +from uuid import UUID + +def get_heartbeat_timer(worker_id: UUID, master_id: UUID) -> HeartbeatTimer: + return HeartbeatTimer(url=INGRESS_ROUTER_URI, worker_id=worker_id, master_id=master_id,) \ No newline at end of file diff --git a/src/worker_node_ui/providers/job_executor_provider.py b/src/worker_node_ui/providers/job_executor_provider.py new file mode 100644 index 0000000..1ee0a46 --- /dev/null +++ b/src/worker_node_ui/providers/job_executor_provider.py @@ -0,0 +1,9 @@ +from ...worker_node.core.job_executor import JobExecutor + + +job_executor: JobExecutor | None = None +def get_job_executor() -> JobExecutor: + global job_executor + if not job_executor: + job_executor = JobExecutor() + return job_executor \ No newline at end of file diff --git a/src/worker_node_ui/providers/websocket_client_provider.py b/src/worker_node_ui/providers/websocket_client_provider.py new file mode 100644 index 0000000..c2d425d --- /dev/null +++ b/src/worker_node_ui/providers/websocket_client_provider.py @@ -0,0 +1,11 @@ +# worker_ws_client = WebsocketClientService(UUID("3fa85f64-5717-4562-b3fc-2c963f66afa6")) + +from ...worker_node.services.websocket_client_service import WebsocketClientService +from uuid import UUID + +websocket_client_service: WebsocketClientService | None = None +def get_websocket_client_service(worker_id: UUID) -> WebsocketClientService: + global websocket_client_service + if not websocket_client_service: + websocket_client_service = WebsocketClientService(worker_id=worker_id) + return websocket_client_service \ No newline at end of file diff --git a/src/worker_node_ui/timer/heartbeat_timer.py b/src/worker_node_ui/timer/heartbeat_timer.py new file mode 100644 index 0000000..a5f2e54 --- /dev/null +++ b/src/worker_node_ui/timer/heartbeat_timer.py @@ -0,0 +1,58 @@ +from PySide6.QtCore import QTimer +from uuid import UUID +import httpx + +class HeartbeatTimer: + _instance = None + def __new__(cls, url:str, worker_id: UUID, master_id: UUID, interval_ms:int = 3000): + if cls._instance is None: + cls._instance = super(HeartbeatTimer, cls).__new__(cls) + return cls._instance + + def __init__(self, url:str, worker_id: UUID, master_id: UUID, interval_ms:int = 3000, ): + if hasattr(self, "_init") and self._init: + return + self.websocket_url = url + self.timer = QTimer() + self.timer.timeout.connect(self.send_heartbeat) + self.timer.setInterval(interval_ms) + self._init = True + self.worker_id = str(worker_id) + self.master_id = str(master_id) + + def send_heartbeat(self): + # hardcode daw muna sabi ni emil + worker = { + "worker_id": self.worker_id, + "master_id": self.master_id, + "cpu": 2, + "memory": 250, + "job_slot": 1, + "status": "STARTED", + "code_runtime": 1.0, + "latency": 0.18, + "job_cache": [] + } + + try: + httpx.post(f"{self.websocket_url}/heartbeat/", json=worker, timeout=10) + print("sent") + except Exception as e: + print(f"Error in sending heartbeat{e}") + + def start_timer(self): + try: + if not self.timer.isActive(): + self.timer.start() + else: + print("heartbeat already running") + except Exception as e: + print(f"an error occured when trying to start heartbeat timer: {e}") + def stop_timer(self): + try: + if self.timer.isActive(): + self.timer.stop() + else: + print("Start timer has already stopped") + except Exception as e: + print(f"Something went wrong when stopping the timer {e}") \ No newline at end of file diff --git a/test/test_heartbeat_timer.py b/test/test_heartbeat_timer.py new file mode 100644 index 0000000..9c2ca5e --- /dev/null +++ b/test/test_heartbeat_timer.py @@ -0,0 +1,45 @@ +from unittest.mock import patch +from uuid import uuid4 +import pytest +from src.worker_node_ui.providers.heartbeat_timer_provider import get_heartbeat_timer +from PySide6.QtWidgets import QApplication +from PySide6.QtCore import QTimer + +# -pip install pytest-qt +async def test_heartbeat_with_mock(qtbot): + worker_id = uuid4() + heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) + with patch("httpx.post") as mock_post: + heartbeat_timer.start_timer() + qtbot.wait(heartbeat_timer.timer.interval() + 50) + heartbeat_timer.stop_timer() + assert mock_post.called + +# run this in debugger to see start timer does start +@pytest.mark.integration +async def test_heartbeat_integration(): + qapp = QApplication.instance() or QApplication([]) + + worker_id = uuid4() + heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) + + heartbeat_timer.start_timer() + qapp.exec() + + # if i want to stop process do + # heartbeat_timer.stop_timer() + # qapp.quit() + +@pytest.mark.integration +async def test_heartbeat_stop_start(): + qapp = QApplication.instance() or QApplication([]) + + worker_id = uuid4() + heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4()) + + heartbeat_timer.start_timer() + QTimer.singleShot(2000,lambda: (heartbeat_timer.stop_timer(), qapp.quit())) # suppoesd to quit after 2000ms + qapp.exec() + + + \ No newline at end of file diff --git a/test/test_job_executor.py b/test/test_job_executor.py index ab72d71..0ca36d0 100644 --- a/test/test_job_executor.py +++ b/test/test_job_executor.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Any from unittest.mock import MagicMock, patch -from uuid import uuid4 +from uuid import uuid4, UUID from pydantic import HttpUrl import yaml @@ -15,6 +15,13 @@ from src.worker_node.core.job_executor import JobExecutor, JobConfiguration, JobRequestPayload from src.worker_node.models.payloads import MethodEnum +from src.worker_node_ui.providers.websocket_client_provider import get_websocket_client_service +from src.worker_node_ui.providers.heartbeat_timer_provider import get_heartbeat_timer +# import worker_ws_client_runner +import asyncio +from qasync import QEventLoop +from PySide6.QtWidgets import QApplication + @pytest.fixture @@ -24,10 +31,10 @@ def executor() -> JobExecutor: return exec def get_config(path: Path | str): - with open(path) as file: - config_dict = yaml.safe_load(file) - config = JobConfiguration(**config_dict) - return config + with open(path) as file: + config_dict = yaml.safe_load(file) + config = JobConfiguration(**config_dict) + return config @pytest.fixture @@ -182,6 +189,9 @@ def test_run_job_integration(sample_job_request: JobRequestPayload): output = executor.run_job(sample_job_request) + assert expected == output + + """ @pytest.mark.integration def test_run_http_job_integration(sample_http_job_request: JobRequestPayload): @@ -206,4 +216,5 @@ def test_run_http_job_integration(sample_http_job_request: JobRequestPayload): assert output.body == "healthy" assert ast.literal_eval(output.body) == expected - """ \ No newline at end of file + """ + diff --git a/worker_ws_client_runner.py b/worker_ws_client_runner.py index f2207be..1948781 100644 --- a/worker_ws_client_runner.py +++ b/worker_ws_client_runner.py @@ -1,10 +1,10 @@ import asyncio import src.worker_node.core.qemu_pool # type: ignore instantiate qemu pools -from src.worker_node.services.websocket_client_service import worker_ws_client +from src.worker_node.services.websocket_client_service import WebsocketClientService -async def main(): +async def main(worker_ws_client: WebsocketClientService): await worker_ws_client.connect(3) await worker_ws_client.listen_for_messages() -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file +# if __name__ == "__main__": +# # asyncio.run(main()) \ No newline at end of file