From 9b8ee8da655841fab750558ce4630133dd249c54 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Tue, 18 Nov 2025 17:51:40 -0500 Subject: [PATCH 1/3] Reduce duplication for test setup --- test/pg-cdc-old-syntax/mzcompose.py | 115 +++++++++++++++------------- 1 file changed, 61 insertions(+), 54 deletions(-) diff --git a/test/pg-cdc-old-syntax/mzcompose.py b/test/pg-cdc-old-syntax/mzcompose.py index 1d0e655e0b096..4c56a74fdf2f8 100644 --- a/test/pg-cdc-old-syntax/mzcompose.py +++ b/test/pg-cdc-old-syntax/mzcompose.py @@ -96,6 +96,40 @@ def create_postgres( return Postgres(image=image, extra_command=extra_command) +def get_testdrive_ssl_args(c: Composition): + """Extract SSL certificates from test-certs service and return testdrive arguments related to SSL.""" + c.up(Service("test-certs", idle=True)) + ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout + ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout + ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout + ssl_wrong_cert = c.run( + "test-certs", "cat", "/secrets/postgres.crt", capture=True + ).stdout + ssl_wrong_key = c.run( + "test-certs", "cat", "/secrets/postgres.key", capture=True + ).stdout + + testdrive_args = [ + f"--var=ssl-ca={ssl_ca}", + f"--var=ssl-cert={ssl_cert}", + f"--var=ssl-key={ssl_key}", + f"--var=ssl-wrong-cert={ssl_wrong_cert}", + f"--var=ssl-wrong-key={ssl_wrong_key}", + ] + + return { + "testdrive_args": testdrive_args, + "volumes_extra": ["secrets:/share/secrets"], + } + + +def get_default_testdrive_size_args(): + return [ + f"--var=default-replica-size=scale={Materialized.Size.DEFAULT_SIZE},workers={Materialized.Size.DEFAULT_SIZE}", + f"--var=default-storage-size=scale={Materialized.Size.DEFAULT_SIZE},workers=1", + ] + + SERVICES = [ Mz(app_password=""), Materialized( @@ -307,39 +341,20 @@ def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None: ) args = parser.parse_args() - 
matching_files = [] - for filter in args.filter: - matching_files.extend( - glob.glob(filter, root_dir=MZ_ROOT / "test" / "pg-cdc-old-syntax") - ) - sharded_files: list[str] = buildkite.shard_list( - sorted(matching_files), lambda file: file - ) + sharded_files = get_sharded_files(args.filter) print(f"Files: {sharded_files}") + ssl_args_dict = get_testdrive_ssl_args(c) + testdrive_ssl_args = ssl_args_dict["testdrive_args"] - c.up(Service("test-certs", idle=True)) - ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout - ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout - ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout - ssl_wrong_cert = c.run( - "test-certs", "cat", "/secrets/postgres.crt", capture=True - ).stdout - ssl_wrong_key = c.run( - "test-certs", "cat", "/secrets/postgres.key", capture=True - ).stdout - + testdrive_args = ( + testdrive_ssl_args + get_default_testdrive_size_args() + ["--no-reset"] + ) with c.override(create_postgres(pg_version=pg_version)): c.up("materialized", "test-certs", "postgres") c.test_parts( sharded_files, lambda file: c.run_testdrive_files( - f"--var=ssl-ca={ssl_ca}", - f"--var=ssl-cert={ssl_cert}", - f"--var=ssl-key={ssl_key}", - f"--var=ssl-wrong-cert={ssl_wrong_cert}", - f"--var=ssl-wrong-key={ssl_wrong_key}", - f"--var=default-replica-size=scale={Materialized.Size.DEFAULT_SIZE},workers={Materialized.Size.DEFAULT_SIZE}", - f"--var=default-storage-size=scale={Materialized.Size.DEFAULT_SIZE},workers=1", + *testdrive_args, file, ), ) @@ -377,6 +392,15 @@ def process(name: str) -> None: c.test_parts(sharded_workflows, process) +def get_sharded_files(filters: str) -> list[str]: + matching_files = [] + for filter in filters: + matching_files.extend( + glob.glob(filter, root_dir=MZ_ROOT / "test" / "pg-cdc-old-syntax") + ) + return buildkite.shard_list(sorted(matching_files), lambda file: file) + + def workflow_migration(c: Composition, parser: 
WorkflowArgumentParser) -> None: parser.add_argument( "filter", @@ -386,33 +410,23 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None: ) args = parser.parse_args() - matching_files = [] - for filter in args.filter: - matching_files.extend( - glob.glob(filter, root_dir=MZ_ROOT / "test" / "pg-cdc-old-syntax") - ) - sharded_files: list[str] = buildkite.shard_list( - sorted(matching_files), lambda file: file - ) + sharded_files = get_sharded_files(args.filter) print(f"Files: {sharded_files}") - c.up(Service("test-certs", idle=True)) - ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout - ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout - ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout - ssl_wrong_cert = c.run( - "test-certs", "cat", "/secrets/postgres.crt", capture=True - ).stdout - ssl_wrong_key = c.run( - "test-certs", "cat", "/secrets/postgres.key", capture=True - ).stdout + ssl_args_dict = get_testdrive_ssl_args(c) + testdrive_ssl_args = ssl_args_dict["testdrive_args"] + volumes_extra = ssl_args_dict["volumes_extra"] + + testdrive_args = ( + testdrive_ssl_args + get_default_testdrive_size_args() + ["--no-reset"] + ) pg_version = get_targeted_pg_version(parser) for file in sharded_files: mz_old = Materialized( name="materialized", - volumes_extra=["secrets:/share/secrets"], + volumes_extra=volumes_extra, external_metadata_store=True, external_blob_store=True, additional_system_parameter_defaults={ @@ -423,7 +437,7 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None: mz_new = Materialized( name="materialized", - volumes_extra=["secrets:/share/secrets"], + volumes_extra=volumes_extra, external_metadata_store=True, external_blob_store=True, additional_system_parameter_defaults={ @@ -438,14 +452,7 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None: print(f"Running {file} with mz_old") 
c.run_testdrive_files( - f"--var=ssl-ca={ssl_ca}", - f"--var=ssl-cert={ssl_cert}", - f"--var=ssl-key={ssl_key}", - f"--var=ssl-wrong-cert={ssl_wrong_cert}", - f"--var=ssl-wrong-key={ssl_wrong_key}", - f"--var=default-replica-size=scale={Materialized.Size.DEFAULT_SIZE},workers={Materialized.Size.DEFAULT_SIZE}", - f"--var=default-storage-size=scale={Materialized.Size.DEFAULT_SIZE},workers=1", - "--no-reset", + *testdrive_args, file, ) c.kill("materialized", wait=True) From 0a340ad889907aaf3c7ddb77d6e23f8f995a8431 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Wed, 19 Nov 2025 19:21:28 -0500 Subject: [PATCH 2/3] Port pg-cdc-old-syntax tests to use multiversion upgrades - Adds helper class for doing 0dt upgrades - Add a multiversion test for pg-cdc-old-syntax - Branch on certain testdrive tests due to incompatible syntax - Change version list filtering for upgrade tests Filter on versions within the previous unskippable major release. This won't work for 26.0.0 since we can't upgrade from 25.1, so we keep the same conditional - Add multi-version upgrade tests for Postgres old CDC to release qualification pipeline --- .../pipeline.template.yml | 24 +++ .../materialize/checks/scenarios_upgrade.py | 23 +- misc/python/materialize/mz_0dt_upgrader.py | 197 ++++++++++++++++++ .../materialize/source_table_migration.py | 20 +- misc/python/materialize/version_list.py | 57 +++-- test/pg-cdc-old-syntax/exclude-columns.td | 4 + test/pg-cdc-old-syntax/mzcompose.py | 133 +++++++++++- test/pg-cdc-old-syntax/privileges.td | 24 ++- test/pg-cdc-old-syntax/statistics.td | 4 + 9 files changed, 446 insertions(+), 40 deletions(-) create mode 100644 misc/python/materialize/mz_0dt_upgrader.py diff --git a/ci/release-qualification/pipeline.template.yml b/ci/release-qualification/pipeline.template.yml index 1ce7f231d5d91..6970eff62c145 100644 --- a/ci/release-qualification/pipeline.template.yml +++ b/ci/release-qualification/pipeline.template.yml @@ -318,6 +318,30 @@ steps: agents: queue: 
hetzner-aarch64-4cpu-8gb + - id: pg-cdc-old-syntax-multi-version-upgrade-random + label: Postgres CDC tests (before source versioning, multi-version upgrade, random upgrade path) + depends_on: build-aarch64 + timeout_in_minutes: 60 + plugins: + - ./ci/plugins/mzcompose: + composition: pg-cdc-old-syntax + run: migration-multi-version-upgrade + args: [--mode=random, --seed=$BUILDKITE_JOB_ID] + agents: + queue: hetzner-aarch64-4cpu-8gb + + - id: pg-cdc-old-syntax-multi-version-upgrade-earliest-to-current + label: Postgres CDC tests (before source versioning, multi-version upgrade, earliest to current direct upgrade) + depends_on: build-aarch64 + timeout_in_minutes: 60 + plugins: + - ./ci/plugins/mzcompose: + composition: pg-cdc-old-syntax + run: migration-multi-version-upgrade + args: [--mode=earliest-to-current] + agents: + queue: hetzner-aarch64-4cpu-8gb + - group: "Platform checks" key: platform-checks steps: diff --git a/misc/python/materialize/checks/scenarios_upgrade.py b/misc/python/materialize/checks/scenarios_upgrade.py index dd31a8ee288b7..700c2b76240f5 100644 --- a/misc/python/materialize/checks/scenarios_upgrade.py +++ b/misc/python/materialize/checks/scenarios_upgrade.py @@ -24,13 +24,14 @@ WaitReadyMz, ) from materialize.checks.scenarios import Scenario +from materialize.mz_0dt_upgrader import generate_random_upgrade_path from materialize.mz_version import MzVersion from materialize.mzcompose import get_default_system_parameters from materialize.mzcompose.services.materialized import LEADER_STATUS_HEALTHCHECK from materialize.version_list import ( + get_compatible_upgrade_from_versions, get_published_minor_mz_versions, get_self_managed_versions, - get_supported_self_managed_versions, ) # late initialization @@ -587,7 +588,7 @@ def __init__( features: Features, seed: str | None = None, ): - self.self_managed_versions = get_supported_self_managed_versions() + self.self_managed_versions = get_compatible_upgrade_from_versions() super().__init__(checks, 
executor, features, seed) def base_version(self) -> MzVersion: @@ -643,7 +644,7 @@ def __init__( features: Features, seed: str | None = None, ): - self.self_managed_versions = get_supported_self_managed_versions() + self.self_managed_versions = get_compatible_upgrade_from_versions() super().__init__(checks, executor, features, seed) def base_version(self) -> MzVersion: @@ -709,7 +710,7 @@ def __init__( features: Features, seed: str | None = None, ): - self.self_managed_versions = get_supported_self_managed_versions() + self.self_managed_versions = get_compatible_upgrade_from_versions() super().__init__(checks, executor, features, seed) def _generate_random_upgrade_path( @@ -721,17 +722,7 @@ def _generate_random_upgrade_path( if self.rng is None or len(versions) == 0: return versions - selected_versions = [] - # For each version in the input list, randomly select it with a 50% chance. - for v in versions: - if self.rng.random() < 0.5: - selected_versions.append(v) - - # Always include at least one version to avoid empty paths. - if len(selected_versions) == 0: - selected_versions.append(self.rng.choice(versions)) - - return selected_versions + return generate_random_upgrade_path(versions, self.rng) def base_version(self) -> MzVersion: return self.self_managed_versions[0] @@ -838,7 +829,7 @@ def __init__( features: Features, seed: str | None = None, ): - self.self_managed_versions = get_supported_self_managed_versions() + self.self_managed_versions = get_compatible_upgrade_from_versions() super().__init__(checks, executor, features, seed) def base_version(self) -> MzVersion: diff --git a/misc/python/materialize/mz_0dt_upgrader.py b/misc/python/materialize/mz_0dt_upgrader.py new file mode 100644 index 0000000000000..a06643b324d9e --- /dev/null +++ b/misc/python/materialize/mz_0dt_upgrader.py @@ -0,0 +1,197 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. 
+# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +from collections.abc import Callable +from dataclasses import dataclass +from random import Random +from typing import TypedDict + +from materialize.mz_version import MzVersion +from materialize.mzcompose import get_default_system_parameters +from materialize.mzcompose.composition import Composition +from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized +from materialize.mzcompose.services.postgres import METADATA_STORE + + +class MaterializedUpgradeArgs(TypedDict): + """Arguments for the Materialized service constructor required for 0dt upgrades.""" + + name: str + image: str | None + deploy_generation: int + system_parameter_defaults: dict[str, str] + external_metadata_store: bool + restart: str + + +@dataclass +class UpgradeStep: + """Represents a single upgrade step with its service name and action.""" + + new_service: Materialized + previous_service: Materialized + upgrade: Callable[[], None] + + +def generate_materialized_upgrade_args( + versions: list[MzVersion | None], +) -> list[MaterializedUpgradeArgs]: + """ + Constructs a list of required Materialized arguments for 0dt upgrades. + Requires there to be an mz_1 and mz_2 service already in the composition. + """ + # We use the first version to get the system parameters since the defaults for + # newer versions include cutting edge features than can break backwards compatibility. + # TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features. 
+ system_parameter_defaults = get_default_system_parameters(versions[0]) + + return [ + MaterializedUpgradeArgs( + image=f"materialize/materialized:{version}" if version else None, + # Cycle through mz_1 and mz_2 for upgrades since spinning up services has a cost. + name=f"mz_{(i % 2) + 1}", + # Generation number for the service. Required to start services in read only mode. + deploy_generation=i, + system_parameter_defaults=system_parameter_defaults, + # To share the same metadata store between services + external_metadata_store=True, + # To restart when container exits due to promotion + restart="on-failure", + ) + for i, version in enumerate(versions) + ] + + +def generate_random_upgrade_path( + versions: list[MzVersion], + rng: Random | None = None, +) -> list[MzVersion]: + """ + Generates a random upgrade path between the given versions. + """ + selected_versions = [] + + rng = rng or Random() + # For each version in the input list, randomly select it with a 50% chance. + for v in versions: + if rng.random() < 0.5: + selected_versions.append(v) + + # Always include at least one version to avoid empty paths. + if len(selected_versions) == 0: + selected_versions.append(rng.choice(versions)) + + return selected_versions + + +class Materialized0dtUpgrader: + """ + Manages a sequence of Materialized service upgrades using zero-downtime deployments. + + Args: + materialized_services: List of Materialized instances representing each upgrade step + """ + + def __init__(self, c: Composition, materialized_services: list[Materialized]): + self.materialized_services = materialized_services + self.c = c + + def create_upgrade_steps_list(self) -> list[UpgradeStep]: + """ + Returns a list of upgrade step actions from the second service onward. + + Each step is a closure that, when called, will perform + the upgrade step to the corresponding service. 
+ """ + + def create_upgrade_action( + current_service: Materialized, + previous_service: Materialized, + ): + def upgrade() -> None: + with self.c.override(current_service): + current_service_image = ( + current_service.config.get("image") or "current" + ) + previous_service_image = previous_service.config.get("image") + + print(f"Bringing up {current_service_image}") + self.c.up(current_service.name) + print(f"Awaiting promotion of {current_service_image}") + self.c.await_mz_deployment_status( + DeploymentStatus.READY_TO_PROMOTE, current_service.name + ) + self.c.promote_mz(current_service.name) + print(f"Awaiting leader status of {current_service_image}") + self.c.await_mz_deployment_status( + DeploymentStatus.IS_LEADER, current_service.name + ) + + print(f"Killing {previous_service_image}") + self.c.kill(previous_service.name, wait=True) + + return upgrade + + services = self.materialized_services + steps = [] + for idx in range(1, len(services)): + current_service = services[idx] + previous_service = services[idx - 1] + + steps.append( + UpgradeStep( + new_service=current_service, + previous_service=previous_service, + upgrade=create_upgrade_action(current_service, previous_service), + ) + ) + return steps + + def initialize(self) -> tuple[Materialized, list[UpgradeStep]]: + """ + Initialize the with the first service. Returns a list where + each step is a closure that, when called, will perform the upgrade step to the corresponding service. + """ + first_service = self.materialized_services[0] + with self.c.override(first_service): + print(f"Bringing up {first_service.name}") + self.c.up(first_service.name) + + return first_service, self.create_upgrade_steps_list() + + def print_upgrade_path(self) -> None: + """ + Print the upgrade steps. 
+ """ + + def image_to_string(image: str | None) -> str: + return "current" if image is None else image.split(":")[-1] + + print( + f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}" + ) + + def cleanup(self) -> None: + """ + Cleanup after upgrade. + """ + print("Cleaning up upgrade path") + # Ensure all services are killed and removed + self.c.kill( + *[service.name for service in self.materialized_services], wait=True + ) + self.c.rm( + *[service.name for service in self.materialized_services], + destroy_volumes=True, + ) + self.c.kill(METADATA_STORE, wait=True) + self.c.rm( + METADATA_STORE, + destroy_volumes=True, + ) diff --git a/misc/python/materialize/source_table_migration.py b/misc/python/materialize/source_table_migration.py index 264c1761f3d2d..6a68e15e5f0d4 100644 --- a/misc/python/materialize/source_table_migration.py +++ b/misc/python/materialize/source_table_migration.py @@ -12,29 +12,37 @@ def verify_sources_after_source_table_migration( - c: Composition, file: str, fail: bool = False + c: Composition, + file: str, + fail: bool = False, + service: str | None = None, ) -> None: source_names_rows = c.sql_query( - "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';" + "SELECT sm.name || '.' 
|| src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';", + service=service, ) source_names = [row[0] for row in source_names_rows] print(f"Sources created in {file} are: {source_names}") - c.sql("SET statement_timeout = '20s'") + c.sql("SET statement_timeout = '20s'", service=service) for source_name in source_names: - _verify_source(c, file, source_name, fail=fail) + _verify_source(c, file, source_name, fail=fail, service=service) def _verify_source( - c: Composition, file: str, source_name: str, fail: bool = False + c: Composition, + file: str, + source_name: str, + fail: bool = False, + service: str | None = None, ) -> None: try: print(f"Checking source: {source_name}") statement = f"SHOW CREATE SOURCE {source_name};" - result = c.sql_query(statement) + result = c.sql_query(statement, service=service) sql = result[0][1] assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}" assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}" diff --git a/misc/python/materialize/version_list.py b/misc/python/materialize/version_list.py index d9ce031b04f6d..4d0ecf3fbed19 100644 --- a/misc/python/materialize/version_list.py +++ b/misc/python/materialize/version_list.py @@ -74,18 +74,34 @@ def get_self_managed_versions() -> list[MzVersion]: return sorted(result) -# Gets the supported self managed versions relative to the current version -def get_supported_self_managed_versions() -> list[MzVersion]: - self_managed_versions = fetch_self_managed_versions() - # TODO (multiversion2): Change this to filter on versions between the next and previous unskippable major release - # when unskippable versions are implemented. - return sorted( - { +# Gets the range of versions we can "upgrade from" to the current version, sorted in ascending order. 
+def get_compatible_upgrade_from_versions() -> list[MzVersion]: + + # Determine the current MzVersion from the environment, or from a version constant + current_version = MzVersion.parse_cargo() + + published_versions_within_one_major_version = { + v + for v in get_published_mz_versions_within_one_major_version() + if abs(v.major - current_version.major) <= 1 and v <= current_version + } + + if current_version.major <= 26: + # For versions <= 26, we can only upgrade from 25.2 self-managed versions + self_managed_25_2_versions = { v.version - for v in self_managed_versions + for v in fetch_self_managed_versions() if v.helm_version.major == 25 and v.helm_version.minor == 2 } - ) + + return sorted( + self_managed_25_2_versions.union( + published_versions_within_one_major_version + ) + ) + else: + # For versions > 26, get all mz versions within 1 major version of current_version + return sorted(published_versions_within_one_major_version) BAD_SELF_MANAGED_VERSIONS = { @@ -439,6 +455,8 @@ def get_all_mz_versions( version_type=MzVersion, newest_first=newest_first ) if version not in INVALID_VERSIONS + # Exclude release candidates + and not version.prerelease ] @@ -457,9 +475,24 @@ def get_all_published_mz_versions( newest_first: bool = True, limit: int | None = None ) -> list[MzVersion]: """Get all mz versions based on git tags. This method ensures that images of the versions exist.""" - return limit_to_published_versions( - get_all_mz_versions(newest_first=newest_first), limit - ) + all_versions = get_all_mz_versions(newest_first=newest_first) + print(f"all_versions: {all_versions}") + return limit_to_published_versions(all_versions, limit) + + +def get_published_mz_versions_within_one_major_version( + newest_first: bool = True, +) -> list[MzVersion]: + """Get all previous mz versions within one major version of the current version. 
Ensure that images of the versions exist.""" + current_version = MzVersion.parse_cargo() + all_versions = get_all_mz_versions(newest_first=newest_first) + versions_within_one_major_version = { + v + for v in all_versions + if abs(v.major - current_version.major) <= 1 and v <= current_version + } + + return limit_to_published_versions(list(versions_within_one_major_version)) def limit_to_published_versions( diff --git a/test/pg-cdc-old-syntax/exclude-columns.td b/test/pg-cdc-old-syntax/exclude-columns.td index 8326389246257..0bcba04c23c6c 100644 --- a/test/pg-cdc-old-syntax/exclude-columns.td +++ b/test/pg-cdc-old-syntax/exclude-columns.td @@ -11,6 +11,10 @@ # Test postgres EXCLUDE COLUMNS support # + +$ skip-if +SELECT mz_version_num() < 16200; + > CREATE SECRET pgpass AS 'postgres' > CREATE CONNECTION pg TO POSTGRES ( HOST postgres, diff --git a/test/pg-cdc-old-syntax/mzcompose.py b/test/pg-cdc-old-syntax/mzcompose.py index 4c56a74fdf2f8..05e237ff61852 100644 --- a/test/pg-cdc-old-syntax/mzcompose.py +++ b/test/pg-cdc-old-syntax/mzcompose.py @@ -13,11 +13,16 @@ import glob import time +from random import Random import pg8000 from pg8000 import Connection from materialize import MZ_ROOT, buildkite +from materialize.mz_0dt_upgrader import ( + Materialized0dtUpgrader, + generate_materialized_upgrade_args, +) from materialize.mzcompose.composition import ( Composition, Service, @@ -39,6 +44,7 @@ from materialize.source_table_migration import ( verify_sources_after_source_table_migration, ) +from materialize.version_list import get_compatible_upgrade_from_versions # Set the max slot WAL keep size to 10MB DEFAULT_PG_EXTRA_COMMAND = ["-c", "max_slot_wal_keep_size=10"] @@ -140,6 +146,12 @@ def get_default_testdrive_size_args(): external_blob_store=True, default_replication_factor=2, ), + Materialized( + name="mz_1", + ), + Materialized( + name="mz_2", + ), Testdrive(), CockroachOrPostgresMetadata(), Minio(setup_materialize=True), @@ -341,7 +353,9 @@ def workflow_cdc(c: 
Composition, parser: WorkflowArgumentParser) -> None: ) args = parser.parse_args() - sharded_files = get_sharded_files(args.filter) + sharded_files = [ + file for file in get_sharded_files(args.filter) if file != "exclude-columns.td" + ] print(f"Files: {sharded_files}") ssl_args_dict = get_testdrive_ssl_args(c) testdrive_ssl_args = ssl_args_dict["testdrive_args"] @@ -362,7 +376,7 @@ def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None: def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: def process(name: str) -> None: - if name in ("default", "migration"): + if name in ("default", "migration", "migration-multi-version-upgrade"): return # TODO: Flaky, reenable when database-issues#8447 is fixed @@ -382,7 +396,8 @@ def process(name: str) -> None: [ w for w in c.workflows - if w not in workflows_with_internal_sharding and w != "migration" + if w not in workflows_with_internal_sharding + and w not in ("migration", "migration-multi-version-upgrade") ], lambda w: w, ) @@ -398,6 +413,7 @@ def get_sharded_files(filters: str) -> list[str]: matching_files.extend( glob.glob(filter, root_dir=MZ_ROOT / "test" / "pg-cdc-old-syntax") ) + return buildkite.shard_list(sorted(matching_files), lambda file: file) @@ -470,3 +486,114 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None: c.rm(METADATA_STORE) c.rm("postgres") c.rm_volumes("mzdata") + + +def workflow_migration_multi_version_upgrade( + c: Composition, parser: WorkflowArgumentParser +) -> None: + """ + Multiversion upgrade with the source versioning migration. 
+ """ + pg_version = get_targeted_pg_version(parser) + + parser.add_argument( + "--mode", + type=str, + choices=["random", "earliest-to-current"], + default="earliest-to-current", + help="Upgrade mode: 'random' for random version to upgrade from, 'earliest-to-current' for upgrading from the earliest upgradeable version to the current version.", + ) + + parser.add_argument( + "--seed", + type=str, + default=None, + help="Random seed to use for upgrade path selection", + ) + + parser.add_argument( + "filter", + nargs="*", + default=["*.td"], + help="limit to only the files matching filter", + ) + + args = parser.parse_args() + + # Get matching files and apply sharding + + sharded_files = get_sharded_files(args.filter) + print(f"Files: {sharded_files}") + + ssl_args_dict = get_testdrive_ssl_args(c) + testdrive_ssl_args = ssl_args_dict["testdrive_args"] + volumes_extra = ssl_args_dict["volumes_extra"] + + testdrive_args = ( + testdrive_ssl_args + get_default_testdrive_size_args() + ["--no-reset"] + ) + + compatible_versions = get_compatible_upgrade_from_versions() + + if args.mode == "random": + random_initial_version = Random(args.seed).choice(compatible_versions) + versions = [random_initial_version, None] + else: + versions = [compatible_versions[0], None] + + materialize_service_instances = [] + + upgrade_args_list = generate_materialized_upgrade_args(versions) + + for i, upgrade_args in enumerate(upgrade_args_list): + log_filter = "mz_storage::source::postgres=trace,debug,info,warn,error" + + # Enable source versioning migration at the end (final version) + enable_source_migration_arg = ( + {"force_source_table_syntax": "true"} + if i == len(upgrade_args_list) - 1 + else {} + ) + + materialize_service_instances.append( + Materialized( + **upgrade_args, + external_blob_store=True, + volumes_extra=volumes_extra, + additional_system_parameter_defaults={ + log_filter: log_filter, + **enable_source_migration_arg, + }, + ) + ) + + upgrade_path = 
Materialized0dtUpgrader(c, materialize_service_instances) + + upgrade_path.print_upgrade_path() + + for file in sharded_files: + + with c.override(create_postgres(pg_version=pg_version)): + c.up("test-certs", "postgres") + initial_materialized_service, upgrade_steps = upgrade_path.initialize() + c.run_testdrive_files( + *testdrive_args, + file, + mz_service=initial_materialized_service.name, + ) + + for step in upgrade_steps: + step.upgrade() + + # Verify the source table migration at the end. + last_materialized_service = upgrade_steps[-1].new_service + print( + f"Verifying source table migration for version {last_materialized_service.config.get('image')}" + ) + verify_sources_after_source_table_migration( + c, file, service=last_materialized_service.name + ) + upgrade_path.cleanup() + c.kill("postgres", wait=True) + c.rm("postgres") + c.rm_volumes("mzdata") diff --git a/test/pg-cdc-old-syntax/privileges.td b/test/pg-cdc-old-syntax/privileges.td index aa6a90e81968c..da2cf7d49dd50 100644 --- a/test/pg-cdc-old-syntax/privileges.td +++ b/test/pg-cdc-old-syntax/privileges.td @@ -57,12 +57,18 @@ REVOKE USAGE ON SCHEMA other FROM priv; PASSWORD SECRET pgpass ) -! CREATE SOURCE mz_source +![version>15800] CREATE SOURCE mz_source FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') FOR SCHEMAS(public, other); contains:insufficient privileges detail:user lacks USAGE privileges for schemas other +![version<=15800] CREATE SOURCE mz_source + FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') + FOR SCHEMAS(public, other); +contains:insufficient privileges +detail:user priv lacks USAGE privileges for schemas other + # # SELECT errors # @@ -71,23 +77,35 @@ $ postgres-execute connection=postgres://postgres:postgres@postgres GRANT ALL PRIVILEGES ON SCHEMA other TO priv; REVOKE SELECT ON TABLE other.s FROM priv; -! 
CREATE SOURCE mz_source +![version>15800] CREATE SOURCE mz_source FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') FOR SCHEMAS(public, other); contains:insufficient privileges detail:user lacks SELECT privileges for tables other.s +![version<=15800] CREATE SOURCE mz_source + FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') + FOR SCHEMAS(public, other); +contains:insufficient privileges +detail:user priv lacks SELECT privileges for tables other.s + $ postgres-execute connection=postgres://postgres:postgres@postgres CREATE TABLE "select" (a INT); REVOKE SELECT ON public.select FROM priv; CREATE TABLE """select""" (a INT); REVOKE SELECT ON public."""select""" FROM priv; -! CREATE SOURCE mz_source +![version>15800] CREATE SOURCE mz_source FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') FOR SCHEMAS(public); contains:insufficient privileges detail:user lacks SELECT privileges for tables public."""select""", public."select" +![version<=15800] CREATE SOURCE mz_source + FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') + FOR SCHEMAS(public); +contains:insufficient privileges +detail:user priv lacks SELECT privileges for tables public."""select""", public."select" + $ postgres-execute connection=postgres://postgres:postgres@postgres DROP SCHEMA IF EXISTS other CASCADE; diff --git a/test/pg-cdc-old-syntax/statistics.td b/test/pg-cdc-old-syntax/statistics.td index 96a8d9cf8c3ba..4ce8bb70d5194 100644 --- a/test/pg-cdc-old-syntax/statistics.td +++ b/test/pg-cdc-old-syntax/statistics.td @@ -13,6 +13,10 @@ $ set-sql-timeout duration=60s # Test progress statistics # +$ skip-if +SELECT mz_version_num() < 15100; + + $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} ALTER SYSTEM SET storage_statistics_collection_interval = 1000 ALTER SYSTEM SET storage_statistics_interval = 2000 From c5a0b6d0f76f1a0142285d30573ee12caa849ec3 Mon Sep 17 00:00:00 2001 From: Sang Jun Bak Date: Tue, 25 Nov 2025 15:11:44 -0500 
Subject: [PATCH 3/3] Move tests to nightly --- ci/nightly/pipeline.template.yml | 24 +++++++++++++++++++ .../pipeline.template.yml | 24 ------------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index 62f4d397bb71e..45217cfe1f58d 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -759,6 +759,30 @@ steps: agents: queue: hetzner-aarch64-8cpu-16gb + - id: pg-cdc-old-syntax-multi-version-upgrade-random + label: Postgres CDC tests (before source versioning, multi-version upgrade, random upgrade path) + depends_on: build-aarch64 + timeout_in_minutes: 60 + plugins: + - ./ci/plugins/mzcompose: + composition: pg-cdc-old-syntax + run: migration-multi-version-upgrade + args: [--mode=random, --seed=$BUILDKITE_JOB_ID] + agents: + queue: hetzner-aarch64-4cpu-8gb + + - id: pg-cdc-old-syntax-multi-version-upgrade-earliest-to-current + label: Postgres CDC tests (before source versioning, multi-version upgrade, earliest to current direct upgrade) + depends_on: build-aarch64 + timeout_in_minutes: 60 + plugins: + - ./ci/plugins/mzcompose: + composition: pg-cdc-old-syntax + run: migration-multi-version-upgrade + args: [--mode=earliest-to-current] + agents: + queue: hetzner-aarch64-4cpu-8gb + - id: pg-rtr-old-syntax label: Postgres RTR tests (before source versioning) depends_on: build-aarch64 diff --git a/ci/release-qualification/pipeline.template.yml b/ci/release-qualification/pipeline.template.yml index 6970eff62c145..1ce7f231d5d91 100644 --- a/ci/release-qualification/pipeline.template.yml +++ b/ci/release-qualification/pipeline.template.yml @@ -318,30 +318,6 @@ steps: agents: queue: hetzner-aarch64-4cpu-8gb - - id: pg-cdc-old-syntax-multi-version-upgrade-random - label: Postgres CDC tests (before source versioning, multi-version upgrade, random upgrade path) - depends_on: build-aarch64 - timeout_in_minutes: 60 - plugins: - - ./ci/plugins/mzcompose: - 
composition: pg-cdc-old-syntax - run: migration-multi-version-upgrade - args: [--mode=random, --seed=$BUILDKITE_JOB_ID] - agents: - queue: hetzner-aarch64-4cpu-8gb - - - id: pg-cdc-old-syntax-multi-version-upgrade-earliest-to-current - label: Postgres CDC tests (before source versioning, multi-version upgrade, earliest to current direct upgrade) - depends_on: build-aarch64 - timeout_in_minutes: 60 - plugins: - - ./ci/plugins/mzcompose: - composition: pg-cdc-old-syntax - run: migration-multi-version-upgrade - args: [--mode=earliest-to-current] - agents: - queue: hetzner-aarch64-4cpu-8gb - - group: "Platform checks" key: platform-checks steps: