Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ alpine/
shared/

test/path
test/images

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
13 changes: 13 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
import sys
from PySide6.QtWidgets import QApplication
from src.worker_node_ui.screens.app_controller import AppController
from worker_ws_client_runner import main
import asyncio
from uuid import uuid4
from src.worker_node_ui.providers.heartbeat_timer_provider import get_heartbeat_timer

if __name__ == "__main__":
app = QApplication(sys.argv)
controller = AppController()
sys.exit(app.exec())

# asyncio.run(main())

# heartbeat_timer = get_heartbeat_timer(worker_id=uuid4(), master_id=uuid4())
# # await heartbeat_timer.send_heartbeat()
# heartbeat_timer.start_timer()
# print("reachl")
# print("breakpoint")

4 changes: 2 additions & 2 deletions src/worker_node/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@
if CACHE_SIZE_ALLOCATED <= 0:
raise ValueError("CACHE_SIZE_ALLOCATED cannot be zero or lower")

VM_IMG_INSTANCES_PATH=os.getenv("VM_IMG_PATH")
CORE_API_URI=os.getenv("CORE_API_URI")

VM_IMG_INSTANCES_PATH=os.getenv("VM_IMG_PATH")
INGRESS_ROUTER_URI: str = str(os.getenv("INGRESS_ROUTER_URI"))
28 changes: 14 additions & 14 deletions src/worker_node/models/master_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,24 @@ class MasterNode(BaseModel):
master_id: UUID4 = Field(...)
master_address: str = Field(...)

@field_validator('master_address')
@classmethod
def valid_ip_address(cls, v: str):
v = v.strip()
# @field_validator('master_address')
# @classmethod
# def valid_ip_address(cls, v: str):
# v = v.strip()

if not v:
raise ValueError("master_address cannot be empty")
# if not v:
# raise ValueError("master_address cannot be empty")

try:
ipaddress.ip_address(v)
return v
except ValueError:
...
# try:
# ipaddress.ip_address(v)
# return v
# except ValueError:
# ...

if domain_pattern.match(v):
return v
# if domain_pattern.match(v):
# return v

raise ValueError("master_address should contain a valid IP Address or domain name")
# raise ValueError("master_address should contain a valid IP Address or domain name")

class UpdateMasterNode(BaseModel):
master_address: Optional[str] = None
56 changes: 29 additions & 27 deletions src/worker_node/services/websocket_client_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self,
self.max_reconnect_attempts = max_reconnect_attempts
self.current_websocket_url = None
self.executor = JobExecutor()
self.master_id = None

async def connect(
self,
Expand All @@ -39,9 +40,9 @@ async def connect(
websocket_url: Optional[str] = None
) -> None:
if not websocket_url:
websocket_url = await self.discover_master_node()
websocket_url, master_id = await self.discover_master_node()

current_address = websocket_url
current_address: str = websocket_url
rediscoveries = 0

while rediscoveries <= max_rediscoveries:
Expand All @@ -50,13 +51,15 @@ async def connect(
try:
self.websocket = await websockets.connect(current_address)
self.current_websocket_url = current_address
self.master_id = master_id
return

except ConnectionClosed:
if attempt == self.max_reconnect_attempts:
new_address = await self.discover_master_node()
new_address, new_master_id = await self.discover_master_node()
if new_address != current_address:
current_address = new_address
master_id = new_master_id

break

Expand Down Expand Up @@ -152,31 +155,30 @@ def is_connected(self) -> bool:

return self.websocket is not None

async def discover_master_node(self) -> str:
# async with httpx.AsyncClient() as client:
# try:
# response = await client.get(f"{CORE_API_URI}/master-node/discover")
# if response.status_code == 404:
# raise MasterNodeNotFound("Master node discovery endpoint returned 404 Not Found")
# elif 500 <= response.status_code < 600:
# raise MasterNodeServerError(f"Master node discovery failed with status {response.status_code}")
# response.raise_for_status()
# master_node_data = response.json()
# try:
# master_node = MasterNode(**master_node_data)
# except Exception as e:
# raise MasterNodeInvalidResponse(f"Invalid master node data: {e}")
async def discover_master_node(self) -> tuple[str, UUID]:
async with httpx.AsyncClient() as client:
try:
response = await client.get(f"{CORE_API_URI}/master-node/discover")
if response.status_code == 404:
raise MasterNodeNotFound("Master node discovery endpoint returned 404 Not Found")
elif 500 <= response.status_code < 600:
raise MasterNodeServerError(f"Master node discovery failed with status {response.status_code}")
response.raise_for_status()
master_node_data = response.json()
try:
master_node = MasterNode(**master_node_data)
except Exception as e:
raise MasterNodeInvalidResponse(f"Invalid master node data: {e}")

# master_address = str(master_node.master_address)
# except httpx.RequestError as e:
# raise MasterNodeDiscoveryError(f"HTTP request failed: {e}") from e
master_address = str(master_node.master_address)
except httpx.RequestError as e:
raise MasterNodeDiscoveryError(f"HTTP request failed: {e}") from e

# websocket_url = f"ws://{master_address}/ws/connect/{self.worker_id}"
# print(f"Discovered master node websocket at: {websocket_url}")
# return websocket_url

websocket_url = f"ws://0.0.0.0:8020/ws/connect/{self.worker_id}"
websocket_url = f"ws://{master_address}/ws/connect/{self.worker_id}"
print(f"Discovered master node websocket at: {websocket_url}")
return websocket_url
return websocket_url, master_node.master_id

# websocket_url = f"ws://0.0.0.0:8020/ws/connect/{self.worker_id}"
# print(f"Discovered master node websocket at: {websocket_url}")
# return websocket_url

worker_ws_client = WebsocketClientService(UUID("3fa85f64-5717-4562-b3fc-2c963f66afa6"))
6 changes: 6 additions & 0 deletions src/worker_node_ui/providers/heartbeat_timer_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from ..timer.heartbeat_timer import HeartbeatTimer
from ...worker_node.config import INGRESS_ROUTER_URI
from uuid import UUID

def get_heartbeat_timer(worker_id: UUID, master_id: UUID) -> HeartbeatTimer:
return HeartbeatTimer(url=INGRESS_ROUTER_URI, worker_id=worker_id, master_id=master_id,)
9 changes: 9 additions & 0 deletions src/worker_node_ui/providers/job_executor_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from ...worker_node.core.job_executor import JobExecutor


job_executor: JobExecutor | None = None
def get_job_executor() -> JobExecutor:
global job_executor
if not job_executor:
job_executor = JobExecutor()
return job_executor
11 changes: 11 additions & 0 deletions src/worker_node_ui/providers/websocket_client_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# worker_ws_client = WebsocketClientService(UUID("3fa85f64-5717-4562-b3fc-2c963f66afa6"))

from ...worker_node.services.websocket_client_service import WebsocketClientService
from uuid import UUID

websocket_client_service: WebsocketClientService | None = None
def get_websocket_client_service(worker_id: UUID) -> WebsocketClientService:
global websocket_client_service
if not websocket_client_service:
websocket_client_service = WebsocketClientService(worker_id=worker_id)
return websocket_client_service
58 changes: 58 additions & 0 deletions src/worker_node_ui/timer/heartbeat_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from PySide6.QtCore import QTimer
from uuid import UUID
import httpx

class HeartbeatTimer:
_instance = None
def __new__(cls, url:str, worker_id: UUID, master_id: UUID, interval_ms:int = 3000):
if cls._instance is None:
cls._instance = super(HeartbeatTimer, cls).__new__(cls)
return cls._instance

def __init__(self, url:str, worker_id: UUID, master_id: UUID, interval_ms:int = 3000, ):
if hasattr(self, "_init") and self._init:
return
self.websocket_url = url
self.timer = QTimer()
self.timer.timeout.connect(self.send_heartbeat)
self.timer.setInterval(interval_ms)
self._init = True
self.worker_id = str(worker_id)
self.master_id = str(master_id)

def send_heartbeat(self):
# hardcode daw muna sabi ni emil
worker = {
"worker_id": self.worker_id,
"master_id": self.master_id,
"cpu": 2,
"memory": 250,
"job_slot": 1,
"status": "STARTED",
"code_runtime": 1.0,
"latency": 0.18,
"job_cache": []
}

try:
httpx.post(f"{self.websocket_url}/heartbeat/", json=worker, timeout=10)
print("sent")
except Exception as e:
print(f"Error in sending heartbeat{e}")

def start_timer(self):
try:
if not self.timer.isActive():
self.timer.start()
else:
print("heartbeat already running")
except Exception as e:
print(f"an error occured when trying to start heartbeat timer: {e}")
def stop_timer(self):
try:
if self.timer.isActive():
self.timer.stop()
else:
print("Start timer has already stopped")
except Exception as e:
print(f"Something went wrong when stopping the timer {e}")
45 changes: 45 additions & 0 deletions test/test_heartbeat_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from unittest.mock import patch
from uuid import uuid4
import pytest
from src.worker_node_ui.providers.heartbeat_timer_provider import get_heartbeat_timer
from PySide6.QtWidgets import QApplication
from PySide6.QtCore import QTimer

# -pip install pytest-qt
async def test_heartbeat_with_mock(qtbot):
worker_id = uuid4()
heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4())
with patch("httpx.post") as mock_post:
heartbeat_timer.start_timer()
qtbot.wait(heartbeat_timer.timer.interval() + 50)
heartbeat_timer.stop_timer()
assert mock_post.called

# run this in debugger to see start timer does start
@pytest.mark.integration
async def test_heartbeat_integration():
qapp = QApplication.instance() or QApplication([])

worker_id = uuid4()
heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4())

heartbeat_timer.start_timer()
qapp.exec()

# if i want to stop process do
# heartbeat_timer.stop_timer()
# qapp.quit()

@pytest.mark.integration
async def test_heartbeat_stop_start():
qapp = QApplication.instance() or QApplication([])

worker_id = uuid4()
heartbeat_timer = get_heartbeat_timer(worker_id=worker_id, master_id=uuid4())

heartbeat_timer.start_timer()
QTimer.singleShot(2000,lambda: (heartbeat_timer.stop_timer(), qapp.quit())) # suppoesd to quit after 2000ms
qapp.exec()



23 changes: 17 additions & 6 deletions test/test_job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
from uuid import uuid4
from uuid import uuid4, UUID

from pydantic import HttpUrl
import yaml
Expand All @@ -15,6 +15,13 @@
from src.worker_node.core.job_executor import JobExecutor, JobConfiguration, JobRequestPayload
from src.worker_node.models.payloads import MethodEnum

from src.worker_node_ui.providers.websocket_client_provider import get_websocket_client_service
from src.worker_node_ui.providers.heartbeat_timer_provider import get_heartbeat_timer
# import worker_ws_client_runner
import asyncio
from qasync import QEventLoop
from PySide6.QtWidgets import QApplication



@pytest.fixture
Expand All @@ -24,10 +31,10 @@ def executor() -> JobExecutor:
return exec

def get_config(path: Path | str):
with open(path) as file:
config_dict = yaml.safe_load(file)
config = JobConfiguration(**config_dict)
return config
with open(path) as file:
config_dict = yaml.safe_load(file)
config = JobConfiguration(**config_dict)
return config


@pytest.fixture
Expand Down Expand Up @@ -182,6 +189,9 @@ def test_run_job_integration(sample_job_request: JobRequestPayload):

output = executor.run_job(sample_job_request)

assert expected == output



""" @pytest.mark.integration
def test_run_http_job_integration(sample_http_job_request: JobRequestPayload):
Expand All @@ -206,4 +216,5 @@ def test_run_http_job_integration(sample_http_job_request: JobRequestPayload):
assert output.body == "healthy"

assert ast.literal_eval(output.body) == expected
"""
"""

8 changes: 4 additions & 4 deletions worker_ws_client_runner.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import asyncio
import src.worker_node.core.qemu_pool # type: ignore instantiate qemu pools
from src.worker_node.services.websocket_client_service import worker_ws_client
from src.worker_node.services.websocket_client_service import WebsocketClientService

async def main():
async def main(worker_ws_client: WebsocketClientService):
await worker_ws_client.connect(3)
await worker_ws_client.listen_for_messages()

if __name__ == "__main__":
asyncio.run(main())
# if __name__ == "__main__":
# # asyncio.run(main())