diff --git a/crate/operator/constants.py b/crate/operator/constants.py index 5e80b0fd..21b87c52 100644 --- a/crate/operator/constants.py +++ b/crate/operator/constants.py @@ -125,4 +125,5 @@ class OperationType(str, enum.Enum): SUSPEND = "suspend" RESUME = "resume" CHANGE_COMPUTE = "change_compute" + CHANGE_EXPOSURE = "change_exposure" UNKNOWN = "unknown" diff --git a/crate/operator/create.py b/crate/operator/create.py index 72c083f5..4837154a 100644 --- a/crate/operator/create.py +++ b/crate/operator/create.py @@ -1231,59 +1231,67 @@ def get_data_service( dns_record: Optional[str], source_ranges: Optional[List[str]] = None, additional_annotations: Optional[Dict] = None, + use_traefik: bool = False, ) -> V1Service: res_annotations = {} - if config.CLOUD_PROVIDER == CloudProvider.AWS: - res_annotations.update( - { - # https://kubernetes.io/docs/concepts/services-networking/service/#connection-draining-on-aws - "service.beta.kubernetes.io/aws-load-balancer-connection-draining-enabled": "true", # noqa - "service.beta.kubernetes.io/aws-load-balancer-connection-draining-timeout": "1800", # noqa - # Default idle timeout is 60s, which kills the connection on long-running queries # noqa - "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600", # noqa - "service.beta.kubernetes.io/aws-load-balancer-type": "nlb", # noqa - "service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled": "true", # noqa - } - ) - elif config.CLOUD_PROVIDER == CloudProvider.AZURE: - # https://docs.microsoft.com/en-us/azure/aks/load-balancer-standard#additional-customizations-via-kubernetes-annotations - # https://docs.microsoft.com/en-us/azure/load-balancer/load-balancer-tcp-reset - res_annotations.update( - { - "service.beta.kubernetes.io/azure-load-balancer-disable-tcp-reset": "false", # noqa - "service.beta.kubernetes.io/azure-load-balancer-tcp-idle-timeout": "30", # noqa - } - ) - if dns_record: - res_annotations.update( - 
{"external-dns.alpha.kubernetes.io/hostname": dns_record} - ) + if not use_traefik: + if config.CLOUD_PROVIDER == CloudProvider.AWS: + res_annotations.update( + { + # https://kubernetes.io/docs/concepts/services-networking/service/#connection-draining-on-aws + "service.beta.kubernetes.io/aws-load-balancer-connection-draining-enabled": "true", # noqa + "service.beta.kubernetes.io/aws-load-balancer-connection-draining-timeout": "1800", # noqa + # Default idle timeout is 60s, which kills the connection on long-running queries # noqa + "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600", # noqa + "service.beta.kubernetes.io/aws-load-balancer-type": "nlb", # noqa + "service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled": "true", # noqa + } + ) + elif config.CLOUD_PROVIDER == CloudProvider.AZURE: + # https://docs.microsoft.com/en-us/azure/aks/load-balancer-standard#additional-customizations-via-kubernetes-annotations + # https://docs.microsoft.com/en-us/azure/load-balancer/load-balancer-tcp-reset + res_annotations.update( + { + "service.beta.kubernetes.io/azure-load-balancer-disable-tcp-reset": "false", # noqa + "service.beta.kubernetes.io/azure-load-balancer-tcp-idle-timeout": "30", # noqa + } + ) + + if dns_record: + res_annotations.update( + {"external-dns.alpha.kubernetes.io/hostname": dns_record} + ) if additional_annotations: res_annotations.update(additional_annotations) - service_name = f"crate-{name}" + service_type = "ClusterIP" if use_traefik else "LoadBalancer" + + spec_kwargs = dict( + ports=[ + V1ServicePort(name="http", port=http_port, target_port=Port.HTTP.value), + V1ServicePort( + name="psql", port=postgres_port, target_port=Port.POSTGRES.value + ), + ], + selector={LABEL_COMPONENT: "cratedb", LABEL_NAME: name}, + type=service_type, + ) + + if not use_traefik: + spec_kwargs["external_traffic_policy"] = "Local" + if source_ranges: + spec_kwargs["load_balancer_source_ranges"] = source_ranges return 
V1Service( metadata=V1ObjectMeta( annotations=res_annotations, labels=labels, - name=service_name, + name=f"crate-{name}", owner_references=owner_references, ), - spec=V1ServiceSpec( - ports=[ - V1ServicePort(name="http", port=http_port, target_port=Port.HTTP.value), - V1ServicePort( - name="psql", port=postgres_port, target_port=Port.POSTGRES.value - ), - ], - selector={LABEL_COMPONENT: "cratedb", LABEL_NAME: name}, - type="LoadBalancer", - external_traffic_policy="Local", - load_balancer_source_ranges=source_ranges if source_ranges else None, - ), + spec=V1ServiceSpec(**spec_kwargs), ) @@ -1366,6 +1374,7 @@ async def create_services( logger: logging.Logger, source_ranges: Optional[List[str]] = None, additional_annotations: Optional[Dict] = None, + use_traefik: bool = False, ) -> None: async with GlobalApiClient() as api_client: core = CoreV1Api(api_client) @@ -1380,6 +1389,7 @@ async def create_services( dns_record, source_ranges, additional_annotations=additional_annotations, + use_traefik=use_traefik, ) await call_kubeapi( core.create_namespaced_service, @@ -1425,15 +1435,16 @@ async def recreate_services( transport_port = ports_spec.get("transport", Port.TRANSPORT.value) cratedb_labels = build_cratedb_labels(name, meta) - owner_references = get_owner_references(name, meta) source_ranges = spec["cluster"].get("allowedCIDRs", None) - additional_annotations = ( spec.get("cluster", {}).get("service", {}).get("annotations", {}) ) dns_record = spec.get("cluster", {}).get("externalDNS", None) + exposure = spec.get("cluster", {}).get("exposure", "loadbalancer") + use_traefik = exposure == "traefik" + await create_services( owner_references, namespace, @@ -1446,6 +1457,7 @@ async def recreate_services( logger, source_ranges, additional_annotations, + use_traefik=use_traefik, ) @@ -1676,8 +1688,10 @@ async def handle( # type: ignore logger: logging.Logger, source_ranges: Optional[List[str]] = None, additional_annotations: Optional[Dict] = None, + exposure: str = 
"loadbalancer", **kwargs: Any, ): + use_traefik = exposure == "traefik" await create_services( owner_references, namespace, @@ -1690,6 +1704,7 @@ async def handle( # type: ignore logger, source_ranges, additional_annotations, + use_traefik=use_traefik, ) diff --git a/crate/operator/exposure.py b/crate/operator/exposure.py new file mode 100644 index 00000000..e19a7b9a --- /dev/null +++ b/crate/operator/exposure.py @@ -0,0 +1,565 @@ +# CrateDB Kubernetes Operator +# +# Licensed to Crate.IO GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. 
+ +import logging +from typing import Any, List, Optional + +import kopf +from kubernetes_asyncio.client import ( + ApiException, + CoreV1Api, + CustomObjectsApi, + V1OwnerReference, +) + +from crate.operator.constants import Port +from crate.operator.create import ( + build_cratedb_labels, + create_services, + get_owner_references, +) +from crate.operator.utils import crate +from crate.operator.utils.k8s_api_client import GlobalApiClient +from crate.operator.utils.kopf import StateBasedSubHandler +from crate.operator.utils.kubeapi import call_kubeapi, get_cratedb_resource + + +async def create_traefik_resources( + owner_references: Optional[List[V1OwnerReference]], + namespace: str, + name: str, + dns_record: Optional[str], + source_ranges: Optional[List[str]], + http_port: int, + postgres_port: int, + logger: logging.Logger, +) -> None: + """ + Create MiddlewareTCP and IngressRouteTCP resources for Traefik exposure. + + If source_ranges is non‑empty, a MiddlewareTCP with IP allowlist is created + and referenced in the IngressRouteTCP routes. Otherwise, the IngressRouteTCP + routes are created without any middleware (no IP restriction). + + :param owner_references: Owner references to set on the created resources. + :param namespace: Kubernetes namespace where the resources will be created. + :param name: Name of the CrateDB cluster (used in resource names). + :param dns_record: External DNS hostname. + :param source_ranges: List of CIDR ranges for IP allowlist. + :param http_port: Port number for HTTP traffic (default 4200). + :param postgres_port: Port number for PostgreSQL traffic (default 5432). + :param logger: Logger for operation tracking. 
+ """ + if not dns_record: + raise kopf.PermanentError("externalDNS is required when exposure=traefik") + + async with GlobalApiClient() as api_client: + custom_api = CustomObjectsApi(api_client) + + owner_refs_serialized = ( + [api_client.sanitize_for_serialization(owner) for owner in owner_references] + if owner_references + else [] + ) + + has_ip_restriction = source_ranges and len(source_ranges) > 0 + middleware_name = None + + # Create MiddlewareTCP only if IP restriction is desired + if has_ip_restriction: + middleware_name = f"cratedb-allow-{name}" + middleware_body = { + "apiVersion": "traefik.io/v1alpha1", + "kind": "MiddlewareTCP", + "metadata": { + "name": middleware_name, + "namespace": namespace, + "labels": { + "app.kubernetes.io/component": "cratedb", + "app.kubernetes.io/managed-by": "crate-operator", + "app.kubernetes.io/name": name, + "app.kubernetes.io/part-of": "cratedb", + }, + "ownerReferences": owner_refs_serialized, + }, + "spec": {"ipAllowList": {"sourceRange": source_ranges}}, + } + await call_kubeapi( + custom_api.create_namespaced_custom_object, + logger, + continue_on_conflict=True, + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewaretcps", + body=middleware_body, + ) + else: + logger.info("No IP restriction – skipping MiddlewareTCP creation") + + def build_route(port: int) -> dict: + route = { + "match": f"HostSNI(`{dns_record}`)", + "services": [{"name": f"crate-{name}", "port": port}], + } + if has_ip_restriction: + route["middlewares"] = [{"name": middleware_name}] + return route + + pg_ingress_body = { + "apiVersion": "traefik.io/v1alpha1", + "kind": "IngressRouteTCP", + "metadata": { + "name": f"crate-pg-{name}", + "namespace": namespace, + "ownerReferences": owner_refs_serialized, + }, + "spec": { + "entryPoints": ["postgres"], + "routes": [build_route(postgres_port)], + "tls": {"passthrough": True}, + }, + } + await call_kubeapi( + custom_api.create_namespaced_custom_object, + logger, + 
continue_on_conflict=True, + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="ingressroutetcps", + body=pg_ingress_body, + ) + + http_ingress_body = { + "apiVersion": "traefik.io/v1alpha1", + "kind": "IngressRouteTCP", + "metadata": { + "name": f"crate-http-{name}", + "namespace": namespace, + "ownerReferences": owner_refs_serialized, + }, + "spec": { + "entryPoints": ["web-4200"], + "routes": [build_route(http_port)], + "tls": {"passthrough": True}, + }, + } + await call_kubeapi( + custom_api.create_namespaced_custom_object, + logger, + continue_on_conflict=True, + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="ingressroutetcps", + body=http_ingress_body, + ) + + +async def delete_traefik_resources( + namespace: str, + name: str, + logger: logging.Logger, +) -> None: + """ + Delete all Traefik resources owned by this CrateDB cluster. + + Deletes the MiddlewareTCP and both IngressRouteTCP resources. + + :param namespace: Kubernetes namespace where the resources reside. + :param name: Name of the CrateDB cluster. + :param logger: Logger for operation tracking. 
+ """ + async with GlobalApiClient() as api_client: + custom_api = CustomObjectsApi(api_client) + # Delete MiddlewareTCP + try: + await custom_api.delete_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewaretcps", + name=f"cratedb-allow-{name}", + ) + except ApiException as e: + if e.status != 404: + raise + # Delete IngressRouteTCP for PG + try: + await custom_api.delete_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="ingressroutetcps", + name=f"crate-pg-{name}", + ) + except ApiException as e: + if e.status != 404: + raise + # Delete IngressRouteTCP for HTTP + try: + await custom_api.delete_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="ingressroutetcps", + name=f"crate-http-{name}", + ) + except ApiException as e: + if e.status != 404: + raise + + +async def delete_service( + namespace: str, + name: str, + logger: logging.Logger, +) -> None: + """ + Delete the main data service (crate-{name}). + + :param namespace: Kubernetes namespace where the service resides. + :param name: Name of the CrateDB cluster. + :param logger: Logger for operation tracking. + """ + async with GlobalApiClient() as api_client: + core = CoreV1Api(api_client) + try: + await core.delete_namespaced_service( + name=f"crate-{name}", + namespace=namespace, + ) + except ApiException as e: + if e.status != 404: + raise + + +async def delete_ingress_route_tcps( + namespace: str, name: str, logger: logging.Logger +) -> None: + """ + Delete only the IngressRouteTCP resources (keep middleware if any). + + This is used when updating CIDRs from empty to non‑empty, to recreate + the ingress routes with the new middleware reference. + + :param namespace: Kubernetes namespace where the resources reside. + :param name: Name of the CrateDB cluster. + :param logger: Logger for operation tracking.
+ """ + async with GlobalApiClient() as api_client: + custom_api = CustomObjectsApi(api_client) + for suffix in ["pg", "http"]: + ingress_name = f"crate-{suffix}-{name}" + try: + await custom_api.delete_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="ingressroutetcps", + name=ingress_name, + ) + logger.info(f"Deleted IngressRouteTCP {ingress_name}") + except ApiException as e: + if e.status != 404: + raise + + +async def delete_middleware_tcp( + namespace: str, name: str, logger: logging.Logger +) -> None: + """ + Delete only the MiddlewareTCP resource. + + :param namespace: Kubernetes namespace where the resource resides. + :param name: Name of the CrateDB cluster. + :param logger: Logger for operation tracking. + """ + async with GlobalApiClient() as api_client: + custom_api = CustomObjectsApi(api_client) + middleware_name = f"cratedb-allow-{name}" + try: + await custom_api.delete_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewaretcps", + name=middleware_name, + ) + logger.info(f"Deleted MiddlewareTCP {middleware_name}") + except ApiException as e: + if e.status != 404: + raise + + +async def _remove_middleware_from_ingress_routes( + namespace: str, name: str, logger: logging.Logger, custom_api: CustomObjectsApi +) -> None: + """ + Patch both IngressRouteTCP resources to remove the 'middlewares' field. + + This is called when allowedCIDRs becomes empty and a middleware existed, + to remove the reference from the ingress routes. + + :param namespace: Kubernetes namespace where the resources reside. + :param name: Name of the CrateDB cluster. + :param logger: Logger for operation tracking. + :param custom_api: Instance of CustomObjectsApi to perform the patch. 
+ """ + ingress_names = [f"crate-pg-{name}", f"crate-http-{name}"] + for ingress_name in ingress_names: + try: + irt = await custom_api.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="ingressroutetcps", + name=ingress_name, + ) + # Remove middlewares from each route + routes = irt.get("spec", {}).get("routes", []) + modified = False + for route in routes: + if "middlewares" in route: + del route["middlewares"] + modified = True + if modified: + patch_body = [ + {"op": "replace", "path": "/spec/routes", "value": routes} + ] + await call_kubeapi( + custom_api.patch_namespaced_custom_object, + logger, + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="ingressroutetcps", + name=ingress_name, + body=patch_body, + _content_type="application/json-patch+json", + ) + logger.info( + f"Removed middleware reference from IngressRouteTCP {ingress_name}" + ) + except ApiException as e: + if e.status != 404: + raise + + +async def update_traefik_ip_restriction( + namespace: str, + name: str, + new_cidrs: List[str], + logger: logging.Logger, +) -> None: + """ + Create, patch, or delete Traefik MiddlewareTCP based on new CIDRs. + Also ensures IngressRouteTCPs reference the middleware if it exists. + + Handles four cases: + - non‑empty -> non‑empty: patch existing middleware. + - non‑empty -> empty: delete middleware and remove reference from ingress routes. + - empty -> non‑empty: create middleware and recreate ingress routes to reference it. + - empty -> empty: do nothing. + + :param namespace: Kubernetes namespace where the resources reside. + :param name: Name of the CrateDB cluster. + :param new_cidrs: New list of CIDR ranges (may be empty). + :param logger: Logger for operation tracking. 
+ """ + async with GlobalApiClient() as api_client: + custom_api = CustomObjectsApi(api_client) + middleware_name = f"cratedb-allow-{name}" + need_middleware = bool(new_cidrs) + + middleware_exists = False + try: + await custom_api.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewaretcps", + name=middleware_name, + ) + middleware_exists = True + except ApiException as e: + if e.status != 404: + raise + + if need_middleware and not middleware_exists: + # Create middleware and recreate ingress routes to reference it + logger.info( + f"Creating MiddlewareTCP {middleware_name} with CIDRs {new_cidrs}" + ) + cratedb = await get_cratedb_resource(namespace, name) + spec = cratedb["spec"] + dns_record = spec.get("cluster", {}).get("externalDNS") + if not dns_record: + logger.error("Cannot create middleware: externalDNS missing") + return + ports_spec = spec.get("ports", {}) + http_port = ports_spec.get("http", Port.HTTP.value) + postgres_port = ports_spec.get("postgres", Port.POSTGRES.value) + owner_references = get_owner_references(name, cratedb["metadata"]) + + # Recreate both ingress routes (they will include the middleware) + await delete_ingress_route_tcps(namespace, name, logger) + await create_traefik_resources( + owner_references, + namespace, + name, + dns_record, + new_cidrs, + http_port, + postgres_port, + logger, + ) + elif need_middleware and middleware_exists: + # Patch existing middleware + logger.info( + f"Patching MiddlewareTCP {middleware_name} with new CIDRs {new_cidrs}" + ) + patch_body = {"spec": {"ipAllowList": {"sourceRange": new_cidrs}}} + await call_kubeapi( + custom_api.patch_namespaced_custom_object, + logger, + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewaretcps", + name=middleware_name, + body=patch_body, + ) + elif not need_middleware and middleware_exists: + # Delete middleware and remove reference from ingress routes + logger.info(f"Deleting 
MiddlewareTCP {middleware_name} (no CIDRs)") + await delete_middleware_tcp(namespace, name, logger) + await _remove_middleware_from_ingress_routes( + namespace, name, logger, custom_api + ) + else: + logger.info("No change: CIDRs empty and no middleware exists") + + +class CreateTraefikResourcesSubHandler(StateBasedSubHandler): + """ + Creates Traefik resources during cluster creation. + """ + + @crate.on.error(error_handler=crate.send_create_failed_notification) + async def handle( + self, + namespace: str, + name: str, + owner_references: Optional[List[V1OwnerReference]], + dns_record: Optional[str], + source_ranges: Optional[List[str]], + http_port: int, + postgres_port: int, + logger: logging.Logger, + **kwargs: Any, + ): + await create_traefik_resources( + owner_references, + namespace, + name, + dns_record, + source_ranges, + http_port, + postgres_port, + logger, + ) + + +class ChangeExposureSubHandler(StateBasedSubHandler): + """ + Handles changes to spec.cluster.exposure between 'loadbalancer' and 'traefik'. + Deletes old resources and creates new ones accordingly. 
+ """ + + @crate.on.error(error_handler=crate.send_update_failed_notification) + async def handle( + self, + namespace: str, + name: str, + body: kopf.Body, + old: kopf.Body, + logger: logging.Logger, + **kwargs: Any, + ): + new_exposure = body["spec"]["cluster"].get("exposure", "loadbalancer") + old_exposure = old["spec"]["cluster"].get("exposure", "loadbalancer") + + if old_exposure == new_exposure: + logger.info(f"Exposure unchanged: {new_exposure}") + return + + logger.info(f"Changing exposure from {old_exposure} to {new_exposure}") + + if old_exposure == "loadbalancer": + await delete_service(namespace, name, logger) + elif old_exposure == "traefik": + await delete_traefik_resources(namespace, name, logger) + await delete_service(namespace, name, logger) + + # Recreate the main service with the new exposure type + spec = body["spec"] + ports_spec = spec.get("ports", {}) + http_port = ports_spec.get("http", Port.HTTP.value) + postgres_port = ports_spec.get("postgres", Port.POSTGRES.value) + transport_port = ports_spec.get("transport", Port.TRANSPORT.value) + dns_record = spec.get("cluster", {}).get("externalDNS") + source_ranges = spec["cluster"].get("allowedCIDRs", None) + additional_annotations = ( + spec.get("cluster", {}).get("service", {}).get("annotations", {}) + ) + + labels = build_cratedb_labels(name, body["metadata"]) + owner_references = get_owner_references(name, body["metadata"]) + + use_traefik = new_exposure == "traefik" + await create_services( + owner_references, + namespace, + name, + labels, + http_port, + postgres_port, + transport_port, + dns_record, + logger, + source_ranges, + additional_annotations, + use_traefik=use_traefik, + ) + + if new_exposure == "traefik": + await create_traefik_resources( + owner_references, + namespace, + name, + dns_record, + source_ranges, + http_port, + postgres_port, + logger, + ) diff --git a/crate/operator/handlers/handle_create_cratedb.py b/crate/operator/handlers/handle_create_cratedb.py index 
f44885da..238f7cdf 100644 --- a/crate/operator/handlers/handle_create_cratedb.py +++ b/crate/operator/handlers/handle_create_cratedb.py @@ -39,6 +39,7 @@ build_cratedb_labels, get_owner_references, ) +from crate.operator.exposure import CreateTraefikResourcesSubHandler from crate.operator.operations import get_master_nodes_names, get_total_nodes_count from crate.operator.utils.secrets import get_image_pull_secrets @@ -83,6 +84,8 @@ async def create_cratedb( cluster_name = spec["cluster"]["name"] source_ranges = spec["cluster"].get("allowedCIDRs", None) cloud_settings = spec.get("grandCentral", {}) + exposure = spec["cluster"].get("exposure", "loadbalancer") + dns_record = spec.get("cluster", {}).get("externalDNS") kopf.register( fn=CreateSqlExporterConfigSubHandler(namespace, name, hash, context)( owner_references=owner_references, cratedb_labels=cratedb_labels @@ -124,15 +127,28 @@ async def create_cratedb( http_port=http_port, postgres_port=postgres_port, transport_port=transport_port, - dns_record=spec.get("cluster", {}).get("externalDNS"), + dns_record=dns_record, source_ranges=source_ranges, additional_annotations=spec.get("cluster", {}) .get("service", {}) .get("annotations", {}), + exposure=exposure, ), id="services", ) + if exposure == "traefik": + kopf.register( + fn=CreateTraefikResourcesSubHandler(namespace, name, hash, context)( + owner_references=owner_references, + dns_record=dns_record, + source_ranges=source_ranges, + http_port=http_port, + postgres_port=postgres_port, + ), + id="traefik_resources", + ) + if has_master_nodes: kopf.register( fn=CreateStatefulsetSubHandler(namespace, name, hash, context)( diff --git a/crate/operator/handlers/handle_update_allowed_cidrs.py b/crate/operator/handlers/handle_update_allowed_cidrs.py index 39e8769a..04a0bbba 100644 --- a/crate/operator/handlers/handle_update_allowed_cidrs.py +++ b/crate/operator/handlers/handle_update_allowed_cidrs.py @@ -26,8 +26,10 @@ from kubernetes_asyncio.client import CoreV1Api, 
NetworkingV1Api from crate.operator.constants import GRAND_CENTRAL_RESOURCE_PREFIX +from crate.operator.exposure import update_traefik_ip_restriction from crate.operator.grand_central import read_grand_central_ingress from crate.operator.utils.k8s_api_client import GlobalApiClient +from crate.operator.utils.kubeapi import get_cratedb_resource from crate.operator.utils.notifications import send_operation_progress_notification from crate.operator.webhooks import WebhookAction, WebhookOperation, WebhookStatus @@ -39,7 +41,8 @@ async def update_service_allowed_cidrs( logger: logging.Logger, ): change: DiffItem = diff[0] - logger.info(f"Updating load balancer source ranges to {change.new}") + new_cidrs = change.new or [] + logger.info(f"Updating source ranges to {change.new}") await send_operation_progress_notification( namespace=namespace, @@ -54,21 +57,36 @@ async def update_service_allowed_cidrs( async with GlobalApiClient() as api_client: core = CoreV1Api(api_client) networking = NetworkingV1Api(api_client) + cratedb = await get_cratedb_resource(namespace, name) + exposure = ( + cratedb.get("spec", {}).get("cluster", {}).get("exposure", "loadbalancer") + ) + # This also runs on creation events, so we want to double check that the service # exists before attempting to do anything. + # Update the main data service if it's a LoadBalancer services = await core.list_namespaced_service(namespace=namespace) service = next( (svc for svc in services.items if svc.metadata.name == f"crate-{name}"), None, ) - if not service: - return - await core.patch_namespaced_service( - name=f"crate-{name}", - namespace=namespace, - body={"spec": {"loadBalancerSourceRanges": change.new}}, - ) + # Only patch loadBalancerSourceRanges if the service is of type LoadBalancer. + # For ClusterIP this field is forbidden and will cause a 422. 
+ if service and service.spec.type == "LoadBalancer": + await core.patch_namespaced_service( + name=f"crate-{name}", + namespace=namespace, + body={"spec": {"loadBalancerSourceRanges": new_cidrs}}, + ) + else: + logger.info( + f"Skipping loadBalancerSourceRanges patch: service 'crate-{name}' " + f"is of type '{service.spec.type if service else 'missing'}'." + ) + + if exposure == "traefik": + await update_traefik_ip_restriction(namespace, name, new_cidrs, logger) ingress = await read_grand_central_ingress(namespace=namespace, name=name) @@ -80,7 +98,7 @@ async def update_service_allowed_cidrs( "metadata": { "annotations": { "nginx.ingress.kubernetes.io/whitelist-source-range": ",".join( # noqa - change.new + new_cidrs ) } } diff --git a/crate/operator/handlers/handle_update_cratedb.py b/crate/operator/handlers/handle_update_cratedb.py index 1b08a15c..f486763d 100644 --- a/crate/operator/handlers/handle_update_cratedb.py +++ b/crate/operator/handlers/handle_update_cratedb.py @@ -33,6 +33,7 @@ from crate.operator.config import config from crate.operator.constants import CLUSTER_UPDATE_ID, OperationType from crate.operator.expand_volume import ExpandVolumeSubHandler +from crate.operator.exposure import ChangeExposureSubHandler from crate.operator.operations import ( DELAY_CRONJOB, AfterClusterUpdateSubHandler, @@ -123,6 +124,7 @@ async def update_cratedb( do_restart = False do_scale = False do_expand_volume = False + exposure_changed = False operation = OperationType.UNKNOWN @@ -137,6 +139,14 @@ async def update_cratedb( elif field_path == ("spec", "nodes", "master", "replicas"): do_scale = True operation = OperationType.SCALE + elif field_path == ("spec", "cluster", "exposure"): + old_val = old_spec if old_spec is not None else "loadbalancer" + new_val = new_spec if new_spec is not None else "loadbalancer" + if old_val != new_val: + exposure_changed = True + do_before_update = False + do_after_update = False + operation = OperationType.CHANGE_EXPOSURE elif field_path 
== ("spec", "nodes", "data"): for node_spec_idx in range(len(old_spec)): old_spec = old_spec[node_spec_idx] @@ -180,6 +190,7 @@ async def update_cratedb( and not do_scale and not do_expand_volume and not do_change_compute + and not exposure_changed ): return @@ -243,6 +254,11 @@ async def update_cratedb( namespace, name, change_hash, context, depends_on ) + if exposure_changed: + register_exposure_change_handlers( + namespace, name, change_hash, context, depends_on + ) + if do_after_update: register_after_update_handlers( namespace, name, change_hash, context, depends_on, operation @@ -509,6 +525,19 @@ def register_before_update_handlers( depends_on.append(f"{CLUSTER_UPDATE_ID}/before_cluster_update") +def register_exposure_change_handlers( + namespace, name, change_hash, context, depends_on +): + kopf.register( + fn=ChangeExposureSubHandler( + namespace, name, change_hash, context, depends_on=depends_on.copy() + )(), + id="change_exposure", + backoff=get_backoff(), + ) + depends_on.append(f"{CLUSTER_UPDATE_ID}/change_exposure") + + def get_backoff() -> int: """ When in testing mode, use a shorter backoff period as it help with speeding up some diff --git a/crate/operator/operations.py b/crate/operator/operations.py index 33608ba6..94d25d3f 100644 --- a/crate/operator/operations.py +++ b/crate/operator/operations.py @@ -30,9 +30,11 @@ from aiopg import Cursor from kopf import TemporaryError from kubernetes_asyncio.client import ( + ApiException, AppsV1Api, BatchV1Api, CoreV1Api, + CustomObjectsApi, V1ConfigMap, V1CronJobList, V1JobList, @@ -59,6 +61,7 @@ LABEL_NAME, LABEL_NODE_NAME, LABEL_PART_OF, + Port, ) from crate.operator.cratedb import ( are_snapshots_in_progress, @@ -68,7 +71,12 @@ reset_cluster_setting, set_cluster_setting, ) -from crate.operator.create import recreate_services +from crate.operator.create import get_owner_references, recreate_services +from crate.operator.exposure import ( + create_traefik_resources, + delete_service as 
delete_clusterip_service, + delete_traefik_resources, +) from crate.operator.grand_central import read_grand_central_deployment from crate.operator.sql import execute_sql from crate.operator.utils import crate @@ -593,6 +601,86 @@ async def restart_cluster( ) +async def is_service_present(core: CoreV1Api, namespace: str, name: str) -> bool: + """ + Check if the main data service (crate-{name}) exists. + + :param core: Kubernetes CoreV1Api client. + :param namespace: Namespace to check. + :param name: Name of the CrateDB cluster. + """ + svc_name = f"crate-{name}" + selector = f"metadata.name={svc_name}" + svc_list: V1ServiceList = await core.list_namespaced_service( + namespace=namespace, field_selector=selector + ) + return len(svc_list.items) >= 1 + + +async def are_traefik_resources_present(namespace: str, name: str) -> bool: + """ + Check if both IngressRouteTCP resources exist for this cluster. + + :param namespace: Namespace to check. + :param name: Name of the CrateDB cluster. + """ + async with GlobalApiClient() as api_client: + custom_api = CustomObjectsApi(api_client) + for suffix in ["pg", "http"]: + try: + await custom_api.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="ingressroutetcps", + name=f"crate-{suffix}-{name}", + ) + except ApiException as e: + if e.status == 404: + return False + raise + return True + + +async def _recreate_traefik_resources( + namespace: str, name: str, logger: logging.Logger +): + """ + Recreate Traefik resources after a resume operation. + + This is called when scaling a cluster back up from 0 replicas and the + exposure is 'traefik'. It reads the current spec and recreates the + MiddlewareTCP and IngressRouteTCP resources. + + :param namespace: Kubernetes namespace. + :param name: Name of the CrateDB cluster. + :param logger: Logger for operation tracking.
+ """ + cratedb = await get_cratedb_resource(namespace, name) + spec = cratedb["spec"] + dns_record = spec.get("cluster", {}).get("externalDNS") + if not dns_record: + logger.warning("Cannot recreate Traefik resources: externalDNS missing") + return + + ports_spec = spec.get("ports", {}) + http_port = ports_spec.get("http", Port.HTTP.value) + postgres_port = ports_spec.get("postgres", Port.POSTGRES.value) + source_ranges = spec["cluster"].get("allowedCIDRs", None) + owner_references = get_owner_references(name, cratedb["metadata"]) + + await create_traefik_resources( + owner_references, + namespace, + name, + dns_record, + source_ranges, + http_port, + postgres_port, + logger, + ) + + async def suspend_or_start_cluster( apps: AppsV1Api, core: CoreV1Api, @@ -604,7 +692,7 @@ async def suspend_or_start_cluster( logger: logging.Logger, ): """ - Suspend or scale a cluster ``name`` back up, according to the given + Suspend or scale a cluster ``name`` back up, according to the given ``data_diff_items``. :param apps: An instance of the Kubernetes Apps V1 API. @@ -618,7 +706,11 @@ async def suspend_or_start_cluster( should be suspended/started as well. This is usually not the case for volume expansion operations. 
""" - spec = old["spec"] + cratedb = await get_cratedb_resource(namespace, name) + exposure = ( + cratedb.get("spec", {}).get("cluster", {}).get("exposure", "loadbalancer") + ) + use_traefik = exposure == "traefik" if data_diff_items: for _, field_path, old_replicas, new_replicas in data_diff_items: @@ -626,17 +718,23 @@ async def suspend_or_start_cluster( # scale the cluster back up # Check if service is present, re-create it if not - if not await is_lb_service_present(core, namespace, name): - cratedb = await get_cratedb_resource(namespace, name) + if not await is_service_present(core, namespace, name): await recreate_services( namespace, name, cratedb["spec"], cratedb["metadata"], logger ) - if not await is_lb_service_ready(core, namespace, name): + if not use_traefik and not await is_lb_service_ready( + core, namespace, name + ): raise TemporaryError(delay=config.BOOTSTRAP_RETRY_DELAY) + if use_traefik and not await are_traefik_resources_present( + namespace, name + ): + await _recreate_traefik_resources(namespace, name, logger) + index_path, *_ = field_path index = int(index_path) - node_spec = spec["nodes"]["data"][index] + node_spec = old["spec"]["nodes"]["data"][index] node_name = node_spec["name"] sts_name = f"crate-data-{node_name}-{name}" statefulset = await apps.read_namespaced_stateful_set( @@ -696,7 +794,7 @@ async def suspend_or_start_cluster( await check_cluster_healthy(name, namespace, apps, conn_factory, logger) index_path, *_ = field_path index = int(index_path) - node_spec = spec["nodes"]["data"][index] + node_spec = old["spec"]["nodes"]["data"][index] node_name = node_spec["name"] sts_name = f"crate-data-{node_name}-{name}" statefulset = await apps.read_namespaced_stateful_set( @@ -740,8 +838,15 @@ async def suspend_or_start_cluster( ) await check_all_data_nodes_gone(core, namespace, name, old) - # Try to delete the load balancing service if present - await delete_lb_service(core, namespace, name) + # Delete the service and Traefik resources 
(if any) when suspending + if use_traefik: + # Delete Traefik resources (IngressRouteTCPs and MiddlewareTCP) + await delete_traefik_resources(namespace, name, logger) + # Also delete the ClusterIP service + await delete_clusterip_service(namespace, name, logger) + else: + # Delete the LoadBalancer service + await delete_lb_service(core, namespace, name) async def suspend_or_start_grand_central( diff --git a/crate/operator/utils/kubeapi.py b/crate/operator/utils/kubeapi.py index 0ce2cca5..5aed09f0 100644 --- a/crate/operator/utils/kubeapi.py +++ b/crate/operator/utils/kubeapi.py @@ -197,9 +197,17 @@ async def get_host(core: CoreV1Api, namespace: str, name: str) -> str: :param name: The name of the CrateDB cluster. """ if config.TESTING: - # During testing we need to connect to the cluster via its public IP - # address, because the operator isn't running inside the Kubernetes - # cluster. + cratedb = await get_cratedb_resource(namespace, name) + exposure = ( + cratedb.get("spec", {}).get("cluster", {}).get("exposure", "loadbalancer") + ) + + if exposure == "traefik": + dns = cratedb.get("spec", {}).get("cluster", {}).get("externalDNS") + if dns: + return dns + raise kopf.TemporaryError("Waiting for externalDNS...", delay=5) + return await get_service_public_hostname(core, namespace, name) return f"crate-discovery-{name}.{namespace}" diff --git a/deploy/charts/crate-operator-crds/templates/cratedbs-cloud-crate-io.yaml b/deploy/charts/crate-operator-crds/templates/cratedbs-cloud-crate-io.yaml index 589f360f..29a75a6d 100644 --- a/deploy/charts/crate-operator-crds/templates/cratedbs-cloud-crate-io.yaml +++ b/deploy/charts/crate-operator-crds/templates/cratedbs-cloud-crate-io.yaml @@ -560,6 +560,10 @@ spec: version: description: CrateDB version type: string + exposure: + description: Service exposure type + type: string + enum: ["loadbalancer", "traefik"] required: - imageRegistry - name diff --git a/deploy/charts/crate-operator/templates/rbac.yaml 
b/deploy/charts/crate-operator/templates/rbac.yaml index 2797d472..97761db8 100644 --- a/deploy/charts/crate-operator/templates/rbac.yaml +++ b/deploy/charts/crate-operator/templates/rbac.yaml @@ -84,6 +84,18 @@ rules: - patch - update - watch +- apiGroups: + - traefik.io + resources: + - middlewaretcps + - ingressroutetcps + verbs: + - create + - get + - list + - watch + - patch + - delete --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 405b67f0..74dfe42b 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -109,6 +109,18 @@ rules: - patch - update - watch +- apiGroups: + - traefik.io + resources: + - middlewaretcps + - ingressroutetcps + verbs: + - create + - get + - list + - watch + - patch + - delete --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/tests/test_change_compute.py b/tests/test_change_compute.py index a4075f2f..23632ca7 100644 --- a/tests/test_change_compute.py +++ b/tests/test_change_compute.py @@ -49,6 +49,7 @@ do_pods_exist, is_cluster_healthy, is_kopf_handler_finished, + require_connection, start_cluster, was_notification_sent, ) @@ -129,7 +130,7 @@ async def test_change_compute_from_request_to_limit( }, ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await assert_wait_for( True, @@ -173,7 +174,7 @@ async def test_change_compute_from_request_to_limit( await assert_wait_for( True, cluster_routing_allocation_enable_equals, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), "new_primaries", err_msg="Cluster routing allocation setting has not been updated", timeout=DEFAULT_TIMEOUT * 5, @@ -224,7 +225,7 @@ async def test_change_compute_from_request_to_limit( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), 1, err_msg="Cluster wasn't healthy", timeout=DEFAULT_TIMEOUT, @@ 
-233,7 +234,7 @@ async def test_change_compute_from_request_to_limit( await assert_wait_for( True, cluster_routing_allocation_enable_equals, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), "all", err_msg="Cluster routing allocation setting has not been updated", timeout=DEFAULT_TIMEOUT * 5, diff --git a/tests/test_create_grand_central.py b/tests/test_create_grand_central.py index 190aa37a..ec9ea904 100644 --- a/tests/test_create_grand_central.py +++ b/tests/test_create_grand_central.py @@ -46,6 +46,7 @@ do_pods_exist, is_cluster_healthy, is_kopf_handler_finished, + require_connection, start_cluster, ) @@ -80,7 +81,7 @@ async def test_create_grand_central(faker, namespace, kopf_runner, api_client): }, ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await assert_wait_for( True, diff --git a/tests/test_expand_volume.py b/tests/test_expand_volume.py index 6eccc0ea..ed89e88f 100644 --- a/tests/test_expand_volume.py +++ b/tests/test_expand_volume.py @@ -46,6 +46,7 @@ create_test_sys_jobs_table, is_cluster_healthy, is_kopf_handler_finished, + require_connection, start_cluster, was_notification_sent, ) @@ -106,13 +107,13 @@ async def test_expand_cluster_storage( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), number_of_nodes, err_msg="Cluster wasn't healthy after 5 minutes.", timeout=DEFAULT_TIMEOUT * 5, ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await create_test_sys_jobs_table(conn_factory) await _expand_volume(coapi, name, namespace, "32Gi") @@ -150,7 +151,7 @@ async def test_expand_cluster_storage( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), number_of_nodes, 
err_msg="Cluster wasn't back up again after 5 minutes.", timeout=DEFAULT_TIMEOUT * 5, diff --git a/tests/test_exposure.py b/tests/test_exposure.py new file mode 100644 index 00000000..49931a12 --- /dev/null +++ b/tests/test_exposure.py @@ -0,0 +1,735 @@ +# CrateDB Kubernetes Operator +# +# Licensed to Crate.IO GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. 
+ +from typing import Optional +from unittest import mock + +import pytest +from kubernetes_asyncio.client import ApiException, CoreV1Api, CustomObjectsApi + +from crate.operator.constants import ( + API_GROUP, + KOPF_STATE_STORE_PREFIX, + RESOURCE_CRATEDB, +) + +from .utils import ( + DEFAULT_TIMEOUT, + assert_wait_for, + is_kopf_handler_finished, + start_cluster, + wait_for_kopf_handler, +) + +pytestmark = [pytest.mark.k8s, pytest.mark.asyncio] + + +async def middleware_exists( + custom_api: CustomObjectsApi, namespace: str, name: str +) -> bool: + try: + await custom_api.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewaretcps", + name=f"cratedb-allow-{name}", + ) + return True + except ApiException as e: + if e.status == 404: + return False + raise + + +async def ingress_route_exists( + custom_api: CustomObjectsApi, namespace: str, name: str, suffix: str +) -> bool: + try: + await custom_api.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="ingressroutetcps", + name=f"crate-{suffix}-{name}", + ) + return True + except ApiException as e: + if e.status == 404: + return False + raise + + +async def ingress_route_has_middleware( + custom_api: CustomObjectsApi, namespace: str, name: str, suffix: str +) -> bool: + try: + irt = await custom_api.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="ingressroutetcps", + name=f"crate-{suffix}-{name}", + ) + routes = irt.get("spec", {}).get("routes", []) + for route in routes: + if "middlewares" in route: + return True + return False + except ApiException as e: + if e.status == 404: + return False + raise + + +async def get_service(core: CoreV1Api, namespace: str, name: str): + try: + return await core.read_namespaced_service(f"crate-{name}", namespace) + except ApiException as e: + if e.status == 404: + return None + raise + + +async def 
service_type(core: CoreV1Api, namespace: str, name: str) -> Optional[str]: + svc = await get_service(core, namespace, name) + return svc.spec.type if svc else None + + +async def service_is_clusterip(core: CoreV1Api, namespace: str, name: str) -> bool: + svc = await get_service(core, namespace, name) + return svc is not None and svc.spec.type == "ClusterIP" + + +async def service_is_loadbalancer(core: CoreV1Api, namespace: str, name: str) -> bool: + svc = await get_service(core, namespace, name) + return svc is not None and svc.spec.type == "LoadBalancer" + + +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_create_traefik_with_cidrs( + mock_send_notification: mock.AsyncMock, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Test that a Traefik‑exposed cluster with non‑empty allowedCIDRs creates a + ClusterIP service, a MiddlewareTCP, and IngressRouteTCPs that reference the + middleware. + """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + name = faker.domain_word() + cidrs = ["192.168.1.0/24", "10.0.0.0/8"] + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + "allowedCIDRs": cidrs, + }, + ) + + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + + # Service should be ClusterIP + await assert_wait_for( + True, + service_is_clusterip, + core, + namespace.metadata.name, + name, + err_msg="Service was not ClusterIP", + timeout=DEFAULT_TIMEOUT, + ) + + # Middleware should exist + await assert_wait_for( + True, + middleware_exists, + coapi, + namespace.metadata.name, + name, + err_msg="Middleware was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # IngressRoutes should exist and have middleware reference + for suffix in ("pg", "http"): + await assert_wait_for( + 
True, + ingress_route_exists, + coapi, + namespace.metadata.name, + name, + suffix, + err_msg="IngressRoute was not created", + timeout=DEFAULT_TIMEOUT, + ) + await assert_wait_for( + True, + ingress_route_has_middleware, + coapi, + namespace.metadata.name, + name, + suffix, + err_msg="IngressRoute does not have middleware reference", + timeout=DEFAULT_TIMEOUT, + ) + + +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_create_traefik_without_cidrs( + mock_send_notification: mock.AsyncMock, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Test that a Traefik‑exposed cluster with empty allowedCIDRs creates a + ClusterIP service, no MiddlewareTCP, and IngressRouteTCPs without a + middleware reference. + """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + name = faker.domain_word() + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + "allowedCIDRs": [], # empty list + }, + ) + + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + + # Service ClusterIP + await assert_wait_for( + True, + service_is_clusterip, + core, + namespace.metadata.name, + name, + err_msg="Service was not ClusterIP", + timeout=DEFAULT_TIMEOUT, + ) + + # No middleware + await assert_wait_for( + False, + middleware_exists, + coapi, + namespace.metadata.name, + name, + err_msg="Middleware was created", + timeout=DEFAULT_TIMEOUT, + ) + + # IngressRoutes exist but have no middleware + for suffix in ("pg", "http"): + await assert_wait_for( + True, + ingress_route_exists, + coapi, + namespace.metadata.name, + name, + suffix, + err_msg="IngressRoute was not created", + timeout=DEFAULT_TIMEOUT, + ) + await assert_wait_for( + False, + ingress_route_has_middleware, + coapi, + namespace.metadata.name, + name, 
+ suffix, + err_msg="IngressRoute has middleware reference", + timeout=DEFAULT_TIMEOUT, + ) + + +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_update_cidrs_traefik_empty_to_nonempty( + mock_send_notification: mock.AsyncMock, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Test that updating allowedCIDRs from empty to non‑empty on a Traefik + cluster creates the missing MiddlewareTCP and adds the middleware + reference to the IngressRouteTCPs. + """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + name = faker.domain_word() + initial_cidrs: list[str] = [] + updated_cidrs: list[str] = ["192.168.1.0/24"] + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + "allowedCIDRs": initial_cidrs, + }, + ) + + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + + # Service should be ClusterIP + await assert_wait_for( + True, + service_is_clusterip, + core, + namespace.metadata.name, + name, + err_msg="Service was not ClusterIP", + timeout=DEFAULT_TIMEOUT, + ) + + # Initially no middleware + await assert_wait_for( + False, + middleware_exists, + coapi, + namespace.metadata.name, + name, + err_msg="Middleware was created", + timeout=DEFAULT_TIMEOUT, + ) + + # Update CIDRs + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace.metadata.name, + name=name, + body=[ + { + "op": "replace", + "path": "/spec/cluster/allowedCIDRs", + "value": updated_cidrs, + } + ], + ) + + await assert_wait_for( + True, + is_kopf_handler_finished, + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/service_cidr_changes/spec.cluster.allowedCIDRs", + timeout=DEFAULT_TIMEOUT, + ) + + # Middleware should be 
created + await assert_wait_for( + True, + middleware_exists, + coapi, + namespace.metadata.name, + name, + err_msg="Middleware was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # IngressRoutes should now have middleware reference + for suffix in ("pg", "http"): + await assert_wait_for( + True, + ingress_route_has_middleware, + coapi, + namespace.metadata.name, + name, + suffix, + err_msg="IngressRoute does not have middleware reference", + timeout=DEFAULT_TIMEOUT, + ) + + +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_update_cidrs_traefik_nonempty_to_empty( + mock_send_notification: mock.AsyncMock, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Test that updating allowedCIDRs from non‑empty to empty on a Traefik + cluster deletes the MiddlewareTCP and removes the middleware reference + from the IngressRouteTCPs. + """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + name = faker.domain_word() + initial_cidrs: list[str] = ["192.168.1.0/24"] + updated_cidrs: list[str] = [] + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + "allowedCIDRs": initial_cidrs, + }, + ) + + await assert_wait_for( + True, + is_kopf_handler_finished, + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create.traefik_resources", + timeout=DEFAULT_TIMEOUT, + ) + + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + + # Initially middleware exists + await assert_wait_for( + True, + middleware_exists, + coapi, + namespace.metadata.name, + name, + err_msg="Middleware was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # Update CIDRs to empty + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + 
namespace=namespace.metadata.name, + name=name, + body=[ + { + "op": "replace", + "path": "/spec/cluster/allowedCIDRs", + "value": updated_cidrs, + } + ], + ) + + await assert_wait_for( + True, + is_kopf_handler_finished, + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/service_cidr_changes/spec.cluster.allowedCIDRs", + timeout=DEFAULT_TIMEOUT, + ) + + # Middleware should be deleted + await assert_wait_for( + False, + middleware_exists, + coapi, + namespace.metadata.name, + name, + err_msg="Middleware was not deleted", + timeout=DEFAULT_TIMEOUT, + ) + + # IngressRoutes should have no middleware reference + for suffix in ("pg", "http"): + await assert_wait_for( + False, + ingress_route_has_middleware, + coapi, + namespace.metadata.name, + name, + suffix, + err_msg="IngressRoute still has middleware reference", + timeout=DEFAULT_TIMEOUT, + ) + + +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_change_exposure_loadbalancer_to_traefik( + mock_send_notification: mock.AsyncMock, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Test changing exposure from loadbalancer to traefik: the service becomes + ClusterIP, and Traefik resources (middleware + ingress routes) are created. 
+ """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + name = faker.domain_word() + cidrs = ["10.0.0.0/8"] + + # Create with default exposure (loadbalancer) + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + additional_cluster_spec={ + "externalDNS": f"{name}.example.com", + "allowedCIDRs": cidrs, + }, + ) + + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + + # Initially service is LoadBalancer, no Traefik resources + await assert_wait_for( + True, + service_is_loadbalancer, + core, + namespace.metadata.name, + name, + err_msg="Service was not LoadBalancer", + timeout=DEFAULT_TIMEOUT, + ) + await assert_wait_for( + False, + middleware_exists, + coapi, + namespace.metadata.name, + name, + err_msg="Middleware was created", + timeout=DEFAULT_TIMEOUT, + ) + + # Change exposure to traefik + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace.metadata.name, + name=name, + body=[ + { + "op": "replace", + "path": "/spec/cluster/exposure", + "value": "traefik", + } + ], + ) + + # Now service should be ClusterIP, middleware and ingress routes exist + await assert_wait_for( + True, + service_is_clusterip, + core, + namespace.metadata.name, + name, + err_msg="Service was not ClusterIP", + timeout=DEFAULT_TIMEOUT, + ) + await assert_wait_for( + True, + middleware_exists, + coapi, + namespace.metadata.name, + name, + err_msg="Middleware was not created", + timeout=DEFAULT_TIMEOUT, + ) + for suffix in ("pg", "http"): + await assert_wait_for( + True, + ingress_route_exists, + coapi, + namespace.metadata.name, + name, + suffix, + err_msg="IngressRoute was not created", + timeout=DEFAULT_TIMEOUT, + ) + + +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_change_exposure_traefik_to_loadbalancer( + mock_send_notification: 
mock.AsyncMock, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Test changing exposure from traefik to loadbalancer: the service becomes + LoadBalancer, and all Traefik resources (middleware + ingress routes) are deleted. + """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + name = faker.domain_word() + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + }, + ) + + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + + # Initially Traefik resources exist + await assert_wait_for( + True, + service_is_clusterip, + core, + namespace.metadata.name, + name, + err_msg="Service was not ClusterIP", + timeout=DEFAULT_TIMEOUT, + ) + for suffix in ("pg", "http"): + await assert_wait_for( + True, + ingress_route_exists, + coapi, + namespace.metadata.name, + name, + suffix, + err_msg="IngressRoute was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # Change exposure to loadbalancer + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace.metadata.name, + name=name, + body=[ + { + "op": "replace", + "path": "/spec/cluster/exposure", + "value": "loadbalancer", + } + ], + ) + + # Service becomes LoadBalancer, Traefik resources deleted + await assert_wait_for( + False, + middleware_exists, + coapi, + namespace.metadata.name, + name, + err_msg="Middleware was not deleted", + timeout=DEFAULT_TIMEOUT, + ) + for suffix in ("pg", "http"): + await assert_wait_for( + False, + ingress_route_exists, + coapi, + namespace.metadata.name, + name, + suffix, + err_msg="IngressRoute was not deleted", + timeout=DEFAULT_TIMEOUT, + ) + await assert_wait_for( + True, + service_is_loadbalancer, + core, + namespace.metadata.name, + name, + err_msg="Service was not 
LoadBalancer", + timeout=DEFAULT_TIMEOUT, + ) diff --git a/tests/test_load_balancer_updates.py b/tests/test_load_balancer_updates.py index 5e9adfdc..241768d2 100644 --- a/tests/test_load_balancer_updates.py +++ b/tests/test_load_balancer_updates.py @@ -52,6 +52,7 @@ async def test_get_external_ip( name = faker.domain_word() ip, _ = await start_cluster(name, namespace, core, coapi, 1, wait_for_healthy=False) + assert ip is not None, "ip is None" await assert_wait_for( True, diff --git a/tests/test_restore_backup.py b/tests/test_restore_backup.py index a096991c..99f12312 100644 --- a/tests/test_restore_backup.py +++ b/tests/test_restore_backup.py @@ -72,6 +72,7 @@ is_cronjob_enabled, is_kopf_handler_finished, mocked_coro_func_called_with, + require_connection, start_backup_metrics, start_cluster, was_notification_sent, @@ -189,7 +190,7 @@ async def test_restore_backup_aws( grand_central_spec=(grand_central_spec if gc_enabled else None), ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await create_test_sys_jobs_table(conn_factory) await start_backup_metrics(name, namespace, faker) @@ -197,7 +198,7 @@ async def test_restore_backup_aws( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), number_of_nodes, err_msg="Cluster wasn't healthy after 5 minutes.", timeout=DEFAULT_TIMEOUT * 5, @@ -231,7 +232,7 @@ async def test_restore_backup_aws( await assert_wait_for( True, cluster_setting_equals, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), "indices.recovery.max_bytes_per_sec", RESTORE_MAX_BYTES_PER_SEC, err_msg="Cluster setting `max_bytes_per_sec` has not been updated.", @@ -240,7 +241,7 @@ async def test_restore_backup_aws( await assert_wait_for( True, cluster_setting_equals, - connection_factory(host, password), + 
connection_factory(*require_connection(host, password)), "cluster.routing.allocation.cluster_concurrent_rebalance", RESTORE_CLUSTER_CONCURRENT_REBALANCE, err_msg="Cluster setting `cluster_concurrent_rebalance` has not been updated.", @@ -475,7 +476,7 @@ async def test_restore_backup_azure_blob( grand_central_spec=(grand_central_spec if gc_enabled else None), ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await create_test_sys_jobs_table(conn_factory) await start_backup_metrics(name, namespace, faker) @@ -483,7 +484,7 @@ async def test_restore_backup_azure_blob( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), number_of_nodes, err_msg="Cluster wasn't healthy after 5 minutes.", timeout=DEFAULT_TIMEOUT * 5, @@ -522,7 +523,7 @@ async def test_restore_backup_azure_blob( await assert_wait_for( True, cluster_setting_equals, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), "indices.recovery.max_bytes_per_sec", RESTORE_MAX_BYTES_PER_SEC, err_msg="Cluster setting `max_bytes_per_sec` has not been updated.", @@ -531,7 +532,7 @@ async def test_restore_backup_azure_blob( await assert_wait_for( True, cluster_setting_equals, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), "cluster.routing.allocation.cluster_concurrent_rebalance", RESTORE_CLUSTER_CONCURRENT_REBALANCE, err_msg="Cluster setting `cluster_concurrent_rebalance` has not been updated.", @@ -745,7 +746,7 @@ async def test_restore_backup_create_repo_fails( number_of_nodes, ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await create_test_sys_jobs_table(conn_factory) await start_backup_metrics(name, namespace, faker) @@ -753,7 +754,7 @@ async def 
test_restore_backup_create_repo_fails( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), number_of_nodes, err_msg="Cluster wasn't healthy after 5 minutes.", timeout=DEFAULT_TIMEOUT * 5, diff --git a/tests/test_scale.py b/tests/test_scale.py index 0ab0e81b..8383db3e 100644 --- a/tests/test_scale.py +++ b/tests/test_scale.py @@ -60,6 +60,7 @@ is_cluster_healthy, is_cronjob_enabled, is_kopf_handler_finished, + require_connection, start_backup_metrics, start_cluster, was_notification_sent, @@ -245,7 +246,7 @@ async def test_scale_cluster( repl_hot_from, ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await create_test_sys_jobs_table(conn_factory) await _scale_cluster(coapi, name, namespace, repl_hot_to) @@ -253,7 +254,7 @@ async def test_scale_cluster( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), repl_hot_to, err_msg="Cluster wasn't healthy after 5 minutes.", timeout=DEFAULT_TIMEOUT * 5, @@ -317,7 +318,7 @@ async def test_suspend_resume_cluster( }, ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await create_test_sys_jobs_table(conn_factory) await start_backup_metrics(name, namespace, faker) @@ -410,7 +411,7 @@ async def test_suspend_resume_cluster( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), 1, err_msg="Cluster wasn't healthy after 5 minutes.", timeout=DEFAULT_TIMEOUT * 5, @@ -471,7 +472,7 @@ async def test_scale_cluster_while_create_snapshot_running( host, password = await start_cluster(name, namespace, core, coapi, 1) - conn_factory = connection_factory(host, password) + conn_factory = 
connection_factory(*require_connection(host, password)) await create_test_sys_jobs_table(conn_factory) await insert_test_snapshot_job(conn_factory) @@ -504,7 +505,7 @@ async def test_scale_cluster_while_create_snapshot_running( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), 2, err_msg="Cluster wasn't healthy after 2 minutes.", timeout=DEFAULT_TIMEOUT * 2, @@ -543,7 +544,7 @@ async def test_scale_cluster_while_k8s_snapshot_job_running( host, password = await start_cluster(name, namespace, core, coapi, 1) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await create_test_sys_jobs_table(conn_factory) await create_fake_snapshot_job(api_client, name, namespace.metadata.name) @@ -567,7 +568,7 @@ async def test_scale_cluster_while_k8s_snapshot_job_running( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), 2, err_msg="Cluster wasn't healthy after 3 minutes.", timeout=DEFAULT_TIMEOUT * 3, diff --git a/tests/test_update_backups_schedule.py b/tests/test_update_backups_schedule.py index b3ff9899..84f941ce 100644 --- a/tests/test_update_backups_schedule.py +++ b/tests/test_update_backups_schedule.py @@ -13,6 +13,7 @@ assert_wait_for, is_cluster_healthy, is_cronjob_schedule_matching, + require_connection, start_cluster, was_notification_sent, ) @@ -75,7 +76,7 @@ async def test_update_backups_schedule( host, password = await start_cluster( name, namespace, core, coapi, 1, backups_spec=backups_spec ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await assert_wait_for( True, diff --git a/tests/test_upgrade.py b/tests/test_upgrade.py index 9027507e..ff3612b1 100644 --- a/tests/test_upgrade.py +++ b/tests/test_upgrade.py @@ -60,6 +60,7 @@ 
do_pods_exist, is_cluster_healthy, is_kopf_handler_finished, + require_connection, start_cluster, was_notification_sent, ) @@ -91,7 +92,7 @@ async def test_upgrade_cluster( }, ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await assert_wait_for( True, @@ -124,7 +125,7 @@ async def test_upgrade_cluster( await assert_wait_for( True, cluster_routing_allocation_enable_equals, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), "new_primaries", err_msg="Cluster routing allocation setting has not been updated", timeout=DEFAULT_TIMEOUT * 5, @@ -164,7 +165,7 @@ async def test_upgrade_cluster( await assert_wait_for( True, is_cluster_healthy, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), 3, err_msg="Cluster wasn't healthy", timeout=DEFAULT_TIMEOUT, @@ -173,7 +174,7 @@ async def test_upgrade_cluster( await assert_wait_for( True, cluster_routing_allocation_enable_equals, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), "all", err_msg="Cluster routing allocation setting has not been updated", timeout=DEFAULT_TIMEOUT * 5, @@ -357,7 +358,7 @@ async def test_upgrade_rollback_on_failure( }, ) - conn_factory = connection_factory(host, password) + conn_factory = connection_factory(*require_connection(host, password)) await assert_wait_for( True, @@ -388,7 +389,7 @@ async def test_upgrade_rollback_on_failure( await assert_wait_for( True, cluster_routing_allocation_enable_equals, - connection_factory(host, password), + connection_factory(*require_connection(host, password)), "new_primaries", err_msg="Cluster routing allocation setting has not been updated", timeout=DEFAULT_TIMEOUT * 5, @@ -465,7 +466,7 @@ async def test_upgrade_blocked_if_rollback_annotation_set( }, ) - conn_factory = connection_factory(host, password) + conn_factory = 
connection_factory(*require_connection(host, password)) await assert_wait_for( True, diff --git a/tests/utils.py b/tests/utils.py index f9a7cf66..8cbb9d28 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,6 +20,7 @@ # software solely pursuant to the terms of the relevant commercial agreement. import asyncio +import json import logging import os from functools import reduce @@ -107,13 +108,14 @@ async def start_cluster( hot_nodes: int = 0, crate_version: str = CRATE_VERSION, wait_for_healthy: bool = True, + wait_for_lb: bool = True, additional_cluster_spec: Optional[Mapping[str, Any]] = None, users: Optional[List[Mapping[str, Any]]] = None, resource_requests: Optional[Mapping[str, Any]] = None, backups_spec: Optional[Mapping[str, Any]] = None, grand_central_spec: Optional[Mapping[str, Any]] = None, -) -> Tuple[str, str]: - additional_cluster_spec = additional_cluster_spec if additional_cluster_spec else {} +) -> Tuple[Optional[str], Optional[str]]: + additional_cluster_spec = additional_cluster_spec or {} body: dict = { "apiVersion": "cloud.crate.io/v1", "kind": "CrateDB", @@ -189,26 +191,32 @@ async def start_cluster( body=body, ) - await assert_wait_for( - True, - is_lb_service_ready, - core, - namespace.metadata.name, - f"crate-{name}", - err_msg="Lb service was not ready.", - timeout=DEFAULT_TIMEOUT * 5, - ) + if wait_for_healthy and not wait_for_lb: + raise ValueError("wait_for_healthy=True requires wait_for_lb=True") - host = await asyncio.wait_for( - get_service_public_hostname(core, namespace.metadata.name, name), - # It takes a while to retrieve an external IP on AKS. 
- timeout=DEFAULT_TIMEOUT * 5, - ) - password = await get_system_user_password(core, namespace.metadata.name, name) + if wait_for_lb: + await assert_wait_for( + True, + is_lb_service_ready, + core, + namespace.metadata.name, + f"crate-{name}", + err_msg="Lb service was not ready.", + timeout=DEFAULT_TIMEOUT * 5, + ) + + host = await asyncio.wait_for( + get_service_public_hostname(core, namespace.metadata.name, name), + timeout=DEFAULT_TIMEOUT * 5, + ) + password = await get_system_user_password(core, namespace.metadata.name, name) + else: + host, password = None, None if wait_for_healthy: # The timeouts are pretty high here since in Azure it's sometimes # non-deterministic how long provisioning a pod will actually take. + assert host is not None and password is not None await assert_wait_for( True, is_kopf_handler_finished, @@ -315,6 +323,72 @@ async def is_kopf_handler_finished( return handler_status is None +async def _check_kopf_handler_status( + coapi: CustomObjectsApi, name, namespace: str, handler_name: str +): + """ + Returns True if handler succeeded, False if still running, None if not yet seen. + Raises AssertionError if handler failed permanently. 
+ """ + cratedb = await coapi.get_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace, + name=name, + ) + annotations = cratedb["metadata"].get("annotations", {}) + raw = annotations.get(handler_name) + + if raw is None: + return None + + try: + status = json.loads(raw) + except (json.JSONDecodeError, TypeError): + logger.warning( + "Handler '%s' annotation is not valid JSON: %s", handler_name, raw + ) + return None + + if status.get("failure"): + raise AssertionError( + f"Handler '{handler_name}' failed: {status.get('message')}" + ) + + result = status.get("success", False) is True + return result + + +async def wait_for_kopf_handler( + coapi: CustomObjectsApi, + name: str, + namespace: str, + handler_name: str, + timeout: float = DEFAULT_TIMEOUT, + delay: float = 2, +): + """ + Wait for a kopf handler to complete, handling kopf's annotation cleanup lifecycle. + """ + deadline = asyncio.get_running_loop().time() + timeout + seen = False + + while asyncio.get_running_loop().time() < deadline: + result = await _check_kopf_handler_status(coapi, name, namespace, handler_name) + if result is True: + logger.info("Handler '%s' finished (success=True)", handler_name) + return + if result is None and seen: + logger.info("Handler '%s' finished (annotation cleaned up)", handler_name) + return + if result is not None: + seen = True + await asyncio.sleep(delay) + + raise AssertionError(f"Handler '{handler_name}' did not finish within {timeout}s") + + async def create_test_sys_jobs_table(conn_factory): async with conn_factory() as conn: async with conn.cursor() as cursor: @@ -539,3 +613,9 @@ async def is_lb_service_ready(core: CoreV1Api, namespace: str, expected: str) -> return True else: return False + + +def require_connection(host: Optional[str], password: Optional[str]) -> tuple[str, str]: + assert host is not None, "host is None" + assert password is not None, "password is None" + return host, password