From f6495ec2255f27856f3b58d1971574148e5b8efd Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Tue, 25 Nov 2025 17:45:49 +0000 Subject: [PATCH] orchestratord test: Introduce workflow for downtime measurement --- ci/nightly/pipeline.template.yml | 31 +- ci/plugins/mzcompose/hooks/command | 2 +- test/orchestratord/mzcompose.py | 528 +++++++++++++++++------------ 3 files changed, 336 insertions(+), 225 deletions(-) diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index a411ec9d6692e..98267f4222c88 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -2389,20 +2389,33 @@ steps: key: orchestratord-test steps: - id: orchestratord-defaults - label: "Orchestratord test (defaults from documentation)" + label: "Orchestratord + documentation defaults" artifact_paths: ["mz_debug_*.zip"] depends_on: devel-docker-tags timeout_in_minutes: 120 plugins: - ./ci/plugins/mzcompose: composition: orchestratord - run: defaults + run: documentation-defaults + ci-builder: stable + agents: + queue: hetzner-aarch64-16cpu-32gb + + - id: orchestratord-rolling-upgrade-downtime + label: "Orchestratord + rolling upgrade downtime" + artifact_paths: ["mz_debug_*.zip"] + depends_on: devel-docker-tags + timeout_in_minutes: 120 + plugins: + - ./ci/plugins/mzcompose: + composition: orchestratord + run: upgrade-downtime ci-builder: stable agents: queue: hetzner-aarch64-16cpu-32gb - id: orchestratord-default-properties - label: "Orchestratord test (defaults for properties)" + label: "Orchestratord + defaults for properties" artifact_paths: ["mz_debug_*.zip"] depends_on: devel-docker-tags timeout_in_minutes: 120 @@ -2415,7 +2428,7 @@ steps: queue: hetzner-aarch64-16cpu-32gb - id: orchestratord-individual - label: "Orchestratord test (individual properties)" + label: "Orchestratord + individual properties" artifact_paths: ["mz_debug_*.zip"] depends_on: devel-docker-tags timeout_in_minutes: 120 @@ -2428,7 +2441,7 @@ steps: queue: hetzner-aarch64-16cpu-32gb - id: orchestratord-combine - label: "Orchestratord test (combine properties)" + label: "Orchestratord + combine properties" artifact_paths: ["mz_debug_*.zip"] depends_on: build-aarch64 timeout_in_minutes: 120 @@ -2441,7 +2454,7 @@ steps: queue: hetzner-aarch64-16cpu-32gb - id: orchestratord-upgrade-individual - label: "Orchestratord test (upgrade, individual props)" + label: "Orchestratord + upgrade, individual props" artifact_paths: ["mz_debug_*.zip"] depends_on: devel-docker-tags timeout_in_minutes: 120 @@ -2454,7 +2467,7 @@ steps: queue: hetzner-aarch64-16cpu-32gb - id: orchestratord-upgrade-combine - label: "Orchestratord test (upgrade, combine props)" + label: "Orchestratord + upgrade, combine props" artifact_paths: ["mz_debug_*.zip"] depends_on: devel-docker-tags timeout_in_minutes: 120 @@ -2467,7 +2480,7 @@ steps: queue: hetzner-aarch64-16cpu-32gb - id: orchestratord-upgrade-chain-individual - label: "Orchestratord test (upgrade chain, individual props)" + label: "Orchestratord + upgrade chain, individual props" artifact_paths: ["mz_debug_*.zip"] depends_on: devel-docker-tags timeout_in_minutes: 120 @@ -2480,7 +2493,7 @@ steps: queue: hetzner-aarch64-16cpu-32gb - id: orchestratord-upgrade-chain-combine - label: "Orchestratord test (upgrade chain, combine props)" + label: "Orchestratord + upgrade chain, combine props" artifact_paths: ["mz_debug_*.zip"] depends_on: devel-docker-tags timeout_in_minutes: 120 diff --git a/ci/plugins/mzcompose/hooks/command b/ci/plugins/mzcompose/hooks/command index 766630a94d995..a3f107f9df749 100644 --- a/ci/plugins/mzcompose/hooks/command +++ b/ci/plugins/mzcompose/hooks/command @@ -344,7 +344,7 @@ cleanup() { && [ "$BUILDKITE_LABEL" != "Parallel Benchmark against QA Canary Environment" ] \ && [ "$BUILDKITE_LABEL" != "Parallel Benchmark against QA Benchmarking Staging Environment" ] \ && [[ ! "$BUILDKITE_LABEL" =~ Terraform\ .* ]] \ - && [[ ! "$BUILDKITE_LABEL" =~ Orchestratord\ test\ .* ]] \ + && [[ ! "$BUILDKITE_LABEL" =~ Orchestratord\ .* ]] \ && [[ ! "$BUILDKITE_LABEL" =~ Cluster\ spec\ sheet.* ]]; then echo "+++ services.log is empty, failing" exit 1 diff --git a/test/orchestratord/mzcompose.py b/test/orchestratord/mzcompose.py index 43b2715814d36..c29e0f219d3d7 100644 --- a/test/orchestratord/mzcompose.py +++ b/test/orchestratord/mzcompose.py @@ -26,6 +26,7 @@ from enum import Enum from typing import Any +import psycopg import requests import yaml from semver.version import Version @@ -43,7 +44,7 @@ from materialize.mzcompose.services.mz_debug import MzDebug from materialize.mzcompose.services.orchestratord import Orchestratord from materialize.mzcompose.services.testdrive import Testdrive -from materialize.util import all_subclasses +from materialize.util import PropagatingThread, all_subclasses from materialize.version_list import ( get_all_self_managed_versions, get_self_managed_versions, @@ -1258,7 +1259,9 @@ class Action(Enum): UpgradeChain = "upgrade-chain" -def workflow_defaults(c: Composition, parser: WorkflowArgumentParser) -> None: +def workflow_documentation_defaults( + c: Composition, parser: WorkflowArgumentParser +) -> None: parser.add_argument( "--tag", type=str, @@ -1591,6 +1594,97 @@ def make_mod_source( raise ValueError(f"Unhandled properties: {properties}") +def workflow_upgrade_downtime(c: Composition, parser: WorkflowArgumentParser) -> None: + parser.add_argument( + "--recreate-cluster", + action=argparse.BooleanOptionalAction, + help="Recreate cluster if it exists already", + ) + parser.add_argument( + "--tag", + type=str, + help="Custom version tag to use", + ) + parser.add_argument( + "--orchestratord-override", + default=True, + action=argparse.BooleanOptionalAction, + help="Override orchestratord tag", + ) + args = parser.parse_args() + + running = True + + def measure_downtime() -> None: + port_forward_process = None + connect_port_forward = True + try: + start_time = time.time() + while running: + if connect_port_forward: + if port_forward_process: + os.killpg(os.getpgid(port_forward_process.pid), signal.SIGTERM) + port_forward_process = None + balancerd_name = spawn.capture( + [ + "kubectl", + "get", + "pods", + "-l", + "app=balancerd", + "-n", + "materialize-environment", + "-o", + "jsonpath={.items[0].metadata.name}", + ] + ).strip() + port_forward_process = subprocess.Popen( + [ + "kubectl", + "port-forward", + f"pod/{balancerd_name}", + "-n", + "materialize-environment", + "6875:6875", + ], + preexec_fn=os.setpgrp, + ) + connect_port_forward = False + try: + with psycopg.connect( + "postgres://materialize@127.0.0.1:6875/materialize", + autocommit=True, + ) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + except psycopg.OperationalError: + connect_port_forward = True + continue + runtime = time.time() - start_time + if runtime > 2: + print(f"Downtime: {runtime}s") + assert runtime < 15, f"SELECT 1 took more than 15s: {runtime}s" + time.sleep(10) + start_time = time.time() + finally: + if port_forward_process: + os.killpg(os.getpgid(port_forward_process.pid), signal.SIGTERM) + + definition = setup(c, args) + init(definition) + run(definition, False) + thread = PropagatingThread(target=measure_downtime) + thread.start() + time.sleep(10) # some time to make sure the thread runs fine + request = str(uuid.uuid4()) + definition["materialize"]["spec"]["requestRollout"] = request + definition["materialize"]["spec"]["forceRollout"] = request + run(definition, False) + time.sleep(120) # some time to make sure there is no downtime later + running = False + thread.join() + + def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: parser.add_argument( "--recreate-cluster", @@ -1625,76 +1719,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: print(f"--- Random seed is {args.seed}") - kind_version = Version.parse(spawn.capture(["kind", "version"]).split(" ")[1][1:]) - assert kind_version >= Version.parse( - "0.29.0" - ), f"kind >= v0.29.0 required, while you are on {kind_version}" - - c.up(Service("testdrive", idle=True), Service("mz-debug", idle=True)) - c.invoke("cp", "mz-debug:/usr/local/bin/mz-debug", ".") - - cluster = "kind" - clusters = spawn.capture(["kind", "get", "clusters"]).strip().split("\n") - if cluster not in clusters or args.recreate_cluster: - setup(cluster) - - if not args.tag: - services = [ - "orchestratord", - "environmentd", - "clusterd", - "balancerd", - ] - c.up(*[Service(service, idle=True) for service in services]) - for service in services: - spawn.runv( - [ - "docker", - "tag", - c.compose["services"][service]["image"], - get_image(c.compose["services"][service]["image"], None), - ] - ) - spawn.runv( - ["kind", "load", "docker-image", "--name", cluster] - + [ - get_image(c.compose["services"][service]["image"], None) - for service in services - ] - ) - - definition: dict[str, Any] = {} - - with open(MZ_ROOT / "misc" / "helm-charts" / "operator" / "values.yaml") as f: - definition["operator"] = yaml.load(f, Loader=yaml.Loader) - with open(MZ_ROOT / "misc" / "helm-charts" / "testing" / "materialize.yaml") as f: - materialize_setup = list(yaml.load_all(f, Loader=yaml.Loader)) - assert len(materialize_setup) == 3 - definition["namespace"] = materialize_setup[0] - definition["secret"] = materialize_setup[1] - definition["materialize"] = materialize_setup[2] - - if args.orchestratord_override: - definition["operator"]["operator"]["image"]["tag"] = get_tag(args.tag) - # TODO: database-issues#9696, makes environmentd -> clusterd connections fail - # definition["operator"]["networkPolicies"]["enabled"] = True - # definition["operator"]["networkPolicies"]["internal"]["enabled"] = True - # definition["operator"]["networkPolicies"]["egress"]["enabled"] = True - # definition["operator"]["networkPolicies"]["ingress"]["enabled"] = True - # TODO: Remove when fixed: error: unexpected argument '--disable-license-key-checks' found - definition["operator"]["operator"]["args"]["enableLicenseKeyChecks"] = True - definition["operator"]["clusterd"]["nodeSelector"][ - "workload" - ] = "materialize-instance" - definition["operator"]["environmentd"]["nodeSelector"][ - "workload" - ] = "materialize-instance" - definition["secret"]["stringData"]["license_key"] = os.environ["MZ_CI_LICENSE_KEY"] - definition["materialize"]["spec"]["environmentdImageRef"] = get_image( - c.compose["services"]["environmentd"]["image"], args.tag - ) - # kubectl get endpoints mzel5y3f42l6-cluster-u1-replica-u1-gen-1 -n materialize-environment -o json - # more than one address + definition = setup(c, args) rng = random.Random(args.seed) @@ -1775,160 +1800,233 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: pass -def setup(cluster: str): - spawn.runv(["kind", "delete", "cluster", "--name", cluster]) +def setup(c: Composition, args) -> dict[str, Any]: + c.up(Service("testdrive", idle=True), Service("mz-debug", idle=True)) + c.invoke("cp", "mz-debug:/usr/local/bin/mz-debug", ".") - try: - spawn.runv(["docker", "network", "create", "kind"]) - except: - pass - try: - spawn.runv(["docker", "stop", "proxy-dockerhub"]) - except: - pass - try: - spawn.runv(["docker", "rm", "proxy-dockerhub"]) - except: - pass - try: - spawn.runv(["docker", "stop", "proxy-ghcr"]) - except: - pass - try: - spawn.runv(["docker", "rm", "proxy-ghcr"]) - except: - pass + cluster = "kind" + clusters = spawn.capture(["kind", "get", "clusters"]).strip().split("\n") + if cluster not in clusters or args.recreate_cluster: + kind_version = Version.parse( + spawn.capture(["kind", "version"]).split(" ")[1][1:] + ) + assert kind_version >= Version.parse( + "0.29.0" + ), f"kind >= v0.29.0 required, while you are on {kind_version}" - dockerhub_username = os.getenv("DOCKERHUB_USERNAME") - dockerhub_token = os.getenv("DOCKERHUB_ACCESS_TOKEN") - spawn.runv( - [ - "docker", - "run", - "-d", - "--name", - "proxy-dockerhub", - "--restart=always", - "--net=kind", - "-v", - f"{MZ_ROOT}/misc/kind/cache/dockerhub:/var/lib/registry", - "-e", - "REGISTRY_PROXY_REMOTEURL=https://registry-1.docker.io", - *( - [ - "-e", - f"REGISTRY_PROXY_USERNAME={dockerhub_username}", - "-e", - f"REGISTRY_PROXY_PASSWORD={dockerhub_token}", - ] - if dockerhub_username and dockerhub_token - else [] - ), - "registry:2", - ] - ) + spawn.runv(["kind", "delete", "cluster", "--name", cluster]) - ghcr_username = "materialize-bot" - ghcr_token = os.getenv("GITHUB_GHCR_TOKEN") - spawn.runv( - [ - "docker", - "run", - "-d", - "--name", - "proxy-ghcr", - "--restart=always", - "--net=kind", - "-v", - f"{MZ_ROOT}/misc/kind/cache/ghcr:/var/lib/registry", - "-e", - "REGISTRY_PROXY_REMOTEURL=https://ghcr.io", - *( - [ - "-e", - f"REGISTRY_PROXY_USERNAME={ghcr_username}", - "-e", - f"REGISTRY_PROXY_PASSWORD={ghcr_token}", - ] - if ghcr_username and ghcr_token - else [] - ), - "registry:2", - ] - ) + try: + spawn.runv(["docker", "network", "create", "kind"]) + except: + pass + try: + spawn.runv(["docker", "stop", "proxy-dockerhub"]) + except: + pass + try: + spawn.runv(["docker", "rm", "proxy-dockerhub"]) + except: + pass + try: + spawn.runv(["docker", "stop", "proxy-ghcr"]) + except: + pass + try: + spawn.runv(["docker", "rm", "proxy-ghcr"]) + except: + pass + + dockerhub_username = os.getenv("DOCKERHUB_USERNAME") + dockerhub_token = os.getenv("DOCKERHUB_ACCESS_TOKEN") + spawn.runv( + [ + "docker", + "run", + "-d", + "--name", + "proxy-dockerhub", + "--restart=always", + "--net=kind", + "-v", + f"{MZ_ROOT}/misc/kind/cache/dockerhub:/var/lib/registry", + "-e", + "REGISTRY_PROXY_REMOTEURL=https://registry-1.docker.io", + *( + [ + "-e", + f"REGISTRY_PROXY_USERNAME={dockerhub_username}", + "-e", + f"REGISTRY_PROXY_PASSWORD={dockerhub_token}", + ] + if dockerhub_username and dockerhub_token + else [] + ), + "registry:2", + ] + ) - with ( - open(MZ_ROOT / "test" / "orchestratord" / "cluster.yaml.tmpl") as in_file, - open(MZ_ROOT / "test" / "orchestratord" / "cluster.yaml", "w") as out_file, - ): - text = in_file.read() - out_file.write( - text.replace( - "$DOCKER_CONFIG", - os.getenv("DOCKER_CONFIG", f'{os.environ["HOME"]}/.docker'), + ghcr_username = "materialize-bot" + ghcr_token = os.getenv("GITHUB_GHCR_TOKEN") + spawn.runv( + [ + "docker", + "run", + "-d", + "--name", + "proxy-ghcr", + "--restart=always", + "--net=kind", + "-v", + f"{MZ_ROOT}/misc/kind/cache/ghcr:/var/lib/registry", + "-e", + "REGISTRY_PROXY_REMOTEURL=https://ghcr.io", + *( + [ + "-e", + f"REGISTRY_PROXY_USERNAME={ghcr_username}", + "-e", + f"REGISTRY_PROXY_PASSWORD={ghcr_token}", + ] + if ghcr_username and ghcr_token + else [] + ), + "registry:2", + ] + ) + + with ( + open(MZ_ROOT / "test" / "orchestratord" / "cluster.yaml.tmpl") as in_file, + open(MZ_ROOT / "test" / "orchestratord" / "cluster.yaml", "w") as out_file, + ): + text = in_file.read() + out_file.write( + text.replace( + "$DOCKER_CONFIG", + os.getenv("DOCKER_CONFIG", f'{os.environ["HOME"]}/.docker'), + ) ) + + spawn.runv( + [ + "kind", + "create", + "cluster", + "--name", + cluster, + "--config", + MZ_ROOT / "test" / "orchestratord" / "cluster.yaml", + ] + ) + spawn.runv( + [ + "helm", + "repo", + "add", + "metrics-server", + "https://kubernetes-sigs.github.io/metrics-server/", + ] + ) + spawn.runv(["helm", "repo", "update", "metrics-server"]) + spawn.runv( + [ + "helm", + "install", + "metrics-server", + "metrics-server/metrics-server", + "--namespace", + "kube-system", + "--set", + "args={--kubelet-insecure-tls,--kubelet-preferred-address-types=InternalIP,Hostname,ExternalIP}", + ] ) - spawn.runv( - [ - "kind", - "create", - "cluster", - "--name", - cluster, - "--config", - MZ_ROOT / "test" / "orchestratord" / "cluster.yaml", - ] - ) - spawn.runv( - [ - "helm", - "repo", - "add", - "metrics-server", - "https://kubernetes-sigs.github.io/metrics-server/", - ] - ) - spawn.runv(["helm", "repo", "update", "metrics-server"]) - spawn.runv( - [ - "helm", - "install", - "metrics-server", - "metrics-server/metrics-server", - "--namespace", - "kube-system", - "--set", - "args={--kubelet-insecure-tls,--kubelet-preferred-address-types=InternalIP,Hostname,ExternalIP}", - ] - ) + spawn.runv(["kubectl", "create", "namespace", "materialize"]) - spawn.runv(["kubectl", "create", "namespace", "materialize"]) + spawn.runv( + [ + "kubectl", + "apply", + "-f", + MZ_ROOT / "misc" / "helm-charts" / "testing" / "postgres.yaml", + ] + ) + spawn.runv( + [ + "kubectl", + "apply", + "-f", + MZ_ROOT / "misc" / "helm-charts" / "testing" / "minio.yaml", + ] + ) + spawn.runv( + [ + "kubectl", + "apply", + "-f", + MZ_ROOT / "test" / "orchestratord" / "storageclass.yaml", + ] + ) - spawn.runv( - [ - "kubectl", - "apply", - "-f", - MZ_ROOT / "misc" / "helm-charts" / "testing" / "postgres.yaml", - ] - ) - spawn.runv( - [ - "kubectl", - "apply", - "-f", - MZ_ROOT / "misc" / "helm-charts" / "testing" / "minio.yaml", - ] - ) - spawn.runv( - [ - "kubectl", - "apply", - "-f", - MZ_ROOT / "test" / "orchestratord" / "storageclass.yaml", + if not args.tag: + services = [ + "orchestratord", + "environmentd", + "clusterd", + "balancerd", ] + c.up(*[Service(service, idle=True) for service in services]) + for service in services: + spawn.runv( + [ + "docker", + "tag", + c.compose["services"][service]["image"], + get_image(c.compose["services"][service]["image"], None), + ] + ) + spawn.runv( + ["kind", "load", "docker-image", "--name", cluster] + + [ + get_image(c.compose["services"][service]["image"], None) + for service in services + ] + ) + + definition: dict[str, Any] = {} + + with open(MZ_ROOT / "misc" / "helm-charts" / "operator" / "values.yaml") as f: + definition["operator"] = yaml.load(f, Loader=yaml.Loader) + with open(MZ_ROOT / "misc" / "helm-charts" / "testing" / "materialize.yaml") as f: + materialize_setup = list(yaml.load_all(f, Loader=yaml.Loader)) + assert len(materialize_setup) == 3 + definition["namespace"] = materialize_setup[0] + definition["secret"] = materialize_setup[1] + definition["materialize"] = materialize_setup[2] + + get_version(args.tag) + if args.orchestratord_override: + definition["operator"]["operator"]["image"]["tag"] = get_tag(args.tag) + # TODO: database-issues#9696, makes environmentd -> clusterd connections fail + # definition["operator"]["networkPolicies"]["enabled"] = True + # definition["operator"]["networkPolicies"]["internal"]["enabled"] = True + # definition["operator"]["networkPolicies"]["egress"]["enabled"] = True + # definition["operator"]["networkPolicies"]["ingress"]["enabled"] = True + # TODO: Remove when fixed: error: unexpected argument '--disable-license-key-checks' found + definition["operator"]["operator"]["args"]["enableLicenseKeyChecks"] = True + definition["operator"]["clusterd"]["nodeSelector"][ + "workload" + ] = "materialize-instance" + definition["operator"]["environmentd"]["nodeSelector"][ + "workload" + ] = "materialize-instance" + definition["secret"]["stringData"]["license_key"] = os.environ["MZ_CI_LICENSE_KEY"] + definition["materialize"]["spec"]["environmentdImageRef"] = get_image( + c.compose["services"]["environmentd"]["image"], args.tag ) + # kubectl get endpoints mzel5y3f42l6-cluster-u1-replica-u1-gen-1 -n materialize-environment -o json + # more than one address + return definition DONE_SCENARIOS = set()