Skip to content

Commit 2e69d4c

Browse files
committed
Patch kubernetes Api Client to work around lock contention issues
Unclear whether the benefits are worth the risk of breaking changes here, but this is a workaround for the significant lock contention issues with the k8s client surfaced in #23933 (reply in thread). This appears to be a known issue with the k8s python client - see kubernetes-client/python#2284. Test Plan: BK
1 parent 3bf5c78 commit 2e69d4c

File tree

1 file changed

+41
-2
lines changed
  • python_modules/libraries/dagster-k8s/dagster_k8s

1 file changed

+41
-2
lines changed

python_modules/libraries/dagster-k8s/dagster_k8s/client.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66

77
import kubernetes.client
88
import kubernetes.client.rest
9+
import six
910
from dagster import (
1011
DagsterInstance,
1112
_check as check,
1213
)
1314
from dagster._core.storage.dagster_run import DagsterRunStatus
15+
from kubernetes.client.api_client import ApiClient
1416
from kubernetes.client.models import V1Job, V1JobStatus
1517

1618
try:
@@ -91,6 +93,39 @@ class DagsterK8sJobStatusException(Exception):
9193
]
9294

9395

96+
class PatchedApiClient(ApiClient):
    """ApiClient that passes its own Configuration to every model it deserializes.

    Forked from the ApiClient implementation to pass the configuration object down into created
    model objects, avoiding lock contention issues.
    See https://github.com/kubernetes-client/python/issues/2284
    """

    # Intentionally spelled with the base class's mangled name. The base ApiClient refers to this
    # hook as ``self.__deserialize_model``, which Python's private name mangling rewrites to
    # ``self._ApiClient__deserialize_model``. Declaring ``__deserialize_model`` in this subclass
    # would instead create ``_PatchedApiClient__deserialize_model``, which the base class never
    # looks up, so the override would silently never run.
    def _ApiClient__deserialize_model(self, data, klass):
        """Deserializes list or dict to model.

        :param data: dict, list.
        :param klass: class literal.
        :return: model object.
        """
        if not klass.openapi_types and not hasattr(klass, "get_real_child_model"):
            return data

        # Below is the only change from the base ApiClient implementation - pass through the
        # Configuration object to each newly created model so that each one does not have to create
        # one and acquire a lock
        kwargs = {"local_vars_configuration": self.configuration}

        if data is not None and klass.openapi_types is not None and isinstance(data, (list, dict)):
            for attr, attr_type in six.iteritems(klass.openapi_types):
                if klass.attribute_map[attr] in data:
                    value = data[klass.attribute_map[attr]]
                    # The recursive deserializer is private to the base class as well, so it
                    # must also be reached through its mangled name; ``self.__deserialize``
                    # here would resolve to a non-existent ``_PatchedApiClient__deserialize``.
                    kwargs[attr] = self._ApiClient__deserialize(value, attr_type)

        instance = klass(**kwargs)

        if hasattr(instance, "get_real_child_model"):
            klass_name = instance.get_real_child_model(data)
            if klass_name:
                instance = self._ApiClient__deserialize(data, klass_name)
        return instance
127+
128+
94129
def k8s_api_retry(
95130
fn: Callable[..., T],
96131
max_retries: int,
@@ -209,8 +244,12 @@ def __init__(self, batch_api, core_api, logger, sleeper, timer):
209244
@staticmethod
210245
def production_client(batch_api_override=None, core_api_override=None):
211246
return DagsterKubernetesClient(
212-
batch_api=batch_api_override or kubernetes.client.BatchV1Api(),
213-
core_api=core_api_override or kubernetes.client.CoreV1Api(),
247+
batch_api=(
248+
batch_api_override or kubernetes.client.BatchV1Api(api_client=PatchedApiClient())
249+
),
250+
core_api=(
251+
core_api_override or kubernetes.client.CoreV1Api(api_client=PatchedApiClient())
252+
),
214253
logger=logging.info,
215254
sleeper=time.sleep,
216255
timer=time.time,

0 commit comments

Comments
 (0)