diff --git a/packages/aws-library/src/aws_library/ec2/__init__.py b/packages/aws-library/src/aws_library/ec2/__init__.py
index 0acff01ff0d6..127a6dd076db 100644
--- a/packages/aws-library/src/aws_library/ec2/__init__.py
+++ b/packages/aws-library/src/aws_library/ec2/__init__.py
@@ -17,6 +17,7 @@
     EC2InstanceData,
     EC2InstanceType,
     EC2Tags,
+    GenericResourceValueType,
     Resources,
 )
@@ -36,6 +37,7 @@
     "EC2NotConnectedError",
     "EC2RuntimeError",
     "EC2Tags",
+    "GenericResourceValueType",
     "Resources",
     "SimcoreEC2API",
 )
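Before the model changes in `_models.py` below, a minimal sketch of the comparison and arithmetic semantics the new `generic_resources` field gets (the instance values here are illustrative and not part of the change; the behavior follows the operators implemented in the diff that follows):

```python
from aws_library.ec2 import Resources
from pydantic import ByteSize

# numeric generic resources behave as quantities in add/sub/compare
node = Resources(
    cpus=8, ram=ByteSize(16 * 1024**3), generic_resources={"GPU": 1, "threads": 8}
)
task = Resources(cpus=2, ram=ByteSize(4 * 1024**3), generic_resources={"threads": 1})

assert node >= task                     # every numeric resource is sufficient
remaining = node - task                 # cpus=6, ram=12GiB, GPU=1, threads=7
assert remaining.generic_resources["threads"] == 7

# non-numeric generic resources only count via equality/presence
flagged = Resources(cpus=1, ram=ByteSize(1), generic_resources={"SSE": "yes"})
plain = Resources(cpus=1, ram=ByteSize(1))
assert flagged >= plain                 # defined only on the left side -> greater
assert not plain >= flagged             # missing on the left side -> not greater
```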
"true", "false", "yes", "no", "1", "0") + assert isinstance(a, str) # nosec + assert isinstance(b, int | float | str) # nosec + # let's try to get a boolean out of the values to compare them + with contextlib.suppress(ValidationError): + a_as_boolean = TypeAdapter(bool).validate_python(a) + b_as_boolean = TypeAdapter(bool).validate_python(b) + if not a_as_boolean and b_as_boolean: + return False + + # here we have either everything greater or equal or non-comparable strings + + return self != other def __add__(self, other: "Resources") -> "Resources": + """operator for adding two Resources + Note that only numeric generic resources are added + Non-numeric generic resources are ignored + """ + merged: dict[str, GenericResourceValueType] = {} + keys = set(self.generic_resources) | set(other.generic_resources) + for k in keys: + a = self.generic_resources.get(k) + b = other.generic_resources.get(k) + # adding non numeric values does not make sense, so we skip those for the resulting resource + if isinstance(a, int | float) and isinstance(b, int | float): + merged[k] = a + b + elif a is None and isinstance(b, int | float): + merged[k] = b + elif b is None and isinstance(a, int | float): + merged[k] = a + return Resources.model_construct( - **{ - key: a + b - for (key, a), b in zip( - self.model_dump().items(), other.model_dump().values(), strict=True - ) - } + cpus=self.cpus + other.cpus, + ram=self.ram + other.ram, + generic_resources=merged, ) def __sub__(self, other: "Resources") -> "Resources": + """operator for subtracting two Resources + Note that only numeric generic resources are subtracted + Non-numeric generic resources are ignored + """ + merged: dict[str, GenericResourceValueType] = {} + keys = set(self.generic_resources) | set(other.generic_resources) + for k in keys: + a = self.generic_resources.get(k) + b = other.generic_resources.get(k) + # subtracting non numeric values does not make sense, so we skip those for the resulting resource + if isinstance(a, int | float) and isinstance(b, int | float): + merged[k] = a - b + elif a is None and isinstance(b, int | float): + merged[k] = -b + elif b is None and isinstance(a, int | float): + merged[k] = a + return Resources.model_construct( - **{ - key: a - b - for (key, a), b in zip( - self.model_dump().items(), other.model_dump().values(), strict=True - ) - } + cpus=self.cpus - other.cpus, + ram=self.ram - other.ram, + generic_resources=merged, + ) + + def __hash__(self) -> int: + """Deterministic hash including cpus, ram (in bytes) and generic_resources.""" + # sort generic_resources items to ensure order-independent hashing + generic_items: tuple[tuple[str, GenericResourceValueType], ...] 
= tuple( + sorted(self.generic_resources.items()) + ) + return hash((self.cpus, self.ram, generic_items)) + + def as_flat_dict(self) -> dict[str, int | float | str]: + """Like model_dump, but flattens generic_resources to top level keys""" + base = self.model_dump() + base.update(base.pop("generic_resources")) + return base + + @classmethod + def from_flat_dict( + cls, + data: dict[str, int | float | str], + *, + mapping: dict[str, str] | None = None, + ) -> "Resources": + """Inverse of as_flat_dict with optional key mapping""" + mapped_data = data + if mapping: + mapped_data = {mapping.get(k, k): v for k, v in data.items()} + generic_resources = { + k: v for k, v in mapped_data.items() if k not in {"cpus", "ram"} + } + + return cls( + cpus=float(mapped_data.get("cpus", 0)), + ram=ByteSize(mapped_data.get("ram", 0)), + generic_resources=generic_resources, ) @field_validator("cpus", mode="before") @@ -174,8 +301,9 @@ def validate_bash_calls(cls, v): temp_file.flush() # NOTE: this will not capture runtime errors, but at least some syntax errors such as invalid quotes sh.bash( - "-n", temp_file.name - ) # pyright: ignore[reportCallIssue] # sh is untyped, but this call is safe for bash syntax checking + "-n", + temp_file.name, # pyright: ignore[reportCallIssue] - sh is untyped but safe for bash syntax checking + ) except sh.ErrorReturnCode as exc: msg = f"Invalid bash call in custom_boot_scripts: {v}, Error: {exc.stderr}" raise ValueError(msg) from exc diff --git a/packages/aws-library/tests/test_ec2_models.py b/packages/aws-library/tests/test_ec2_models.py index ed232ad0043d..22f03a0bd102 100644 --- a/packages/aws-library/tests/test_ec2_models.py +++ b/packages/aws-library/tests/test_ec2_models.py @@ -4,7 +4,13 @@ import pytest -from aws_library.ec2._models import AWSTagKey, AWSTagValue, EC2InstanceData, Resources +from aws_library.ec2._models import ( + AWSTagKey, + AWSTagValue, + EC2InstanceBootSpecific, + EC2InstanceData, + Resources, +) from faker import Faker from pydantic import ByteSize, TypeAdapter, ValidationError @@ -30,13 +36,68 @@ ( Resources(cpus=0.05, ram=ByteSize(1)), Resources(cpus=0.1, ram=ByteSize(0)), - False, + False, # CPU is smaller ), ( Resources(cpus=0.1, ram=ByteSize(0)), Resources(cpus=0.1, ram=ByteSize(1)), False, ), + ( + Resources(cpus=0.1, ram=ByteSize(0), generic_resources={"GPU": 1}), + Resources(cpus=0.1, ram=ByteSize(1)), + False, # RAM is smaller + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + Resources(cpus=0.1, ram=ByteSize(1)), + True, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + True, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), + False, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1)), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), + False, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": "2"}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), + True, # string resources are not comparable so "2" is considered larger + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + Resources(cpus=0.1, ram=ByteSize(1)), + True, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + True, + ), + ( + Resources(cpus=0.1, 
ram=ByteSize(1), generic_resources={"SSE": "yes"}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "no"}), + True, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "no"}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + False, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1)), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + False, + ), ], ) def test_resources_ge_operator( @@ -66,13 +127,73 @@ def test_resources_ge_operator( ( Resources(cpus=0.05, ram=ByteSize(1)), Resources(cpus=0.1, ram=ByteSize(0)), - True, + False, # CPU is smaller ), ( Resources(cpus=0.1, ram=ByteSize(0)), Resources(cpus=0.1, ram=ByteSize(1)), False, ), + ( + Resources(cpus=0.1, ram=ByteSize(0), generic_resources={"GPU": 1}), + Resources(cpus=0.1, ram=ByteSize(1)), + False, # ram is not enough + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + Resources(cpus=0.1, ram=ByteSize(1)), + True, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + False, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), + False, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + True, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1)), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), + False, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": "2"}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 2}), + True, # string resources are not comparable, so a > b + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + Resources(cpus=0.1, ram=ByteSize(1)), + True, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + False, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1)), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + False, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "no"}), + True, + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "no"}), + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"SSE": "yes"}), + False, + ), ], ) def test_resources_gt_operator(a: Resources, b: Resources, a_greater_than_b: bool): @@ -92,6 +213,36 @@ def test_resources_gt_operator(a: Resources, b: Resources, a_greater_than_b: boo Resources(cpus=1, ram=ByteSize(34)), Resources(cpus=1.1, ram=ByteSize(35)), ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + Resources(cpus=1, ram=ByteSize(34)), + Resources(cpus=1.1, ram=ByteSize(35), generic_resources={"GPU": 1}), + ), + ( + Resources(cpus=0.1, ram=ByteSize(1)), + Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}), + Resources(cpus=1.1, ram=ByteSize(35), generic_resources={"GPU": 1}), + ), + ( + Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}), + Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}), + Resources(cpus=1.1, ram=ByteSize(35), generic_resources={"GPU": 2}), + ), + ( + Resources( + cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1, "SSE": "yes"} + ), + Resources(cpus=1, 
ram=ByteSize(34), generic_resources={"GPU": 1}),
+            Resources(cpus=1.1, ram=ByteSize(35), generic_resources={"GPU": 2}),
+        ),  # string resources are not summed
+        (
+            Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": "1"}),
+            Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}),
+            Resources(
+                cpus=1.1,
+                ram=ByteSize(35),
+            ),
+        ),  # string resources are ignored in summation
     ],
 )
 def test_resources_add(a: Resources, b: Resources, result: Resources):
@@ -101,7 +252,9 @@ def test_resources_add(a: Resources, b: Resources, result: Resources):
 
 
 def test_resources_create_as_empty():
-    assert Resources.create_as_empty() == Resources(cpus=0, ram=ByteSize(0))
+    assert Resources.create_as_empty() == Resources(
+        cpus=0, ram=ByteSize(0), generic_resources={}
+    )
 
 
 @pytest.mark.parametrize(
@@ -117,6 +270,41 @@ def test_resources_create_as_empty():
             Resources(cpus=1, ram=ByteSize(1)),
             Resources.model_construct(cpus=-0.9, ram=ByteSize(33)),
         ),
+        (
+            Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}),
+            Resources(cpus=1, ram=ByteSize(34)),
+            Resources.model_construct(
+                cpus=-0.9, ram=ByteSize(-33), generic_resources={"GPU": 1}
+            ),
+        ),
+        (
+            Resources(cpus=0.1, ram=ByteSize(1)),
+            Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}),
+            Resources.model_construct(
+                cpus=-0.9, ram=ByteSize(-33), generic_resources={"GPU": -1}
+            ),
+        ),
+        (
+            Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1}),
+            Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}),
+            Resources.model_construct(
+                cpus=-0.9, ram=ByteSize(-33), generic_resources={"GPU": 0}
+            ),
+        ),
+        (
+            Resources(
+                cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": 1, "SSE": "yes"}
+            ),
+            Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}),
+            Resources.model_construct(
+                cpus=-0.9, ram=ByteSize(-33), generic_resources={"GPU": 0}
+            ),
+        ),  # string resources are not summed
+        (
+            Resources(cpus=0.1, ram=ByteSize(1), generic_resources={"GPU": "1"}),
+            Resources(cpus=1, ram=ByteSize(34), generic_resources={"GPU": 1}),
+            Resources.model_construct(cpus=-0.9, ram=ByteSize(-33)),
+        ),  # string resources are ignored in summation
     ],
 )
 def test_resources_sub(a: Resources, b: Resources, result: Resources):
@@ -125,6 +313,24 @@ def test_resources_sub(a: Resources, b: Resources, result: Resources):
     assert a == result
 
 
+def test_resources_flat_dict():
+    r = Resources(
+        cpus=0.1, ram=ByteSize(1024), generic_resources={"GPU": 2, "SSE": "yes"}
+    )
+    flat = r.as_flat_dict()
+    assert flat == {"cpus": 0.1, "ram": 1024, "GPU": 2, "SSE": "yes"}
+
+    reconstructed = Resources.from_flat_dict(flat)
+    assert reconstructed == r
+
+    # test with mapping
+    flat_with_other_names = {"CPU": 0.1, "RAM": 1024, "GPU": 2, "SSE": "yes"}
+    reconstructed2 = Resources.from_flat_dict(
+        flat_with_other_names, mapping={"CPU": "cpus", "RAM": "ram"}
+    )
+    assert reconstructed2 == r
+
+
 @pytest.mark.parametrize("ec2_tag_key", ["", "/", " ", ".", "..", "_index"])
 def test_aws_tag_key_invalid(ec2_tag_key: str):
     # for a key it raises
@@ -148,7 +354,11 @@ def test_ec2_instance_data_hashable(faker: Faker):
                 cpus=faker.pyfloat(min_value=0.1),
                 ram=ByteSize(faker.pyint(min_value=123)),
             ),
-            {AWSTagKey("mytagkey"): AWSTagValue("mytagvalue")},
+            {
+                TypeAdapter(AWSTagKey)
+                .validate_python("mytagkey"): TypeAdapter(AWSTagValue)
+                .validate_python("mytagvalue")
+            },
         )
     }
     second_set_of_ec2s = {
@@ -163,10 +373,22 @@ def test_ec2_instance_data_hashable(faker: Faker):
                 cpus=faker.pyfloat(min_value=0.1),
                 ram=ByteSize(faker.pyint(min_value=123)),
             ),
-            {AWSTagKey("mytagkey"): AWSTagValue("mytagvalue")},
+            {
+                TypeAdapter(AWSTagKey)
+                .validate_python("mytagkey"): TypeAdapter(AWSTagValue)
+                .validate_python("mytagvalue")
+            },
         )
     }
 
     union_of_sets = first_set_of_ec2s.union(second_set_of_ec2s)
     assert next(iter(first_set_of_ec2s)) in union_of_sets
     assert next(iter(second_set_of_ec2s)) in union_of_sets
+
+
+def test_ec2_instance_boot_specific_with_invalid_custom_script(faker: Faker):
+    valid_model = EC2InstanceBootSpecific.model_json_schema()["examples"][0]
+    invalid_model = {**valid_model, "custom_boot_scripts": ["echo 'missing end quote"]}
+
+    with pytest.raises(ValueError, match="Invalid bash call"):
+        EC2InstanceBootSpecific(**invalid_model)
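A small usage sketch of the flattening helpers exercised by the tests above (consistent with `as_flat_dict`/`from_flat_dict` as defined in `_models.py`; the values and the dask-style key names are illustrative):

```python
from aws_library.ec2 import Resources
from pydantic import ByteSize

r = Resources(cpus=2.0, ram=ByteSize(2 * 1024**3), generic_resources={"GPU": 1})

# flattening promotes generic resources to top-level keys
assert r.as_flat_dict() == {"cpus": 2.0, "ram": 2 * 1024**3, "GPU": 1}

# round-trip from externally named keys (e.g. dask-style "CPU"/"RAM"),
# remapping them onto the model's own field names
external = {"CPU": 2.0, "RAM": 2 * 1024**3, "GPU": 1}
assert Resources.from_flat_dict(external, mapping={"CPU": "cpus", "RAM": "ram"}) == r
```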
diff --git a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/utils.py b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/utils.py
index d97b0c896c36..97cfb440f45b 100644
--- a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/utils.py
+++ b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/utils.py
@@ -34,7 +34,7 @@ def parse_dask_job_id(
     job_id: str,
 ) -> tuple[ServiceKey, ServiceVersion, UserID, ProjectID, NodeID]:
     parts = job_id.split(":")
-    assert len(parts) == _JOB_ID_PARTS  # nosec
+    assert len(parts) == _JOB_ID_PARTS, f"unexpected job id {parts=}"  # nosec
     return (
         parts[0],
         parts[1],
diff --git a/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py b/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py
index 3a81114ef878..7770ba74050a 100644
--- a/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py
+++ b/packages/dask-task-models-library/src/dask_task_models_library/resource_constraints.py
@@ -1,8 +1,18 @@
-from typing import Any, TypeAlias
+from typing import Final, Literal, Required, TypedDict
 
 from .constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
 
-DaskTaskResources: TypeAlias = dict[str, Any]
+DASK_WORKER_THREAD_RESOURCE_NAME: Final[str] = "threads"
+
+
+class DaskTaskResources(TypedDict, total=False):
+    CPU: Required[float]
+    RAM: Required[int]  # in bytes
+    # threads is a constant of 1 (enforced by static type checkers via Literal)
+    # NOTE: a dask worker can take a task as soon as it has a free thread,
+    # regardless of its resources, so we need to be careful when interpreting
+    # the resources; adding the thread here mimics this behavior
+    threads: Required[Literal[1]]
 
 
 def create_ec2_resource_constraint_key(ec2_instance_type: str) -> str:
@@ -16,3 +26,24 @@ def get_ec2_instance_type_from_resources(
         if resource_name.startswith(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY):
             return resource_name.split(":")[-1]
     return None
+
+
+_RAM_SAFE_MARGIN_RATIO: Final[float] = (
+    0.1  # NOTE: machines always have less available RAM than advertised
+)
+_CPUS_SAFE_MARGIN: Final[float] = 0.1
+
+
+def estimate_dask_worker_resources_from_ec2_instance(
+    cpus: float, ram: int
+) -> tuple[float, int]:
+    """Estimates the resources available to a dask worker running on an EC2 instance,
+    taking into account safe margins for CPU and RAM.
+
+    Returns:
+        tuple: Estimated resources for the dask worker (cpus, ram).
+    """
+    worker_cpus = max(0.1, cpus - _CPUS_SAFE_MARGIN)  # ensure at least 0.1 CPU
+    worker_ram = int(ram * (1 - _RAM_SAFE_MARGIN_RATIO))  # apply safe margin
+
+    return (worker_cpus, worker_ram)
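A quick worked example of the safety margins just defined (the instance shape is my own illustrative choice; `estimate_dask_worker_resources_from_ec2_instance` and `DaskTaskResources` come from the diff above):

```python
from dask_task_models_library.resource_constraints import (
    DaskTaskResources,
    estimate_dask_worker_resources_from_ec2_instance,
)

# e.g. an instance advertising 8 vCPUs / 32 GiB of RAM
cpus, ram = estimate_dask_worker_resources_from_ec2_instance(8.0, 32 * 1024**3)
assert cpus == 7.9                     # 8.0 minus the 0.1 CPU safety margin
assert ram == int(32 * 1024**3 * 0.9)  # 10% RAM safety margin

# the resources a task would then request from such a worker
task_resources = DaskTaskResources(CPU=2.0, RAM=4 * 1024**3, threads=1)
```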
+ """ + worker_cpus = max(0.1, cpus - _CPUS_SAFE_MARGIN) # ensure at least 0.1 CPU + worker_ram = int(ram * (1 - _RAM_SAFE_MARGIN_RATIO)) # apply safe margin + + return (worker_cpus, worker_ram) diff --git a/packages/dask-task-models-library/tests/test_resource_constraints.py b/packages/dask-task-models-library/tests/test_resource_constraints.py index 9a2c1e59e26b..121d2b740d23 100644 --- a/packages/dask-task-models-library/tests/test_resource_constraints.py +++ b/packages/dask-task-models-library/tests/test_resource_constraints.py @@ -1,11 +1,23 @@ from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY from dask_task_models_library.resource_constraints import ( + DaskTaskResources, create_ec2_resource_constraint_key, get_ec2_instance_type_from_resources, ) from faker import Faker +def test_dask_task_resource(faker: Faker): + task_resources = DaskTaskResources( + CPU=faker.pyfloat(min_value=0.1, max_value=100), + RAM=faker.pyint(min_value=1024, max_value=1024**3), + threads=1, + ) + assert task_resources["threads"] == 1 + assert task_resources["CPU"] > 0 + assert task_resources["RAM"] >= 1024 + + def test_create_ec2_resource_constraint_key(faker: Faker): faker_instance_type = faker.pystr() assert ( diff --git a/packages/service-library/src/servicelib/docker_utils.py b/packages/service-library/src/servicelib/docker_utils.py index a919cb9487d7..374c05595beb 100644 --- a/packages/service-library/src/servicelib/docker_utils.py +++ b/packages/service-library/src/servicelib/docker_utils.py @@ -326,3 +326,33 @@ async def _pull_image_with_retry() -> None: ) await _pull_image_with_retry() + + +_CPUS_SAFE_MARGIN: Final[float] = ( + 1.4 # accounts for machine overhead (ops + sidecar itself) +) +_MACHINE_TOTAL_RAM_SAFE_MARGIN_RATIO: Final[float] = ( + 0.1 # NOTE: machines always have less available RAM than advertised +) +_SIDECARS_OPS_SAFE_RAM_MARGIN: Final[ByteSize] = TypeAdapter(ByteSize).validate_python( + "1GiB" +) +DYNAMIC_SIDECAR_MIN_CPUS: Final[float] = 0.5 + + +def estimate_dynamic_sidecar_resources_from_ec2_instance( + cpus: float, ram: int +) -> tuple[float, int]: + """Estimates the resources available to a dynamic-sidecar running in an EC2 instance, + taking into account safe margins for CPU and RAM, as the EC2 full resources are not completely visible + + Returns: + tuple: Estimated resources for the dynamic-sidecar (cpus, ram). + """ + # dynamic-sidecar usually needs less CPU + sidecar_cpus = max(DYNAMIC_SIDECAR_MIN_CPUS, cpus - _CPUS_SAFE_MARGIN) + sidecar_ram = int( + ram - _MACHINE_TOTAL_RAM_SAFE_MARGIN_RATIO * ram - _SIDECARS_OPS_SAFE_RAM_MARGIN + ) + + return (sidecar_cpus, sidecar_ram) diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/errors.py b/services/autoscaling/src/simcore_service_autoscaling/core/errors.py index e4294631224a..d1020d382f76 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/errors.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/errors.py @@ -18,7 +18,7 @@ class TaskRequiresUnauthorizedEC2InstanceTypeError(AutoscalingRuntimeError): class TaskRequirementsAboveRequiredEC2InstanceTypeError(AutoscalingRuntimeError): msg_template: str = ( - "Task {task} requires {instance_type} but requires {resources}. " + "Task {task} specifies instance type {instance_type} but requests {resources}. {resources_diff} are missing! " "TIP: Ensure task resources requirements fit required instance type available resources." 
) @@ -43,4 +43,6 @@ class DaskNoWorkersError(AutoscalingRuntimeError): class DaskWorkerNotFoundError(AutoscalingRuntimeError): - msg_template: str = "Dask worker running on {worker_host} is not registered to scheduler in {url}, it is not found!" + msg_template: str = ( + "Dask worker running on {worker_host} is not registered to scheduler in {url}, it is not found!" + ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py index 0ae53b943954..38f994bcea6e 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/settings.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/settings.py @@ -14,6 +14,7 @@ AnyUrl, Field, NonNegativeInt, + PositiveInt, TypeAdapter, field_validator, model_validator, @@ -241,6 +242,18 @@ class DaskMonitoringSettings(BaseCustomSettings): description="defines the authentication of the clusters created via clusters-keeper (can be None or TLS)", ), ] + DASK_NTHREADS: Annotated[ + NonNegativeInt, + Field( + description="if >0, it overrides the default number of threads per process in the dask-sidecars, (see description in dask-sidecar)", + ), + ] + DASK_NTHREADS_MULTIPLIER: Annotated[ + PositiveInt, + Field( + description="if >1, it overrides the default number of threads per process in the dask-sidecars, by multiplying the number of vCPUs with this factor (see description in dask-sidecar)", + ), + ] class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py index 7645b300e8de..ca697a13fa06 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -23,6 +23,9 @@ def assign_task(self, task, task_resources: Resources) -> None: def has_resources_for_task(self, task_resources: Resources) -> bool: return bool(self.available_resources >= task_resources) + def has_assigned_tasks(self) -> bool: + return len(self.assigned_tasks) > 0 + @dataclass(frozen=True, kw_only=True, slots=True) class AssignedTasksToInstanceType(_TaskAssignmentMixin): @@ -37,9 +40,6 @@ def __post_init__(self) -> None: if self.available_resources == Resources.create_as_empty(): object.__setattr__(self, "available_resources", self.ec2_instance.resources) - def has_assigned_tasks(self) -> bool: - return bool(self.available_resources < self.ec2_instance.resources) - @dataclass(frozen=True, kw_only=True, slots=True) class AssociatedInstance(_BaseInstance): diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py index ff4b0ad4f5b6..535df02d3cf2 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py @@ -94,24 +94,33 @@ async def _analyze_current_cluster( docker_nodes: list[Node] = await auto_scaling_mode.get_monitored_nodes(app) # get the EC2 instances we have - existing_ec2_instances = await get_ec2_client(app).get_instances( + existing_ec2_instances: list[EC2InstanceData] = await get_ec2_client( + app + ).get_instances( key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME], 
tags=auto_scaling_mode.get_ec2_tags(app), state_names=["pending", "running"], ) - terminated_ec2_instances = await get_ec2_client(app).get_instances( + terminated_ec2_instances: list[EC2InstanceData] = await get_ec2_client( + app + ).get_instances( key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME], tags=auto_scaling_mode.get_ec2_tags(app), state_names=["terminated"], ) - warm_buffer_ec2_instances = await get_ec2_client(app).get_instances( + warm_buffer_ec2_instances: list[EC2InstanceData] = await get_ec2_client( + app + ).get_instances( key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME], tags=get_deactivated_warm_buffer_ec2_tags(auto_scaling_mode.get_ec2_tags(app)), state_names=["stopped"], ) + for i in itertools.chain(existing_ec2_instances, warm_buffer_ec2_instances): + auto_scaling_mode.add_instance_generic_resources(app, i) + attached_ec2s, pending_ec2s = associate_ec2_instances_with_nodes( docker_nodes, existing_ec2_instances ) @@ -343,7 +352,9 @@ async def _try_attach_pending_ec2s( ) -async def _sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]: +async def _sorted_allowed_instance_types( + app: FastAPI, auto_scaling_mode: AutoscalingProvider +) -> list[EC2InstanceType]: app_settings: ApplicationSettings = app.state.settings assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec ec2_client = get_ec2_client(app) @@ -366,8 +377,10 @@ def _as_selection(instance_type: EC2InstanceType) -> int: # NOTE: will raise ValueError if allowed_instance_types not in allowed_instance_type_names return allowed_instance_type_names.index(f"{instance_type.name}") - allowed_instance_types.sort(key=_as_selection) - return allowed_instance_types + return [ + auto_scaling_mode.adjust_instance_type_resources(app, instance_type) + for instance_type in sorted(allowed_instance_types, key=_as_selection) + ] async def _activate_and_notify( @@ -728,15 +741,15 @@ async def _find_needed_instances( task_required_resources = auto_scaling_mode.get_task_required_resources( task ) - task_required_ec2_instance = ( - await auto_scaling_mode.get_task_defined_instance(app, task) + task_required_ec2 = await auto_scaling_mode.get_task_defined_instance( + app, task ) # first check if we can assign the task to one of the newly tobe created instances if _try_assign_task_to_ec2_instance_type( task, instances=needed_new_instance_types_for_tasks, - task_required_ec2_instance=task_required_ec2_instance, + task_required_ec2_instance=task_required_ec2, task_required_resources=task_required_resources, ): continue @@ -744,12 +757,12 @@ async def _find_needed_instances( # so we need to find what we can create now try: # check if exact instance type is needed first - if task_required_ec2_instance: + if task_required_ec2: defined_ec2 = find_selected_instance_type_for_task( - task_required_ec2_instance, + task_required_ec2, available_ec2_types, task, - auto_scaling_mode.get_task_required_resources(task), + task_required_resources, ) needed_new_instance_types_for_tasks.append( AssignedTasksToInstanceType( @@ -763,7 +776,7 @@ async def _find_needed_instances( # we go for best fitting type best_ec2_instance = utils_ec2.find_best_fitting_ec2_instance( available_ec2_types, - auto_scaling_mode.get_task_required_resources(task), + task_required_resources, score_type=utils_ec2.closest_instance_policy, ) needed_new_instance_types_for_tasks.append( @@ -1575,7 +1588,10 @@ async def auto_scale_cluster( the additional load. 
""" # current state - allowed_instance_types = await _sorted_allowed_instance_types(app) + allowed_instance_types = await _sorted_allowed_instance_types( + app, auto_scaling_mode + ) + cluster = await _analyze_current_cluster( app, auto_scaling_mode, allowed_instance_types ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py index c9b2d498fd66..d580868627f2 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_computational.py @@ -1,8 +1,13 @@ import collections +import dataclasses import logging -from typing import cast +from typing import Any, cast from aws_library.ec2 import EC2InstanceData, EC2Tags, Resources +from aws_library.ec2._models import EC2InstanceType +from dask_task_models_library.resource_constraints import ( + estimate_dask_worker_resources_from_ec2_instance, +) from fastapi import FastAPI from models_library.clusters import ClusterAuthentication from models_library.docker import DockerLabelKey @@ -88,13 +93,7 @@ async def list_unrunnable_tasks(self, app: FastAPI) -> list[DaskTask]: def get_task_required_resources(self, task) -> Resources: assert self # nosec - task_required_resources = utils.resources_from_dask_task(task) - # ensure cpu is set at least to 1 as dask-workers use 1 thread per CPU - if task_required_resources.cpus < 1.0: - task_required_resources = task_required_resources.model_copy( - update={"cpus": 1.0} - ) - return task_required_resources + return utils.resources_from_dask_task(task) async def get_task_defined_instance( self, app: FastAPI, task @@ -141,10 +140,14 @@ async def compute_cluster_used_resources( list_of_used_resources: list[Resources] = await logged_gather( *(self.compute_node_used_resources(app, i) for i in instances) ) - counter = collections.Counter(dict.fromkeys(Resources.model_fields, 0)) + counter: collections.Counter = collections.Counter() for result in list_of_used_resources: - counter.update(result.model_dump()) - return Resources.model_validate(dict(counter)) + counter.update(result.as_flat_dict()) + + flat_counter: dict[str, Any] = dict(counter) + flat_counter.setdefault("cpus", 0) + flat_counter.setdefault("ram", 0) + return Resources.from_flat_dict(flat_counter) async def compute_cluster_total_resources( self, app: FastAPI, instances: list[AssociatedInstance] @@ -152,7 +155,9 @@ async def compute_cluster_total_resources( assert self # nosec try: return await dask.compute_cluster_total_resources( - _scheduler_url(app), _scheduler_auth(app), instances + _scheduler_url(app), + _scheduler_auth(app), + [i.ec2_instance for i in instances], ) except DaskNoWorkersError: return Resources.create_as_empty() @@ -182,3 +187,33 @@ async def is_instance_retired( async def try_retire_nodes(self, app: FastAPI) -> None: assert self # nosec await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app)) + + def add_instance_generic_resources( + self, app: FastAPI, instance: EC2InstanceData + ) -> None: + assert self # nosec + assert app # nosec + app_settings = get_application_settings(app) + assert app_settings.AUTOSCALING_DASK # nosec + dask.add_instance_generic_resources(app_settings.AUTOSCALING_DASK, instance) + + def adjust_instance_type_resources( + self, app: FastAPI, instance_type: EC2InstanceType + ) -> 
EC2InstanceType: + assert self # nosec + assert app # nosec + app_settings = get_application_settings(app) + assert app_settings.AUTOSCALING_DASK # nosec + adjusted_cpus, adjusted_ram = estimate_dask_worker_resources_from_ec2_instance( + instance_type.resources.cpus, instance_type.resources.ram + ) + replaced_instance_type = dataclasses.replace( + instance_type, + resources=instance_type.resources.model_copy( + update={"cpus": adjusted_cpus, "ram": ByteSize(adjusted_ram)} + ), + ) + dask.add_instance_type_generic_resource( + app_settings.AUTOSCALING_DASK, replaced_instance_type + ) + return replaced_instance_type diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py index e6dbca840e37..d7499dc92e1a 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_dynamic.py @@ -1,7 +1,12 @@ +import dataclasses + from aws_library.ec2 import EC2InstanceData, EC2Tags, Resources +from aws_library.ec2._models import EC2InstanceType from fastapi import FastAPI from models_library.docker import DockerLabelKey from models_library.generated_models.docker_rest_api import Node, Task +from pydantic import ByteSize +from servicelib.docker_utils import estimate_dynamic_sidecar_resources_from_ec2_instance from types_aiobotocore_ec2.literals import InstanceTypeType from ...core.settings import get_application_settings @@ -104,3 +109,29 @@ async def try_retire_nodes(self, app: FastAPI) -> None: assert self # nosec assert app # nosec # nothing to do here + + def add_instance_generic_resources( + self, app: FastAPI, instance: EC2InstanceData + ) -> None: + assert self # nosec + assert app # nosec + assert instance # nosec + # nothing to do at the moment + + def adjust_instance_type_resources( + self, app: FastAPI, instance_type: EC2InstanceType + ) -> EC2InstanceType: + assert self # nosec + assert app # nosec + adjusted_cpus, adjusted_ram = ( + estimate_dynamic_sidecar_resources_from_ec2_instance( + instance_type.resources.cpus, instance_type.resources.ram + ) + ) + + return dataclasses.replace( + instance_type, + resources=instance_type.resources.model_copy( + update={"cpus": adjusted_cpus, "ram": ByteSize(adjusted_ram)} + ), + ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py index 355394b9f1d3..d2f711229c4f 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_provider_protocol.py @@ -1,6 +1,7 @@ from typing import Protocol from aws_library.ec2 import EC2InstanceData, EC2Tags, Resources +from aws_library.ec2._models import EC2InstanceType from fastapi import FastAPI from models_library.docker import DockerLabelKey from models_library.generated_models.docker_rest_api import Node as DockerNode @@ -47,3 +48,11 @@ async def is_instance_retired( ) -> bool: ... async def try_retire_nodes(self, app: FastAPI) -> None: ... + + def add_instance_generic_resources( + self, app: FastAPI, instance: EC2InstanceData + ) -> None: ... 
+ + def adjust_instance_type_resources( + self, app: FastAPI, instance_type: EC2InstanceType + ) -> EC2InstanceType: ... diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py index 4fb76ee5e129..1b5225966809 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_utils_computational.py @@ -1,10 +1,12 @@ import logging -from typing import Final +from typing import Final, cast -from aws_library.ec2 import Resources +from aws_library.ec2 import GenericResourceValueType, Resources from dask_task_models_library.resource_constraints import ( + DaskTaskResources, get_ec2_instance_type_from_resources, ) +from pydantic import ByteSize from ...models import DaskTask @@ -13,11 +15,23 @@ _DEFAULT_MAX_CPU: Final[float] = 1 _DEFAULT_MAX_RAM: Final[int] = 1024 +DASK_TO_RESOURCE_NAME_MAPPING: Final[dict[str, str]] = { + "CPU": "cpus", + "RAM": "ram", +} +_DEFAULT_DASK_RESOURCES: Final[DaskTaskResources] = DaskTaskResources( + CPU=_DEFAULT_MAX_CPU, RAM=ByteSize(_DEFAULT_MAX_RAM), threads=1 +) + def resources_from_dask_task(task: DaskTask) -> Resources: - return Resources( - cpus=task.required_resources.get("CPU", _DEFAULT_MAX_CPU), - ram=task.required_resources.get("RAM", _DEFAULT_MAX_RAM), + task_resources = ( + _DEFAULT_DASK_RESOURCES | task.required_resources + ) # merge defaults with task resources (task resources override defaults) + + return Resources.from_flat_dict( + cast(dict[str, GenericResourceValueType], task_resources), + mapping=DASK_TO_RESOURCE_NAME_MAPPING, ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py index 966593295e87..cf16afc2b5fc 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/dask.py @@ -4,27 +4,35 @@ import re from collections import defaultdict from collections.abc import AsyncIterator, Coroutine -from typing import Any, Final, TypeAlias +from typing import Any, Final, TypeAlias, TypedDict import dask.typing import distributed -import distributed.scheduler from aws_library.ec2 import EC2InstanceData, Resources -from dask_task_models_library.resource_constraints import DaskTaskResources +from aws_library.ec2._models import EC2InstanceType +from dask_task_models_library.resource_constraints import ( + DASK_WORKER_THREAD_RESOURCE_NAME, + DaskTaskResources, + create_ec2_resource_constraint_key, +) from distributed.core import Status from models_library.clusters import ClusterAuthentication, TLSAuthentication -from pydantic import AnyUrl, ByteSize, TypeAdapter +from pydantic import AnyUrl from ..core.errors import ( DaskNoWorkersError, DaskSchedulerNotFoundError, DaskWorkerNotFoundError, ) -from ..models import AssociatedInstance, DaskTask, DaskTaskId +from ..core.settings import DaskMonitoringSettings +from ..models import DaskTask, DaskTaskId from ..utils.utils_ec2 import ( node_host_name_from_ec2_private_dns, node_ip_from_ec2_private_dns, ) +from .cluster_scaling._utils_computational import ( + DASK_TO_RESOURCE_NAME_MAPPING, +) _logger = logging.getLogger(__name__) @@ -101,6 +109,10 @@ def _find_by_worker_host( _, details = dask_worker if match := re.match(DASK_NAME_PATTERN, 
details["name"]): return bool(match.group("private_ip") == node_hostname) + _logger.error( + "Unexpected worker name format: %s. TIP: this should be investigated as this is unexpected", + details["name"], + ) return False filtered_workers = dict(filter(_find_by_worker_host, workers.items())) @@ -114,6 +126,44 @@ def _find_by_worker_host( return next(iter(filtered_workers.items())) +class _DaskClusterTasks(TypedDict): + processing: dict[DaskWorkerUrl, list[tuple[dask.typing.Key, DaskTaskResources]]] + unrunnable: dict[dask.typing.Key, DaskTaskResources] + + +async def _list_cluster_known_tasks( + client: distributed.Client, +) -> _DaskClusterTasks: + def _list_on_scheduler( + dask_scheduler: distributed.Scheduler, + ) -> _DaskClusterTasks: + worker_to_processing_tasks = defaultdict(list) + unrunnable_tasks = {} + for task_key, task_state in dask_scheduler.tasks.items(): + if task_state.processing_on: + worker_to_processing_tasks[task_state.processing_on.address].append( + ( + task_key, + (task_state.resource_restrictions or {}) + | {DASK_WORKER_THREAD_RESOURCE_NAME: 1}, + ) + ) + elif task_state in dask_scheduler.unrunnable: + unrunnable_tasks[task_key] = ( + task_state.resource_restrictions or {} + ) | {DASK_WORKER_THREAD_RESOURCE_NAME: 1} + + return _DaskClusterTasks( + processing=worker_to_processing_tasks, # type: ignore[typeddict-item] + unrunnable=unrunnable_tasks, # type: ignore[typeddict-item] + ) + + list_of_tasks: _DaskClusterTasks = await client.run_on_scheduler(_list_on_scheduler) + _logger.debug("found tasks: %s", list_of_tasks) + + return list_of_tasks + + async def is_worker_connected( scheduler_url: AnyUrl, authentication: ClusterAuthentication, @@ -163,20 +213,10 @@ async def list_unrunnable_tasks( DaskSchedulerNotFoundError """ - def _list_tasks( - dask_scheduler: distributed.Scheduler, - ) -> dict[dask.typing.Key, dict[str, float]]: - # NOTE: task.key can be a byte, str, or a tuple - return { - task.key: task.resource_restrictions or {} - for task in dask_scheduler.unrunnable - } - async with _scheduler_client(scheduler_url, authentication) as client: - list_of_tasks: dict[dask.typing.Key, DaskTaskResources] = ( - await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks)) - ) - _logger.debug("found unrunnable tasks: %s", list_of_tasks) + known_tasks = await _list_cluster_known_tasks(client) + list_of_tasks = known_tasks["unrunnable"] + return [ DaskTask( task_id=_dask_key_to_dask_task_id(task_id), @@ -195,26 +235,11 @@ async def list_processing_tasks_per_worker( DaskSchedulerNotFoundError """ - def _list_processing_tasks( - dask_scheduler: distributed.Scheduler, - ) -> dict[str, list[tuple[dask.typing.Key, DaskTaskResources]]]: - worker_to_processing_tasks = defaultdict(list) - for task_key, task_state in dask_scheduler.tasks.items(): - if task_state.processing_on: - worker_to_processing_tasks[task_state.processing_on.address].append( - (task_key, task_state.resource_restrictions or {}) - ) - return worker_to_processing_tasks - async with _scheduler_client(scheduler_url, authentication) as client: - worker_to_tasks: dict[str, list[tuple[dask.typing.Key, DaskTaskResources]]] = ( - await _wrap_client_async_routine( - client.run_on_scheduler(_list_processing_tasks) - ) - ) - _logger.debug("found processing tasks: %s", worker_to_tasks) + worker_to_tasks = await _list_cluster_known_tasks(client) + tasks_per_worker = defaultdict(list) - for worker, tasks in worker_to_tasks.items(): + for worker, tasks in worker_to_tasks["processing"].items(): for task_id, 
required_resources in tasks: tasks_per_worker[worker].append( DaskTask( @@ -257,66 +282,54 @@ async def get_worker_used_resources( DaskNoWorkersError """ - def _list_processing_tasks_on_worker( - dask_scheduler: distributed.Scheduler, *, worker_url: str - ) -> list[tuple[dask.typing.Key, DaskTaskResources]]: - processing_tasks = [] - for task_key, task_state in dask_scheduler.tasks.items(): - if task_state.processing_on and ( - task_state.processing_on.address == worker_url - ): - processing_tasks.append( - (task_key, task_state.resource_restrictions or {}) - ) - return processing_tasks - async with _scheduler_client(scheduler_url, authentication) as client: worker_url, _ = _dask_worker_from_ec2_instance(client, ec2_instance) + known_tasks = await _list_cluster_known_tasks(client) + worker_processing_tasks = known_tasks["processing"].get(worker_url, []) + if not worker_processing_tasks: + return Resources.create_as_empty() - _logger.debug("looking for processing tasks for %s", f"{worker_url=}") - - # now get the used resources - worker_processing_tasks: list[tuple[dask.typing.Key, DaskTaskResources]] = ( - await _wrap_client_async_routine( - client.run_on_scheduler( - _list_processing_tasks_on_worker, worker_url=worker_url - ), - ) - ) - - total_resources_used: collections.Counter[str] = collections.Counter() + total_resources_used: collections.Counter = collections.Counter() for _, task_resources in worker_processing_tasks: total_resources_used.update(task_resources) _logger.debug("found %s for %s", f"{total_resources_used=}", f"{worker_url=}") - return Resources( - cpus=total_resources_used.get("CPU", 0), - ram=TypeAdapter(ByteSize).validate_python( - total_resources_used.get("RAM", 0) - ), + return Resources.from_flat_dict( + dict(total_resources_used), mapping=DASK_TO_RESOURCE_NAME_MAPPING ) async def compute_cluster_total_resources( scheduler_url: AnyUrl, authentication: ClusterAuthentication, - instances: list[AssociatedInstance], + instances: list[EC2InstanceData], ) -> Resources: if not instances: return Resources.create_as_empty() async with _scheduler_client(scheduler_url, authentication) as client: - instance_hosts = ( - node_ip_from_ec2_private_dns(i.ec2_instance) for i in instances - ) + ec2_instance_resources_map = { + node_ip_from_ec2_private_dns(i): i.resources for i in instances + } scheduler_info = client.scheduler_info() if "workers" not in scheduler_info or not scheduler_info["workers"]: raise DaskNoWorkersError(url=scheduler_url) workers: dict[str, Any] = scheduler_info["workers"] + cluster_resources = Resources.create_as_empty() for worker_details in workers.values(): - if worker_details["host"] not in instance_hosts: + if worker_details["host"] not in ec2_instance_resources_map: continue + # get dask information about resources + worker_dask_resources = worker_details["resources"] + worker_dask_nthreads = worker_details["nthreads"] + cluster_resources += Resources.from_flat_dict( + { + **worker_dask_resources, + DASK_WORKER_THREAD_RESOURCE_NAME: worker_dask_nthreads, + }, + mapping=DASK_TO_RESOURCE_NAME_MAPPING, + ) - return Resources.create_as_empty() + return cluster_resources async def try_retire_nodes( @@ -326,3 +339,43 @@ async def try_retire_nodes( await _wrap_client_async_routine( client.retire_workers(close_workers=False, remove=False) ) + + +_LARGE_RESOURCE: Final[int] = 99999 + + +def add_instance_generic_resources( + settings: DaskMonitoringSettings, instance: EC2InstanceData +) -> None: + instance_threads = max(1, round(instance.resources.cpus)) + if 
settings.DASK_NTHREADS > 0: + # this overrides everything + instance_threads = settings.DASK_NTHREADS + if settings.DASK_NTHREADS_MULTIPLIER > 1: + instance_threads = instance_threads * settings.DASK_NTHREADS_MULTIPLIER + instance.resources.generic_resources[DASK_WORKER_THREAD_RESOURCE_NAME] = ( + instance_threads + ) + + instance.resources.generic_resources[ + create_ec2_resource_constraint_key(instance.type) + ] = _LARGE_RESOURCE + + +def add_instance_type_generic_resource( + settings: DaskMonitoringSettings, instance_type: EC2InstanceType +) -> None: + instance_threads = max(1, round(instance_type.resources.cpus)) + if settings.DASK_NTHREADS > 0: + # this overrides everything + instance_threads = settings.DASK_NTHREADS + if settings.DASK_NTHREADS_MULTIPLIER > 1: + instance_threads = instance_threads * settings.DASK_NTHREADS_MULTIPLIER + + instance_type.resources.generic_resources[DASK_WORKER_THREAD_RESOURCE_NAME] = ( + instance_threads + ) + + instance_type.resources.generic_resources[ + create_ec2_resource_constraint_key(instance_type.name) + ] = _LARGE_RESOURCE diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py index 9de65aac078f..af84e97bc01b 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py @@ -35,7 +35,7 @@ async def on_shutdown() -> None: ... def get_instrumentation(app: FastAPI) -> AutoscalingInstrumentation: - if not app.state.instrumentation: + if not hasattr(app.state, "instrumentation"): raise ConfigurationError( msg="Instrumentation not setup. Please check the configuration." 
) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py index 13c25dcd2112..cc2c1ad3ee0c 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/cluster_scaling.py @@ -109,11 +109,12 @@ def find_selected_instance_type_for_task( selected_instance = filtered_instances[0] # check that the assigned resources and the machine resource fit - if task_required_resources > selected_instance.resources: + if not (task_required_resources <= selected_instance.resources): raise TaskRequirementsAboveRequiredEC2InstanceTypeError( task=task, instance_type=selected_instance, resources=task_required_resources, + resources_diff=task_required_resources - selected_instance.resources, ) return selected_instance diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index a48951986763..f4feea61cfde 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -395,14 +395,16 @@ async def compute_cluster_used_resources( docker_client: AutoscalingDocker, nodes: list[Node] ) -> Resources: """Returns the total amount of resources (reservations) used on each of the given nodes""" - list_of_used_resources = await logged_gather( + list_of_used_resources: list[Resources] = await logged_gather( *(compute_node_used_resources(docker_client, node) for node in nodes) ) - counter = collections.Counter(dict.fromkeys(list(Resources.model_fields), 0)) + flat_counter: collections.Counter = collections.Counter() for result in list_of_used_resources: - counter.update(result.model_dump()) + flat_counter.update(result.as_flat_dict()) + flat_counter.setdefault("cpus", 0) + flat_counter.setdefault("ram", 0) - return Resources.model_validate(dict(counter)) + return Resources.from_flat_dict(dict(flat_counter)) _COMMAND_TIMEOUT_S = 10 diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 57c9b381fc2d..3c77b01be372 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -30,6 +30,7 @@ Resources, ) from common_library.json_serialization import json_dumps +from dask_task_models_library.container_tasks.utils import generate_dask_job_id from deepdiff import DeepDiff from faker import Faker from fakeredis.aioredis import FakeRedis @@ -52,7 +53,11 @@ Service, TaskSpec, ) +from models_library.projects import ProjectID +from models_library.projects_nodes_io import NodeID from models_library.services_metadata_runtime import SimcoreContainerLabels +from models_library.services_types import ServiceKey, ServiceVersion +from models_library.users import UserID from pydantic import ByteSize, NonNegativeInt, PositiveInt, TypeAdapter from pytest_mock import MockType from pytest_mock.plugin import MockerFixture @@ -380,8 +385,8 @@ def enabled_computational_mode( "AUTOSCALING_DASK": "{}", "DASK_MONITORING_URL": faker.url(), "DASK_SCHEDULER_AUTH": "{}", - "DASK_MONITORING_USER_NAME": faker.user_name(), - "DASK_MONITORING_PASSWORD": faker.password(), + "DASK_NTHREADS": f"{faker.pyint(min_value=0, max_value=10)}", + "DASK_NTHREADS_MULTIPLIER": f"{faker.pyint(min_value=1, max_value=4)}", }, ) @@ -857,9 +862,55 @@ def 
_creator(**cluter_overrides) -> Cluster:
 
     return _creator
 
 
+@pytest.fixture
+def service_version() -> ServiceVersion:
+    return "1.0.234"
+
+
+@pytest.fixture
+def service_key() -> ServiceKey:
+    return "simcore/services/dynamic/test"
+
+
+@pytest.fixture
+def node_id(faker: Faker) -> NodeID:
+    return faker.uuid4(cast_to=None)
+
+
+@pytest.fixture
+def project_id(faker: Faker) -> ProjectID:
+    return faker.uuid4(cast_to=None)
+
+
+@pytest.fixture
+def user_id(faker: Faker) -> UserID:
+    return faker.pyint(min_value=1)
+
+
+@pytest.fixture
+def fake_dask_job_id(
+    service_key: ServiceKey,
+    service_version: ServiceVersion,
+    user_id: UserID,
+    project_id: ProjectID,
+    faker: Faker,
+) -> Callable[[], str]:
+    def _() -> str:
+        return generate_dask_job_id(
+            service_key=service_key,
+            service_version=service_version,
+            user_id=user_id,
+            project_id=project_id,
+            node_id=faker.uuid4(cast_to=None),
+        )
+
+    return _
+
+
 @pytest.fixture
 async def create_dask_task(
     dask_spec_cluster_client: distributed.Client,
+    fake_dask_job_id: Callable[[], str],
 ) -> Callable[..., distributed.Future]:
     def _remote_pytest_fct(x: int, y: int) -> int:
         return x + y
@@ -874,6 +925,7 @@ def _creator(
             43,
             resources=required_resources,
             pure=False,
+            key=fake_dask_job_id(),
             **overrides,
         )
         assert future
diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py
index f83eaac9ea8b..bba8531e0032 100644
--- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py
+++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_computational.py
@@ -14,7 +14,7 @@
 from collections.abc import Awaitable, Callable, Iterator
 from copy import deepcopy
 from dataclasses import dataclass
-from typing import Any, Final, cast
+from typing import Any, cast
 from unittest import mock
 
 import arrow
@@ -22,7 +22,9 @@
 import pytest
 from aws_library.ec2 import Resources
 from dask_task_models_library.resource_constraints import (
+    DASK_WORKER_THREAD_RESOURCE_NAME,
     create_ec2_resource_constraint_key,
+    estimate_dask_worker_resources_from_ec2_instance,
 )
 from faker import Faker
 from fastapi import FastAPI
@@ -126,10 +128,14 @@ def _assert_rabbit_autoscaling_message_sent(
         instances_running=0,
     )
     expected_message = default_message.model_copy(update=message_update_kwargs)
-    mock_rabbitmq_post_message.assert_called_once_with(
-        app,
-        expected_message,
-    )
+    # this mock receives all kinds of messages; assert that exactly one of them
+    # is the expected autoscaling status message
+    autoscaling_status_messages = [
+        call_args.args[1]
+        for call_args in mock_rabbitmq_post_message.call_args_list
+        if isinstance(call_args.args[1], RabbitAutoscalingStatusMessage)
+    ]
+    assert len(autoscaling_status_messages) == 1, "expected exactly 1 status message"
+    assert autoscaling_status_messages[0] == expected_message
 
 
 @pytest.fixture
@@ -255,16 +261,25 @@ async def _create_task_with_resources(
         instance_types = await ec2_client.describe_instance_types(
             InstanceTypes=[dask_task_imposed_ec2_type]
         )
+        assert instance_types
         assert "InstanceTypes" in instance_types
         assert instance_types["InstanceTypes"]
         assert "MemoryInfo" in instance_types["InstanceTypes"][0]
         assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"]
+        ec2_ram = TypeAdapter(ByteSize).validate_python(
+            f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB",
+        )
+        assert "VCpuInfo" in instance_types["InstanceTypes"][0]
+        assert "DefaultVCpus" in
instance_types["InstanceTypes"][0]["VCpuInfo"] + ec2_cpus = instance_types["InstanceTypes"][0]["VCpuInfo"]["DefaultVCpus"] + required_cpus, required_ram = estimate_dask_worker_resources_from_ec2_instance( + ec2_cpus, ec2_ram + ) task_resources = Resources( - cpus=1, - ram=TypeAdapter(ByteSize).validate_python( - f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB", - ), + cpus=required_cpus, + ram=ByteSize(required_ram), + generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1}, ) assert task_resources @@ -285,13 +300,11 @@ class _ScaleUpParams: expected_num_instances: int -_RESOURCE_TO_DASK_RESOURCE_MAP: Final[dict[str, str]] = {"CPUS": "CPU", "RAM": "RAM"} - - def _dask_task_resources_from_resources(resources: Resources) -> DaskTaskResources: return { - _RESOURCE_TO_DASK_RESOURCE_MAP[res_key.upper()]: res_value - for res_key, res_value in resources.model_dump().items() + "CPU": resources.cpus, + "RAM": resources.ram, + **dict(resources.generic_resources.items()), } @@ -441,7 +454,7 @@ async def test_cluster_scaling_with_task_with_too_much_resources_starts_nothing( _ScaleUpParams( imposed_instance_type=None, task_resources=Resources( - cpus=1, ram=TypeAdapter(ByteSize).validate_python("128Gib") + cpus=1, ram=TypeAdapter(ByteSize).validate_python("115Gib") ), num_tasks=1, expected_instance_type="r5n.4xlarge", @@ -463,7 +476,7 @@ async def test_cluster_scaling_with_task_with_too_much_resources_starts_nothing( _ScaleUpParams( imposed_instance_type="r5n.8xlarge", task_resources=Resources( - cpus=1, ram=TypeAdapter(ByteSize).validate_python("116Gib") + cpus=1, ram=TypeAdapter(ByteSize).validate_python("115Gib") ), num_tasks=1, expected_instance_type="r5n.8xlarge", @@ -636,7 +649,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 ) mock_docker_tag_node.reset_mock() mock_docker_set_node_availability.assert_not_called() - mock_rabbitmq_post_message.assert_called_once() + assert mock_rabbitmq_post_message.call_count == 3 mock_rabbitmq_post_message.reset_mock() # now we have 1 monitored node that needs to be mocked @@ -932,7 +945,6 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_fitting_reso [InstanceTypeType | None, Resources], DaskTaskResources ], ec2_client: EC2Client, - faker: Faker, caplog: pytest.LogCaptureFixture, ): # we have nothing running now @@ -1280,7 +1292,7 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star _ScaleUpParams( imposed_instance_type=None, task_resources=Resources( - cpus=1, ram=TypeAdapter(ByteSize).validate_python("128Gib") + cpus=1, ram=TypeAdapter(ByteSize).validate_python("115Gib") ), num_tasks=1, expected_instance_type="r5n.4xlarge", @@ -1455,7 +1467,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( _ScaleUpParams( imposed_instance_type="g4dn.2xlarge", # 1 GPU, 8 CPUs, 32GiB task_resources=Resources( - cpus=8, ram=TypeAdapter(ByteSize).validate_python("15Gib") + cpus=7.9, ram=TypeAdapter(ByteSize).validate_python("15Gib") ), num_tasks=12, expected_instance_type="g4dn.2xlarge", # 1 GPU, 8 CPUs, 32GiB @@ -1464,7 +1476,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( _ScaleUpParams( imposed_instance_type="g4dn.8xlarge", # 32CPUs, 128GiB task_resources=Resources( - cpus=32, ram=TypeAdapter(ByteSize).validate_python("20480MB") + cpus=31.9, ram=TypeAdapter(ByteSize).validate_python("20480MB") ), num_tasks=7, expected_instance_type="g4dn.8xlarge", # 32CPUs, 128GiB diff --git 
diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
index 8ba17f3f34ff..b16ac41233f0 100644
--- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
+++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_dynamic.py
@@ -437,6 +437,7 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect
             / 1e9,
             "ram": app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
             * fake_node.description.resources.memory_bytes,
+            "generic_resources": {},
         },
     )
@@ -712,11 +713,9 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]:
         cluster_total_resources={
             "cpus": fake_attached_node.description.resources.nano_cp_us / 1e9,
             "ram": fake_attached_node.description.resources.memory_bytes,
+            "generic_resources": {},
         },
-        cluster_used_resources={
-            "cpus": float(0),
-            "ram": 0,
-        },
+        cluster_used_resources={"cpus": float(0), "ram": 0, "generic_resources": {}},
         instances_running=scale_up_params.expected_num_instances,
     )
     mock_rabbitmq_post_message.reset_mock()
@@ -979,7 +978,7 @@ async def _assert_wait_for_ec2_instances_terminated() -> None:
         _ScaleUpParams(
             imposed_instance_type=None,
             service_resources=Resources(
-                cpus=4, ram=TypeAdapter(ByteSize).validate_python("128Gib")
+                cpus=4, ram=TypeAdapter(ByteSize).validate_python("114Gib")
             ),
             num_services=1,
             expected_instance_type="r5n.4xlarge",
@@ -991,7 +990,7 @@ async def _assert_wait_for_ec2_instances_terminated() -> None:
         _ScaleUpParams(
             imposed_instance_type="t2.xlarge",
             service_resources=Resources(
-                cpus=4, ram=TypeAdapter(ByteSize).validate_python("4Gib")
+                cpus=2.6, ram=TypeAdapter(ByteSize).validate_python("4Gib")
             ),
             num_services=1,
             expected_instance_type="t2.xlarge",
@@ -1003,7 +1002,7 @@ async def _assert_wait_for_ec2_instances_terminated() -> None:
         _ScaleUpParams(
             imposed_instance_type="r5n.8xlarge",
             service_resources=Resources(
-                cpus=4, ram=TypeAdapter(ByteSize).validate_python("128Gib")
+                cpus=4, ram=TypeAdapter(ByteSize).validate_python("114Gib")
             ),
             num_services=1,
             expected_instance_type="r5n.8xlarge",
@@ -1166,7 +1165,7 @@ async def test_cluster_scaling_up_and_down_against_aws(
             ),
             num_services=10,
             expected_instance_type="r5n.4xlarge",  # 1 GPU, 16 CPUs, 128GiB
-            expected_num_instances=4,
+            expected_num_instances=5,
         ),
         id="sim4life-light",
     ),
@@ -1255,7 +1254,7 @@ async def test_cluster_scaling_up_starts_multiple_instances(
         _ScaleUpParams(
             imposed_instance_type="g4dn.2xlarge",  # 1 GPU, 8 CPUs, 32GiB
             service_resources=Resources(
-                cpus=8, ram=TypeAdapter(ByteSize).validate_python("15Gib")
+                cpus=6.6, ram=TypeAdapter(ByteSize).validate_python("15Gib")
             ),
             num_services=12,
             expected_instance_type="g4dn.2xlarge",  # 1 GPU, 8 CPUs, 32GiB
@@ -1264,7 +1263,7 @@ async def test_cluster_scaling_up_starts_multiple_instances(
         _ScaleUpParams(
             imposed_instance_type="g4dn.8xlarge",  # 32CPUs, 128GiB
             service_resources=Resources(
-                cpus=32, ram=TypeAdapter(ByteSize).validate_python("20480MB")
+                cpus=30.6, ram=TypeAdapter(ByteSize).validate_python("20480MB")
             ),
             num_services=7,
             expected_instance_type="g4dn.8xlarge",  # 32CPUs, 128GiB
@@ -1557,7 +1556,7 @@ async def test_cluster_adapts_machines_on_the_fly(  # noqa: PLR0915
         _ScaleUpParams(
             imposed_instance_type=None,
             service_resources=Resources(
-                cpus=4, ram=TypeAdapter(ByteSize).validate_python("128Gib")
+                cpus=4, ram=TypeAdapter(ByteSize).validate_python("114Gib")
            ),
             num_services=1,
             expected_instance_type="r5n.4xlarge",
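# --- editor's note: illustrative usage, not part of the patch ---------------------
# Resources now carries a generic_resources dict that defaults to {}, so every
# serialized payload (e.g. the rabbit status messages asserted above) gains a
# "generic_resources" entry — which is why the expected dicts in this file were
# extended. A minimal sketch:
from aws_library.ec2 import Resources
from pydantic import ByteSize

r = Resources(cpus=2, ram=ByteSize(1024))
assert r.model_dump() == {"cpus": 2.0, "ram": 1024, "generic_resources": {}}
# -----------------------------------------------------------------------------------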
diff --git a/services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py b/services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py
index e412487f4ea6..e051766dae31 100644
--- a/services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py
+++ b/services/autoscaling/tests/unit/test_modules_cluster_scaling_utils_computational.py
@@ -6,6 +6,9 @@
 import pytest
 from aws_library.ec2 import Resources
+from dask_task_models_library.resource_constraints import (
+    DASK_WORKER_THREAD_RESOURCE_NAME,
+)
 from pydantic import ByteSize, TypeAdapter
 from simcore_service_autoscaling.models import DaskTask, DaskTaskResources
 from simcore_service_autoscaling.modules.cluster_scaling._utils_computational import (
@@ -23,13 +26,16 @@
         Resources(
             cpus=_DEFAULT_MAX_CPU,
             ram=TypeAdapter(ByteSize).validate_python(_DEFAULT_MAX_RAM),
+            generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1},
         ),
         id="missing resources returns defaults",
     ),
     pytest.param(
         DaskTask(task_id="fake", required_resources={"CPU": 2.5}),
         Resources(
-            cpus=2.5, ram=TypeAdapter(ByteSize).validate_python(_DEFAULT_MAX_RAM)
+            cpus=2.5,
+            ram=TypeAdapter(ByteSize).validate_python(_DEFAULT_MAX_RAM),
+            generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1},
         ),
         id="only cpus defined",
     ),
@@ -38,16 +44,25 @@
             task_id="fake",
             required_resources={"CPU": 2.5, "RAM": 2 * 1024 * 1024 * 1024},
         ),
-        Resources(cpus=2.5, ram=TypeAdapter(ByteSize).validate_python("2GiB")),
+        Resources(
+            cpus=2.5,
+            ram=TypeAdapter(ByteSize).validate_python("2GiB"),
+            generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1},
+        ),
         id="cpu and ram defined",
     ),
     pytest.param(
         DaskTask(
             task_id="fake",
-            required_resources={"CPU": 2.5, "ram": 2 * 1024 * 1024 * 1024},
+            required_resources={"CPU": 2.5, "xram": 2 * 1024 * 1024 * 1024},  # type: ignore
         ),
         Resources(
-            cpus=2.5, ram=TypeAdapter(ByteSize).validate_python(_DEFAULT_MAX_RAM)
+            cpus=2.5,
+            ram=TypeAdapter(ByteSize).validate_python(_DEFAULT_MAX_RAM),
+            generic_resources={
+                DASK_WORKER_THREAD_RESOURCE_NAME: 1,
+                "xram": 2 * 1024 * 1024 * 1024,
+            },
         ),
         id="invalid naming",
     ),
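# --- editor's note: illustrative sketch, not part of the patch --------------------
# Inferred from the parametrization above: "CPU"/"RAM" map onto cpus/ram, any other
# key (such as the misspelled "xram") is preserved as a generic resource, and one
# worker thread is always required. This is a reconstruction of the idea, not the
# actual helper in _utils_computational.
from aws_library.ec2 import Resources

def resources_from_dask_task_sketch(
    required: dict, default_cpus: float, default_ram: int
) -> Resources:
    generic = {k: v for k, v in required.items() if k not in ("CPU", "RAM")}
    # the tests suggest DASK_WORKER_THREAD_RESOURCE_NAME == "threads"
    generic.setdefault("threads", 1)
    return Resources(
        cpus=required.get("CPU", default_cpus),
        ram=required.get("RAM", default_ram),
        generic_resources=generic,
    )
# -----------------------------------------------------------------------------------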
diff --git a/services/autoscaling/tests/unit/test_modules_dask.py b/services/autoscaling/tests/unit/test_modules_dask.py
index 9c53865cfa30..d99c0f2f0869 100644
--- a/services/autoscaling/tests/unit/test_modules_dask.py
+++ b/services/autoscaling/tests/unit/test_modules_dask.py
@@ -11,6 +11,9 @@
 import pytest
 from arrow import utcnow
 from aws_library.ec2 import Resources
+from dask_task_models_library.resource_constraints import (
+    DASK_WORKER_THREAD_RESOURCE_NAME,
+)
 from faker import Faker
 from models_library.clusters import (
     ClusterAuthentication,
@@ -31,14 +34,20 @@
     EC2InstanceData,
 )
 from simcore_service_autoscaling.modules.dask import (
+    DaskMonitoringSettings,
     DaskTask,
     _scheduler_client,
+    add_instance_generic_resources,
+    compute_cluster_total_resources,
     get_worker_still_has_results_in_memory,
     get_worker_used_resources,
+    is_worker_connected,
+    is_worker_retired,
     list_processing_tasks_per_worker,
     list_unrunnable_tasks,
+    try_retire_nodes,
 )
-from tenacity import retry, stop_after_delay, wait_fixed
+from tenacity import AsyncRetrying, retry, stop_after_delay, wait_fixed
 
 _authentication_types = [
     NoAuthentication(),
@@ -115,11 +124,14 @@ async def test_list_unrunnable_tasks(
     # we have nothing running now
     assert await list_unrunnable_tasks(scheduler_url, scheduler_authentication) == []
     # start a task that cannot run
-    dask_task_impossible_resources = {"XRAM": 213}
+    dask_task_impossible_resources = DaskTaskResources(XRAM=213, threads=1)  # type: ignore
     future = create_dask_task(dask_task_impossible_resources)
     assert future
     assert await list_unrunnable_tasks(scheduler_url, scheduler_authentication) == [
-        DaskTask(task_id=future.key, required_resources=dask_task_impossible_resources)
+        DaskTask(
+            task_id=future.key,
+            required_resources=(dask_task_impossible_resources),
+        )
     ]
     # remove that future, will remove the task
     del future
@@ -154,7 +166,10 @@ def _add_fct(x: int, y: int) -> int:
         scheduler_url, scheduler_authentication
     ) == {
         next(iter(dask_spec_cluster_client.scheduler_info()["workers"])): [
-            DaskTask(task_id=DaskTaskId(future_queued_task.key), required_resources={})
+            DaskTask(
+                task_id=DaskTaskId(future_queued_task.key),
+                required_resources=DaskTaskResources(threads=1),  # type: ignore
+            )
         ]
     }
@@ -358,7 +373,11 @@ def _add_fct(x: int, y: int) -> int:
     await _wait_for_dask_scheduler_to_change_state()
     assert await get_worker_used_resources(
         scheduler_url, scheduler_authentication, fake_localhost_ec2_instance_data
-    ) == Resources(cpus=num_cpus, ram=ByteSize(0))
+    ) == Resources(
+        cpus=num_cpus,
+        ram=ByteSize(0),
+        generic_resources={DASK_WORKER_THREAD_RESOURCE_NAME: 1},
+    )
     result = await future_queued_task.result(timeout=_DASK_SCHEDULER_REACTION_TIME_S)  # type: ignore
     assert result == 7
@@ -370,3 +389,139 @@ def _add_fct(x: int, y: int) -> int:
         )
         == Resources.create_as_empty()
     )
+
+
+async def test_compute_cluster_total_resources(
+    dask_spec_local_cluster: distributed.SpecCluster,
+    scheduler_url: AnyUrl,
+    scheduler_authentication: ClusterAuthentication,
+    fake_ec2_instance_data: Callable[..., EC2InstanceData],
+    fake_localhost_ec2_instance_data: EC2InstanceData,
+):
+    # asking for resources of empty cluster returns empty resources
+    assert (
+        await compute_cluster_total_resources(
+            scheduler_url, scheduler_authentication, []
+        )
+        == Resources.create_as_empty()
+    )
+    ec2_instance_data = fake_ec2_instance_data()
+    assert ec2_instance_data.resources.cpus > 0
+    assert ec2_instance_data.resources.ram > 0
+    assert ec2_instance_data.resources.generic_resources == {}
+    assert (
+        await compute_cluster_total_resources(
+            scheduler_url, scheduler_authentication, [ec2_instance_data]
+        )
+        == Resources.create_as_empty()
+    ), "this instance is not connected and should not be accounted for"
+
+    cluster_total_resources = await compute_cluster_total_resources(
+        scheduler_url, scheduler_authentication, [fake_localhost_ec2_instance_data]
+    )
+    assert cluster_total_resources.cpus > 0
+    assert cluster_total_resources.ram > 0
+    assert DASK_WORKER_THREAD_RESOURCE_NAME in cluster_total_resources.generic_resources
+    assert (
+        cluster_total_resources.generic_resources[DASK_WORKER_THREAD_RESOURCE_NAME] == 2
+    )
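# --- editor's note: illustrative usage, not part of the patch ---------------------
# Cluster totals are the sum of per-worker Resources; numeric generic resources add
# up under Resources.__add__, which is how the localhost cluster above ends up
# reporting a total of 2 for the worker-thread resource.
from aws_library.ec2 import Resources
from pydantic import ByteSize

worker = Resources(cpus=1, ram=ByteSize(2**30), generic_resources={"threads": 1})
total = worker + worker
assert total.generic_resources == {"threads": 2}
# -----------------------------------------------------------------------------------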
+
+
+@pytest.mark.parametrize(
+    "dask_nthreads, dask_nthreads_multiplier, expected_threads_resource",
+    [(4, 1, 4), (4, 2, 8), (0, 2, -1)],
+)
+def test_add_instance_generic_resources(
+    scheduler_url: AnyUrl,
+    scheduler_authentication: ClusterAuthentication,
+    fake_ec2_instance_data: Callable[..., EC2InstanceData],
+    dask_nthreads: int,
+    dask_nthreads_multiplier: int,
+    expected_threads_resource: int,
+):
+    settings = DaskMonitoringSettings(
+        DASK_MONITORING_URL=scheduler_url,
+        DASK_SCHEDULER_AUTH=scheduler_authentication,
+        DASK_NTHREADS=dask_nthreads,
+        DASK_NTHREADS_MULTIPLIER=dask_nthreads_multiplier,
+    )
+    ec2_instance_data = fake_ec2_instance_data()
+    assert ec2_instance_data.resources.cpus > 0
+    assert ec2_instance_data.resources.ram > 0
+    assert ec2_instance_data.resources.generic_resources == {}
+
+    add_instance_generic_resources(settings, ec2_instance_data)
+    assert ec2_instance_data.resources.generic_resources != {}
+    assert (
+        DASK_WORKER_THREAD_RESOURCE_NAME
+        in ec2_instance_data.resources.generic_resources
+    )
+    if expected_threads_resource < 0:
+        expected_threads_resource = int(
+            ec2_instance_data.resources.cpus * dask_nthreads_multiplier
+        )
+    assert (
+        ec2_instance_data.resources.generic_resources[DASK_WORKER_THREAD_RESOURCE_NAME]
+        == expected_threads_resource
+    )
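# --- editor's note: illustrative sketch, not part of the patch --------------------
# Inferred from the parametrization above ((4, 1) -> 4, (4, 2) -> 8, (0, 2) ->
# cpus * 2): a positive DASK_NTHREADS is used as the base, otherwise the instance CPU
# count is, and DASK_NTHREADS_MULTIPLIER scales either base. A reconstruction, not
# the actual add_instance_generic_resources implementation.
def thread_resource_sketch(
    dask_nthreads: int, nthreads_multiplier: int, instance_cpus: float
) -> int:
    base = dask_nthreads if dask_nthreads > 0 else instance_cpus
    return int(base * nthreads_multiplier)
# -----------------------------------------------------------------------------------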
+
+
+async def test_is_worker_connected(
+    scheduler_url: AnyUrl,
+    scheduler_authentication: ClusterAuthentication,
+    fake_ec2_instance_data: Callable[..., EC2InstanceData],
+    fake_localhost_ec2_instance_data: EC2InstanceData,
+):
+    ec2_instance_data = fake_ec2_instance_data()
+    assert (
+        await is_worker_connected(
+            scheduler_url, scheduler_authentication, ec2_instance_data
+        )
+        is False
+    )
+
+    assert (
+        await is_worker_connected(
+            scheduler_url, scheduler_authentication, fake_localhost_ec2_instance_data
+        )
+        is True
+    )
+
+
+async def test_is_worker_retired(
+    scheduler_url: AnyUrl,
+    scheduler_authentication: ClusterAuthentication,
+    fake_ec2_instance_data: Callable[..., EC2InstanceData],
+    fake_localhost_ec2_instance_data: EC2InstanceData,
+):
+    ec2_instance_data = fake_ec2_instance_data()
+    # fake instance is not connected, so it cannot be retired
+    assert (
+        await is_worker_retired(
+            scheduler_url, scheduler_authentication, ec2_instance_data
+        )
+        is False
+    )
+
+    # localhost is connected, but not retired
+    assert (
+        await is_worker_retired(
+            scheduler_url, scheduler_authentication, fake_localhost_ec2_instance_data
+        )
+        is False
+    )
+
+    # retire localhost worker
+    await try_retire_nodes(scheduler_url, scheduler_authentication)
+    async for attempt in AsyncRetrying(
+        stop=stop_after_delay(10), wait=wait_fixed(1), reraise=True
+    ):
+        with attempt:
+            assert (
+                await is_worker_retired(
+                    scheduler_url,
+                    scheduler_authentication,
+                    fake_localhost_ec2_instance_data,
+                )
+                is True
+            )
diff --git a/services/autoscaling/tests/unit/test_modules_instrumentation_core.py b/services/autoscaling/tests/unit/test_modules_instrumentation_core.py
new file mode 100644
index 000000000000..ffc8d87bcb9d
--- /dev/null
+++ b/services/autoscaling/tests/unit/test_modules_instrumentation_core.py
@@ -0,0 +1,39 @@
+# pylint: disable=no-value-for-parameter
+# pylint: disable=redefined-outer-name
+# pylint: disable=too-many-arguments
+# pylint: disable=too-many-positional-arguments
+# pylint: disable=too-many-statements
+# pylint: disable=unused-argument
+# pylint: disable=unused-variable
+
+import pytest
+from fastapi import FastAPI
+from pytest_simcore.helpers.typing_env import EnvVarsDict
+from simcore_service_autoscaling.core.errors import ConfigurationError
+from simcore_service_autoscaling.modules.instrumentation._core import (
+    get_instrumentation,
+    has_instrumentation,
+)
+
+
+@pytest.fixture
+def disabled_instrumentation(
+    app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch
+) -> None:
+    assert app_environment
+    monkeypatch.setenv("AUTOSCALING_PROMETHEUS_INSTRUMENTATION_ENABLED", "false")
+
+
+async def test_disabled_instrumentation(
+    disabled_rabbitmq: None,
+    disabled_ec2: None,
+    disabled_ssm: None,
+    disabled_instrumentation: None,
+    mocked_redis_server: None,
+    initialized_app: FastAPI,
+):
+    # instrumentation is disabled via the disabled_instrumentation fixture above
+    assert not has_instrumentation(initialized_app)
+
+    with pytest.raises(ConfigurationError):
+        get_instrumentation(initialized_app)
diff --git a/services/autoscaling/tests/unit/test_utils_cluster_scaling.py b/services/autoscaling/tests/unit/test_utils_cluster_scaling.py
index 1c325c1f6234..5a15d63a6ed3 100644
--- a/services/autoscaling/tests/unit/test_utils_cluster_scaling.py
+++ b/services/autoscaling/tests/unit/test_utils_cluster_scaling.py
@@ -75,6 +75,24 @@ async def test_associate_ec2_instances_with_nodes_with_no_correspondence(
     assert len(non_associated_instances) == len(ec2_instances)
 
 
+async def test_associate_ec2_instances_with_nodes_with_invalid_dns(
+    fake_ec2_instance_data: Callable[..., EC2InstanceData],
+    node: Callable[..., DockerNode],
+):
+    nodes = [node() for _ in range(10)]
+    ec2_instances = [
+        fake_ec2_instance_data(aws_private_dns="invalid-dns-name") for _ in range(10)
+    ]
+
+    (
+        associated_instances,
+        non_associated_instances,
+    ) = associate_ec2_instances_with_nodes(nodes, ec2_instances)
+
+    assert not associated_instances
+    assert non_associated_instances
+
+
 async def test_associate_ec2_instances_with_corresponding_nodes(
     fake_ec2_instance_data: Callable[..., EC2InstanceData],
     node: Callable[..., DockerNode],
diff --git a/services/autoscaling/tests/unit/test_utils_rabbitmq.py b/services/autoscaling/tests/unit/test_utils_rabbitmq.py
index 006155b1e0fa..8741949e76a7 100644
--- a/services/autoscaling/tests/unit/test_utils_rabbitmq.py
+++ b/services/autoscaling/tests/unit/test_utils_rabbitmq.py
@@ -122,31 +122,6 @@ async def _(labels: dict[DockerLabelKey, str]) -> list[Task]:
     return _
 
 
-@pytest.fixture
-def service_version() -> ServiceVersion:
-    return "1.0.0"
-
-
-@pytest.fixture
-def service_key() -> ServiceKey:
-    return "simcore/services/dynamic/test"
-
-
-@pytest.fixture
-def node_id(faker: Faker) -> NodeID:
-    return faker.uuid4(cast_to=None)
-
-
-@pytest.fixture
-def project_id(faker: Faker) -> ProjectID:
-    return faker.uuid4(cast_to=None)
-
-
-@pytest.fixture
-def user_id(faker: Faker) -> UserID:
-    return faker.pyint(min_value=1)
-
-
 @pytest.fixture
 def dask_task(
     service_key: ServiceKey,
diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml
index dc44dd9ece75..d3ba68cb76a8 100644
--- a/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml
+++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/data/docker-compose.yml
@@ -104,6 +104,8 @@ services:
       AUTOSCALING_RABBITMQ: ${AUTOSCALING_RABBITMQ}
       DASK_MONITORING_URL: tls://dask-scheduler:8786
       DASK_SCHEDULER_AUTH: '{"type":"tls","tls_ca_file":"${DASK_TLS_CA_FILE}","tls_client_cert":"${DASK_TLS_CERT}","tls_client_key":"${DASK_TLS_KEY}"}'
+      DASK_NTHREADS: ${DASK_NTHREADS}
+      DASK_NTHREADS_MULTIPLIER: ${DASK_NTHREADS_MULTIPLIER}
      EC2_INSTANCES_ALLOWED_TYPES: ${WORKERS_EC2_INSTANCES_ALLOWED_TYPES}
       EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING: ${WORKERS_EC2_INSTANCES_COLD_START_DOCKER_IMAGES_PRE_PULLING}
       EC2_INSTANCES_CUSTOM_TAGS: ${WORKERS_EC2_INSTANCES_CUSTOM_TAGS}
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py
index 10103909a631..3446cb2f1497 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py
@@ -5,6 +5,9 @@
 import arrow
 from dask_task_models_library.container_tasks.protocol import ContainerEnvsDict
+from dask_task_models_library.resource_constraints import (
+    estimate_dask_worker_resources_from_ec2_instance,
+)
 from models_library.api_schemas_catalog.services import ServiceGet
 from models_library.api_schemas_clusters_keeper.ec2_instances import EC2InstanceTypeGet
 from models_library.api_schemas_directorv2.services import (
@@ -292,15 +295,14 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool:
             image_resources: ImageResources = node_resources[
                 DEFAULT_SINGLE_SERVICE_NAME
             ]
-            image_resources.resources["CPU"].set_value(
-                float(selected_ec2_instance_type.cpus) - _CPUS_SAFE_MARGIN
-            )
-            image_resources.resources["RAM"].set_value(
-                int(
-                    selected_ec2_instance_type.ram
-                    - _RAM_SAFE_MARGIN_RATIO * selected_ec2_instance_type.ram
+            adjusted_cpus, adjusted_ram = (
+                estimate_dask_worker_resources_from_ec2_instance(
+                    float(selected_ec2_instance_type.cpus),
+                    selected_ec2_instance_type.ram,
                 )
             )
+            image_resources.resources["CPU"].set_value(adjusted_cpus)
+            image_resources.resources["RAM"].set_value(adjusted_ram)
 
         await project_nodes_repo.update(
             connection,
diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py
index 9bd620d3d23b..4d244d2c1f35 100644
--- a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py
+++ b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py
@@ -88,6 +88,10 @@
     X_FORWARDED_PROTO,
     X_SIMCORE_USER_AGENT,
 )
+from servicelib.docker_utils import (
+    DYNAMIC_SIDECAR_MIN_CPUS,
+    estimate_dynamic_sidecar_resources_from_ec2_instance,
+)
 from servicelib.logging_utils import log_context
 from servicelib.rabbitmq import RemoteMethodNotRegisteredError, RPCServerError
 from servicelib.rabbitmq.rpc_interfaces.catalog import services as catalog_rpc
@@ -652,12 +656,12 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool:
         app, user_id, project_id, node_id, service_key, service_version
     )
     scalable_service_name = DEFAULT_SINGLE_SERVICE_NAME
-    new_cpus_value = float(selected_ec2_instance_type.cpus) - _CPUS_SAFE_MARGIN
-    new_ram_value = int(
-        selected_ec2_instance_type.ram
-        - _MACHINE_TOTAL_RAM_SAFE_MARGIN_RATIO * selected_ec2_instance_type.ram
-        - _SIDECARS_OPS_SAFE_RAM_MARGIN
+    new_cpus_value, new_ram_value = (
+        estimate_dynamic_sidecar_resources_from_ec2_instance(
+            selected_ec2_instance_type.cpus, selected_ec2_instance_type.ram
+        )
     )
+
     if DEFAULT_SINGLE_SERVICE_NAME not in node_resources:
         # NOTE: we go for the largest sub-service and scale it up/down
         scalable_service_name, hungry_service_resources = max(
@@ -680,17 +684,14 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool:
         }
     )
     new_cpus_value = max(
-        float(selected_ec2_instance_type.cpus)
-        - _CPUS_SAFE_MARGIN
-        - other_services_resources["CPU"],
-        _MIN_NUM_CPUS,
+        new_cpus_value - other_services_resources["CPU"],
+        DYNAMIC_SIDECAR_MIN_CPUS,
     )
-    new_ram_value = int(
-        selected_ec2_instance_type.ram
-        - _MACHINE_TOTAL_RAM_SAFE_MARGIN_RATIO * selected_ec2_instance_type.ram
-        - other_services_resources["RAM"]
-        - _SIDECARS_OPS_SAFE_RAM_MARGIN
+
+    new_ram_value = max(
+        int(new_ram_value - other_services_resources["RAM"]), 128 * 1024 * 1024
     )
+
     # scale the service
     node_resources[scalable_service_name].resources["CPU"].set_value(new_cpus_value)
     node_resources[scalable_service_name].resources["RAM"].set_value(new_ram_value)
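# --- editor's note: illustrative sketch, not part of the patch --------------------
# The webserver now delegates the margin arithmetic to
# estimate_dynamic_sidecar_resources_from_ec2_instance() and keeps only the clamping
# local: the estimate is reduced by what the other services consume and floored at
# DYNAMIC_SIDECAR_MIN_CPUS and 128 MiB of RAM, as in the hunk above. The function
# name and signature below are a stand-in for that final clamping step.
def clamp_scalable_service_resources(
    estimated_cpus: float,
    estimated_ram: int,
    other_services_cpus: float,
    other_services_ram: int,
    min_cpus: float,  # DYNAMIC_SIDECAR_MIN_CPUS in servicelib.docker_utils
) -> tuple[float, int]:
    return (
        max(estimated_cpus - other_services_cpus, min_cpus),
        max(int(estimated_ram - other_services_ram), 128 * 1024 * 1024),
    )
# -----------------------------------------------------------------------------------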