-
Notifications
You must be signed in to change notification settings - Fork 32
🎨Computational backend: Make sure the number of threads of a dask-worker is computed for autoscaling #8423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
🎨Computational backend: Make sure the number of threads of a dask-worker is computed for autoscaling #8423
Changes from all commits
80d26ad
6063e67
d20a086
096eb33
da04daa
b312603
f313e7c
9ab3a53
fe9a25f
00f5b3f
6c24793
486e369
800cb4f
ba78df5
6ec4ddf
7ee5db7
7a658ac
a4f5510
ddc44d5
2412486
70d09e5
72f69ae
ff7c364
86c0605
b7f5236
89a5753
8852ceb
e8bc99e
986f90a
2aec54b
066478c
4b35150
212c94e
c0d0deb
8f66328
71bd759
65c79c7
2fcbe21
8655f12
cc274ef
bd55b5d
92e6bfd
5869ed4
2e51aa8
d69bb01
72d63a4
08cf343
fcedb48
0c78bc7
58e80f0
cf7cfac
887c679
924336a
65d5a65
9c1f25d
bc34b92
0cb5933
3c8592f
6094e8c
da84e5a
7bb47d7
1c30fae
d576d78
59f9088
0f500ca
6ddf16e
4785952
d4c42cd
2475017
267a2cc
2c17756
96af1b7
0dcad8e
7484a48
9545cfe
87fe39e
c295e65
4aa836d
98af3b2
814dbbc
c8da911
3f1376f
bec5358
32d9139
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,45 +14,162 @@ | |
Field, | ||
NonNegativeFloat, | ||
NonNegativeInt, | ||
StrictFloat, | ||
StrictInt, | ||
StringConstraints, | ||
field_validator, | ||
) | ||
from pydantic.config import JsonDict | ||
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType | ||
|
||
GenericResourceValueType: TypeAlias = StrictInt | StrictFloat | str | ||
|
||
|
||
class Resources(BaseModel, frozen=True): | ||
cpus: NonNegativeFloat | ||
ram: ByteSize | ||
generic_resources: Annotated[ | ||
dict[str, GenericResourceValueType], | ||
Field( | ||
default_factory=dict, | ||
description=( | ||
"Arbitrary additional resources (e.g. {'threads': 8}). " | ||
"Numeric values are treated as quantities and participate in add/sub/compare." | ||
), | ||
), | ||
] = DEFAULT_FACTORY | ||
|
||
@classmethod | ||
def create_as_empty(cls) -> "Resources": | ||
return cls(cpus=0, ram=ByteSize(0)) | ||
|
||
def __ge__(self, other: "Resources") -> bool: | ||
return self.cpus >= other.cpus and self.ram >= other.ram | ||
"""operator for >= comparison | ||
if self has greater or equal resources than other, returns True | ||
This will return True only if all of the resources in self are greater or equal to other | ||
|
||
Note that generic_resources are compared only if they are numeric | ||
Non-numeric generic resources must be equal in both or only defined in self | ||
to be considered greater or equal | ||
""" | ||
if self == other: | ||
return True | ||
return self > other | ||
|
||
def __gt__(self, other: "Resources") -> bool: | ||
return self.cpus > other.cpus or self.ram > other.ram | ||
"""operator for > comparison | ||
if self has resources greater than other, returns True | ||
This will return True only if all of the resources in self are greater than other | ||
|
||
Note that generic_resources are compared only if they are numeric | ||
Non-numeric generic resources must only be defined in self | ||
to be considered greater | ||
""" | ||
if (self.cpus < other.cpus) or (self.ram < other.ram): | ||
return False | ||
|
||
keys = set(self.generic_resources) | set(other.generic_resources) | ||
for k in keys: | ||
a = self.generic_resources.get(k) | ||
b = other.generic_resources.get(k) | ||
if a is None: | ||
return False | ||
if b is None: | ||
# a is greater as b is not defined | ||
continue | ||
if isinstance(a, int | float) and isinstance(b, int | float): | ||
if a < b: | ||
return False | ||
else: | ||
# remaining options is a is str and b is str or mixed types | ||
assert isinstance(a, str) # nosec | ||
assert isinstance(b, int | float | str) # nosec | ||
|
||
# here we have either everything greater or equal or non-comparable strings | ||
|
||
return self != other | ||
|
||
def __add__(self, other: "Resources") -> "Resources": | ||
"""operator for adding two Resources | ||
Note that only numeric generic resources are added | ||
Non-numeric generic resources are ignored | ||
""" | ||
merged: dict[str, GenericResourceValueType] = {} | ||
keys = set(self.generic_resources) | set(other.generic_resources) | ||
for k in keys: | ||
a = self.generic_resources.get(k) | ||
b = other.generic_resources.get(k) | ||
# adding non numeric values does not make sense, so we skip those for the resulting resource | ||
if isinstance(a, int | float) and isinstance(b, int | float): | ||
merged[k] = a + b | ||
elif a is None and isinstance(b, int | float): | ||
merged[k] = b | ||
elif b is None and isinstance(a, int | float): | ||
merged[k] = a | ||
|
||
return Resources.model_construct( | ||
**{ | ||
key: a + b | ||
for (key, a), b in zip( | ||
self.model_dump().items(), other.model_dump().values(), strict=True | ||
) | ||
} | ||
cpus=self.cpus + other.cpus, | ||
ram=self.ram + other.ram, | ||
generic_resources=merged, | ||
) | ||
|
||
def __sub__(self, other: "Resources") -> "Resources": | ||
"""operator for subtracting two Resources | ||
Note that only numeric generic resources are subtracted | ||
Non-numeric generic resources are ignored | ||
""" | ||
merged: dict[str, GenericResourceValueType] = {} | ||
keys = set(self.generic_resources) | set(other.generic_resources) | ||
for k in keys: | ||
a = self.generic_resources.get(k) | ||
b = other.generic_resources.get(k) | ||
# subtracting non numeric values does not make sense, so we skip those for the resulting resource | ||
if isinstance(a, int | float) and isinstance(b, int | float): | ||
merged[k] = a - b | ||
elif a is None and isinstance(b, int | float): | ||
merged[k] = -b | ||
elif b is None and isinstance(a, int | float): | ||
merged[k] = a | ||
|
||
return Resources.model_construct( | ||
**{ | ||
key: a - b | ||
for (key, a), b in zip( | ||
self.model_dump().items(), other.model_dump().values(), strict=True | ||
) | ||
} | ||
cpus=self.cpus - other.cpus, | ||
ram=self.ram - other.ram, | ||
generic_resources=merged, | ||
) | ||
|
||
def __hash__(self) -> int: | ||
"""Deterministic hash including cpus, ram (in bytes) and generic_resources.""" | ||
# sort generic_resources items to ensure order-independent hashing | ||
generic_items: tuple[tuple[str, GenericResourceValueType], ...] = tuple( | ||
sorted(self.generic_resources.items()) | ||
) | ||
return hash((self.cpus, self.ram, generic_items)) | ||
|
||
def as_flat_dict(self) -> dict[str, int | float | str]: | ||
"""Like model_dump, but flattens generic_resources to top level keys""" | ||
base = self.model_dump() | ||
base.update(base.pop("generic_resources")) | ||
return base | ||
Comment on lines
+148
to
+152
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The Copilot uses AI. Check for mistakes. Positive Feedback Negative Feedback |
||
|
||
@classmethod | ||
def from_flat_dict( | ||
cls, | ||
data: dict[str, int | float | str], | ||
*, | ||
mapping: dict[str, str] | None = None, | ||
) -> "Resources": | ||
"""Inverse of as_flat_dict with optional key mapping""" | ||
mapped_data = data | ||
if mapping: | ||
mapped_data = {mapping.get(k, k): v for k, v in data.items()} | ||
generic_resources = { | ||
k: v for k, v in mapped_data.items() if k not in {"cpus", "ram"} | ||
} | ||
|
||
return cls( | ||
cpus=float(mapped_data.get("cpus", 0)), | ||
ram=ByteSize(mapped_data.get("ram", 0)), | ||
generic_resources=generic_resources, | ||
) | ||
Comment on lines
+155
to
173
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The Copilot uses AI. Check for mistakes. Positive Feedback Negative Feedback |
||
|
||
@field_validator("cpus", mode="before") | ||
|
@@ -174,8 +291,9 @@ def validate_bash_calls(cls, v): | |
temp_file.flush() | ||
# NOTE: this will not capture runtime errors, but at least some syntax errors such as invalid quotes | ||
sh.bash( | ||
"-n", temp_file.name | ||
) # pyright: ignore[reportCallIssue] # sh is untyped, but this call is safe for bash syntax checking | ||
"-n", | ||
temp_file.name, # pyright: ignore[reportCallIssue] | ||
) # sh is untyped, but this call is safe for bash syntax checking | ||
except sh.ErrorReturnCode as exc: | ||
msg = f"Invalid bash call in custom_boot_scripts: {v}, Error: {exc.stderr}" | ||
raise ValueError(msg) from exc | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `__hash__` implementation attempts to hash `self.ram` directly, but `ram` is a `ByteSize` object (Pydantic type). This may not hash consistently. Consider converting it to an integer value: `hash((self.cpus, int(self.ram), generic_items))`.
Copilot uses AI. Check for mistakes.