diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml
index 62f4d397bb71e..45217cfe1f58d 100644
--- a/ci/nightly/pipeline.template.yml
+++ b/ci/nightly/pipeline.template.yml
@@ -759,6 +759,30 @@ steps:
     agents:
       queue: hetzner-aarch64-8cpu-16gb
 
+  - id: pg-cdc-old-syntax-multi-version-upgrade-random
+    label: Postgres CDC tests (before source versioning, multi-version upgrade, random upgrade path)
+    depends_on: build-aarch64
+    timeout_in_minutes: 60
+    plugins:
+      - ./ci/plugins/mzcompose:
+          composition: pg-cdc-old-syntax
+          run: migration-multi-version-upgrade
+          args: [--mode=random, --seed=$BUILDKITE_JOB_ID]
+    agents:
+      queue: hetzner-aarch64-4cpu-8gb
+
+  - id: pg-cdc-old-syntax-multi-version-upgrade-earliest-to-current
+    label: Postgres CDC tests (before source versioning, multi-version upgrade, earliest to current direct upgrade)
+    depends_on: build-aarch64
+    timeout_in_minutes: 60
+    plugins:
+      - ./ci/plugins/mzcompose:
+          composition: pg-cdc-old-syntax
+          run: migration-multi-version-upgrade
+          args: [--mode=earliest-to-current]
+    agents:
+      queue: hetzner-aarch64-4cpu-8gb
+
   - id: pg-rtr-old-syntax
     label: Postgres RTR tests (before source versioning)
     depends_on: build-aarch64
diff --git a/misc/python/materialize/checks/scenarios_upgrade.py b/misc/python/materialize/checks/scenarios_upgrade.py
index dd31a8ee288b7..700c2b76240f5 100644
--- a/misc/python/materialize/checks/scenarios_upgrade.py
+++ b/misc/python/materialize/checks/scenarios_upgrade.py
@@ -24,13 +24,14 @@
     WaitReadyMz,
 )
 from materialize.checks.scenarios import Scenario
+from materialize.mz_0dt_upgrader import generate_random_upgrade_path
 from materialize.mz_version import MzVersion
 from materialize.mzcompose import get_default_system_parameters
 from materialize.mzcompose.services.materialized import LEADER_STATUS_HEALTHCHECK
 from materialize.version_list import (
+    get_compatible_upgrade_from_versions,
     get_published_minor_mz_versions,
     get_self_managed_versions,
-    get_supported_self_managed_versions,
 )
 
 # late initialization
@@ -587,7 +588,7 @@ def __init__(
         features: Features,
         seed: str | None = None,
     ):
-        self.self_managed_versions = get_supported_self_managed_versions()
+        self.self_managed_versions = get_compatible_upgrade_from_versions()
         super().__init__(checks, executor, features, seed)
 
     def base_version(self) -> MzVersion:
@@ -643,7 +644,7 @@ def __init__(
         features: Features,
         seed: str | None = None,
     ):
-        self.self_managed_versions = get_supported_self_managed_versions()
+        self.self_managed_versions = get_compatible_upgrade_from_versions()
         super().__init__(checks, executor, features, seed)
 
     def base_version(self) -> MzVersion:
@@ -709,7 +710,7 @@ def __init__(
         features: Features,
         seed: str | None = None,
     ):
-        self.self_managed_versions = get_supported_self_managed_versions()
+        self.self_managed_versions = get_compatible_upgrade_from_versions()
         super().__init__(checks, executor, features, seed)
 
     def _generate_random_upgrade_path(
@@ -721,17 +722,7 @@
         if self.rng is None or len(versions) == 0:
             return versions
 
-        selected_versions = []
-        # For each version in the input list, randomly select it with a 50% chance.
-        for v in versions:
-            if self.rng.random() < 0.5:
-                selected_versions.append(v)
-
-        # Always include at least one version to avoid empty paths.
-        if len(selected_versions) == 0:
-            selected_versions.append(self.rng.choice(versions))
-
-        return selected_versions
+        return generate_random_upgrade_path(versions, self.rng)
 
     def base_version(self) -> MzVersion:
         return self.self_managed_versions[0]
@@ -838,7 +829,7 @@ def __init__(
         features: Features,
         seed: str | None = None,
     ):
-        self.self_managed_versions = get_supported_self_managed_versions()
+        self.self_managed_versions = get_compatible_upgrade_from_versions()
         super().__init__(checks, executor, features, seed)
 
     def base_version(self) -> MzVersion:
diff --git a/misc/python/materialize/mz_0dt_upgrader.py b/misc/python/materialize/mz_0dt_upgrader.py
new file mode 100644
index 0000000000000..a06643b324d9e
--- /dev/null
+++ b/misc/python/materialize/mz_0dt_upgrader.py
@@ -0,0 +1,197 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+from collections.abc import Callable
+from dataclasses import dataclass
+from random import Random
+from typing import TypedDict
+
+from materialize.mz_version import MzVersion
+from materialize.mzcompose import get_default_system_parameters
+from materialize.mzcompose.composition import Composition
+from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
+from materialize.mzcompose.services.postgres import METADATA_STORE
+
+
+class MaterializedUpgradeArgs(TypedDict):
+    """Arguments for the Materialized service constructor required for 0dt upgrades."""
+
+    name: str
+    image: str | None
+    deploy_generation: int
+    system_parameter_defaults: dict[str, str]
+    external_metadata_store: bool
+    restart: str
+
+
+@dataclass
+class UpgradeStep:
+    """Represents a single upgrade step: the new and previous services plus the action that performs the upgrade."""
+
+    new_service: Materialized
+    previous_service: Materialized
+    upgrade: Callable[[], None]
+
+
+def generate_materialized_upgrade_args(
+    versions: list[MzVersion | None],
+) -> list[MaterializedUpgradeArgs]:
+    """
+    Constructs the list of Materialized constructor arguments required for 0dt upgrades.
+    Requires mz_1 and mz_2 services to already exist in the composition.
+    """
+    # We use the first version to get the system parameters, since the defaults for
+    # newer versions include cutting-edge features that can break backwards compatibility.
+    # TODO (multiversion1): Get minimal system parameters by default to avoid cutting-edge features.
+    system_parameter_defaults = get_default_system_parameters(versions[0])
+
+    return [
+        MaterializedUpgradeArgs(
+            image=f"materialize/materialized:{version}" if version else None,
+            # Cycle through mz_1 and mz_2 for upgrades, since spinning up services has a cost.
+            name=f"mz_{(i % 2) + 1}",
+            # Generation number for the service. Required to start services in read-only mode.
+            deploy_generation=i,
+            system_parameter_defaults=system_parameter_defaults,
+            # Share the same metadata store between services.
+            external_metadata_store=True,
+            # Restart when the container exits due to promotion.
+            restart="on-failure",
+        )
+        for i, version in enumerate(versions)
+    ]
+
+
+def generate_random_upgrade_path(
+    versions: list[MzVersion],
+    rng: Random | None = None,
+) -> list[MzVersion]:
+    """
+    Generates a random upgrade path from the given versions.
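+
+    Each version is kept independently with probability 0.5, preserving the
+    input order; if the coin flips select nothing, a single version is chosen
+    at random so the resulting path is never empty.
+
+    Illustrative sketch (the concrete result depends on the RNG seed, and
+    `versions` is assumed to be a non-empty list of MzVersion):
+
+        path = generate_random_upgrade_path(versions, Random(42))
+        # `path` is an ordered subset of `versions` and is never empty.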
+ """ + selected_versions = [] + + rng = rng or Random() + # For each version in the input list, randomly select it with a 50% chance. + for v in versions: + if rng.random() < 0.5: + selected_versions.append(v) + + # Always include at least one version to avoid empty paths. + if len(selected_versions) == 0: + selected_versions.append(rng.choice(versions)) + + return selected_versions + + +class Materialized0dtUpgrader: + """ + Manages a sequence of Materialized service upgrades using zero-downtime deployments. + + Args: + materialized_services: List of Materialized instances representing each upgrade step + """ + + def __init__(self, c: Composition, materialized_services: list[Materialized]): + self.materialized_services = materialized_services + self.c = c + + def create_upgrade_steps_list(self) -> list[UpgradeStep]: + """ + Returns a list of upgrade step actions from the second service onward. + + Each step is a closure that, when called, will perform + the upgrade step to the corresponding service. + """ + + def create_upgrade_action( + current_service: Materialized, + previous_service: Materialized, + ): + def upgrade() -> None: + with self.c.override(current_service): + current_service_image = ( + current_service.config.get("image") or "current" + ) + previous_service_image = previous_service.config.get("image") + + print(f"Bringing up {current_service_image}") + self.c.up(current_service.name) + print(f"Awaiting promotion of {current_service_image}") + self.c.await_mz_deployment_status( + DeploymentStatus.READY_TO_PROMOTE, current_service.name + ) + self.c.promote_mz(current_service.name) + print(f"Awaiting leader status of {current_service_image}") + self.c.await_mz_deployment_status( + DeploymentStatus.IS_LEADER, current_service.name + ) + + print(f"Killing {previous_service_image}") + self.c.kill(previous_service.name, wait=True) + + return upgrade + + services = self.materialized_services + steps = [] + for idx in range(1, len(services)): + current_service = services[idx] + previous_service = services[idx - 1] + + steps.append( + UpgradeStep( + new_service=current_service, + previous_service=previous_service, + upgrade=create_upgrade_action(current_service, previous_service), + ) + ) + return steps + + def initialize(self) -> tuple[Materialized, list[UpgradeStep]]: + """ + Initialize the with the first service. Returns a list where + each step is a closure that, when called, will perform the upgrade step to the corresponding service. + """ + first_service = self.materialized_services[0] + with self.c.override(first_service): + print(f"Bringing up {first_service.name}") + self.c.up(first_service.name) + + return first_service, self.create_upgrade_steps_list() + + def print_upgrade_path(self) -> None: + """ + Print the upgrade steps. + """ + + def image_to_string(image: str | None) -> str: + return "current" if image is None else image.split(":")[-1] + + print( + f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}" + ) + + def cleanup(self) -> None: + """ + Cleanup after upgrade. 
+ """ + print("Cleaning up upgrade path") + # Ensure all services are killed and removed + self.c.kill( + *[service.name for service in self.materialized_services], wait=True + ) + self.c.rm( + *[service.name for service in self.materialized_services], + destroy_volumes=True, + ) + self.c.kill(METADATA_STORE, wait=True) + self.c.rm( + METADATA_STORE, + destroy_volumes=True, + ) diff --git a/misc/python/materialize/source_table_migration.py b/misc/python/materialize/source_table_migration.py index 264c1761f3d2d..6a68e15e5f0d4 100644 --- a/misc/python/materialize/source_table_migration.py +++ b/misc/python/materialize/source_table_migration.py @@ -12,29 +12,37 @@ def verify_sources_after_source_table_migration( - c: Composition, file: str, fail: bool = False + c: Composition, + file: str, + fail: bool = False, + service: str | None = None, ) -> None: source_names_rows = c.sql_query( - "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';" + "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';", + service=service, ) source_names = [row[0] for row in source_names_rows] print(f"Sources created in {file} are: {source_names}") - c.sql("SET statement_timeout = '20s'") + c.sql("SET statement_timeout = '20s'", service=service) for source_name in source_names: - _verify_source(c, file, source_name, fail=fail) + _verify_source(c, file, source_name, fail=fail, service=service) def _verify_source( - c: Composition, file: str, source_name: str, fail: bool = False + c: Composition, + file: str, + source_name: str, + fail: bool = False, + service: str | None = None, ) -> None: try: print(f"Checking source: {source_name}") statement = f"SHOW CREATE SOURCE {source_name};" - result = c.sql_query(statement) + result = c.sql_query(statement, service=service) sql = result[0][1] assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}" assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}" diff --git a/misc/python/materialize/version_list.py b/misc/python/materialize/version_list.py index d9ce031b04f6d..4d0ecf3fbed19 100644 --- a/misc/python/materialize/version_list.py +++ b/misc/python/materialize/version_list.py @@ -74,18 +74,34 @@ def get_self_managed_versions() -> list[MzVersion]: return sorted(result) -# Gets the supported self managed versions relative to the current version -def get_supported_self_managed_versions() -> list[MzVersion]: - self_managed_versions = fetch_self_managed_versions() - # TODO (multiversion2): Change this to filter on versions between the next and previous unskippable major release - # when unskippable versions are implemented. - return sorted( - { +# Gets the range of versions we can "upgrade from" to the current version, sorted in ascending order. 
+def get_compatible_upgrade_from_versions() -> list[MzVersion]:
+
+    # Determine the current version from the cargo workspace
+    current_version = MzVersion.parse_cargo()
+
+    published_versions_within_one_major_version = {
+        v
+        for v in get_published_mz_versions_within_one_major_version()
+        if abs(v.major - current_version.major) <= 1 and v <= current_version
+    }
+
+    if current_version.major <= 26:
+        # For versions <= 26, we can additionally upgrade from the 25.2 self-managed versions
+        self_managed_25_2_versions = {
             v.version
-            for v in self_managed_versions
+            for v in fetch_self_managed_versions()
             if v.helm_version.major == 25 and v.helm_version.minor == 2
         }
-    )
+
+        return sorted(
+            self_managed_25_2_versions.union(
+                published_versions_within_one_major_version
+            )
+        )
+    else:
+        # For versions > 26, get all mz versions within 1 major version of current_version
+        return sorted(published_versions_within_one_major_version)
 
 
 BAD_SELF_MANAGED_VERSIONS = {
@@ -439,6 +455,8 @@ def get_all_mz_versions(
             version_type=MzVersion, newest_first=newest_first
         )
         if version not in INVALID_VERSIONS
+        # Exclude release candidates
+        and not version.prerelease
     ]
 
 
@@ -457,9 +475,24 @@ def get_all_published_mz_versions(
     newest_first: bool = True, limit: int | None = None
 ) -> list[MzVersion]:
     """Get all mz versions based on git tags. This method ensures that images of the versions exist."""
-    return limit_to_published_versions(
-        get_all_mz_versions(newest_first=newest_first), limit
-    )
+    all_versions = get_all_mz_versions(newest_first=newest_first)
+    print(f"all_versions: {all_versions}")
+    return limit_to_published_versions(all_versions, limit)
+
+
+def get_published_mz_versions_within_one_major_version(
+    newest_first: bool = True,
+) -> list[MzVersion]:
+    """Get all previous mz versions within one major version of the current version. Ensure that images of the versions exist."""
+    current_version = MzVersion.parse_cargo()
+    all_versions = get_all_mz_versions(newest_first=newest_first)
+    versions_within_one_major_version = {
+        v
+        for v in all_versions
+        if abs(v.major - current_version.major) <= 1 and v <= current_version
+    }
+
+    # Sort explicitly so that `newest_first` is honored (sets are unordered).
+    return limit_to_published_versions(
+        sorted(versions_within_one_major_version, reverse=newest_first)
+    )
 
 
 def limit_to_published_versions(
diff --git a/test/pg-cdc-old-syntax/exclude-columns.td b/test/pg-cdc-old-syntax/exclude-columns.td
index 8326389246257..0bcba04c23c6c 100644
--- a/test/pg-cdc-old-syntax/exclude-columns.td
+++ b/test/pg-cdc-old-syntax/exclude-columns.td
@@ -11,6 +11,10 @@
 # Test postgres EXCLUDE COLUMNS support
 #
 
+
+$ skip-if
+SELECT mz_version_num() < 16200;
+
 > CREATE SECRET pgpass AS 'postgres'
 
 > CREATE CONNECTION pg TO POSTGRES (
     HOST postgres,
diff --git a/test/pg-cdc-old-syntax/mzcompose.py b/test/pg-cdc-old-syntax/mzcompose.py
index 1d0e655e0b096..05e237ff61852 100644
--- a/test/pg-cdc-old-syntax/mzcompose.py
+++ b/test/pg-cdc-old-syntax/mzcompose.py
@@ -13,11 +13,16 @@
 
 import glob
 import time
+from random import Random
 
 import pg8000
 from pg8000 import Connection
 
 from materialize import MZ_ROOT, buildkite
+from materialize.mz_0dt_upgrader import (
+    Materialized0dtUpgrader,
+    generate_materialized_upgrade_args,
+)
 from materialize.mzcompose.composition import (
     Composition,
     Service,
@@ -39,6 +44,7 @@
 from materialize.source_table_migration import (
     verify_sources_after_source_table_migration,
 )
+from materialize.version_list import get_compatible_upgrade_from_versions
 
 # Set the max slot WAL keep size to 10MB
 DEFAULT_PG_EXTRA_COMMAND = ["-c", "max_slot_wal_keep_size=10"]
@@ -96,6 +102,40 @@ def create_postgres(
     return Postgres(image=image, extra_command=extra_command)
 
 
+def get_testdrive_ssl_args(c: Composition) -> dict[str, list[str]]:
+    """Extract SSL certificates from the test-certs service and return the SSL-related testdrive arguments."""
+    c.up(Service("test-certs", idle=True))
+    ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout
+    ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout
+    ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout
+    ssl_wrong_cert = c.run(
+        "test-certs", "cat", "/secrets/postgres.crt", capture=True
+    ).stdout
+    ssl_wrong_key = c.run(
+        "test-certs", "cat", "/secrets/postgres.key", capture=True
+    ).stdout
+
+    testdrive_args = [
+        f"--var=ssl-ca={ssl_ca}",
+        f"--var=ssl-cert={ssl_cert}",
+        f"--var=ssl-key={ssl_key}",
+        f"--var=ssl-wrong-cert={ssl_wrong_cert}",
+        f"--var=ssl-wrong-key={ssl_wrong_key}",
+    ]
+
+    return {
+        "testdrive_args": testdrive_args,
+        "volumes_extra": ["secrets:/share/secrets"],
+    }
+
+
+def get_default_testdrive_size_args() -> list[str]:
+    return [
+        f"--var=default-replica-size=scale={Materialized.Size.DEFAULT_SIZE},workers={Materialized.Size.DEFAULT_SIZE}",
+        f"--var=default-storage-size=scale={Materialized.Size.DEFAULT_SIZE},workers=1",
+    ]
+
+
 SERVICES = [
     Mz(app_password=""),
     Materialized(
@@ -106,6 +146,12 @@ def create_postgres(
         external_blob_store=True,
         default_replication_factor=2,
     ),
+    Materialized(
+        name="mz_1",
+    ),
+    Materialized(
+        name="mz_2",
+    ),
     Testdrive(),
     CockroachOrPostgresMetadata(),
     Minio(setup_materialize=True),
@@ -307,39 +353,22 @@ def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
     )
     args = parser.parse_args()
 
-    matching_files = []
-    for filter in args.filter:
-        matching_files.extend(
-            glob.glob(filter, root_dir=MZ_ROOT / "test" / "pg-cdc-old-syntax")
-        )
-    sharded_files: list[str] = buildkite.shard_list(
-        sorted(matching_files), lambda file: file
-    )
+    sharded_files = [
+        file for file in get_sharded_files(args.filter) if file != "exclude-columns.td"
+    ]
     print(f"Files: {sharded_files}")
+    ssl_args_dict = get_testdrive_ssl_args(c)
+    testdrive_ssl_args = ssl_args_dict["testdrive_args"]
 
-    c.up(Service("test-certs", idle=True))
-    ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout
-    ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout
-    ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout
-    ssl_wrong_cert = c.run(
-        "test-certs", "cat", "/secrets/postgres.crt", capture=True
-    ).stdout
-    ssl_wrong_key = c.run(
-        "test-certs", "cat", "/secrets/postgres.key", capture=True
-    ).stdout
-
+    testdrive_args = testdrive_ssl_args + get_default_testdrive_size_args()
     with c.override(create_postgres(pg_version=pg_version)):
         c.up("materialized", "test-certs", "postgres")
 
         c.test_parts(
             sharded_files,
             lambda file: c.run_testdrive_files(
-                f"--var=ssl-ca={ssl_ca}",
-                f"--var=ssl-cert={ssl_cert}",
-                f"--var=ssl-key={ssl_key}",
-                f"--var=ssl-wrong-cert={ssl_wrong_cert}",
-                f"--var=ssl-wrong-key={ssl_wrong_key}",
-                f"--var=default-replica-size=scale={Materialized.Size.DEFAULT_SIZE},workers={Materialized.Size.DEFAULT_SIZE}",
-                f"--var=default-storage-size=scale={Materialized.Size.DEFAULT_SIZE},workers=1",
+                *testdrive_args,
                 file,
             ),
         )
@@ -347,7 +376,7 @@ def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
 
 def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
     def process(name: str) -> None:
-        if name in ("default", "migration"):
+        if name in ("default", "migration", "migration-multi-version-upgrade"):
            return
 
         # TODO: Flaky, reenable when database-issues#8447 is fixed
@@ -367,7 +396,8 @@ def process(name: str) -> None:
         [
             w
             for w in c.workflows
-            if w not in workflows_with_internal_sharding and w != "migration"
+            if w not in workflows_with_internal_sharding
+            and w not in ("migration", "migration-multi-version-upgrade")
         ],
         lambda w: w,
     )
@@ -377,6 +407,16 @@ def process(name: str) -> None:
     c.test_parts(sharded_workflows, process)
 
 
+def get_sharded_files(filters: list[str]) -> list[str]:
+    matching_files = []
+    for filter in filters:
+        matching_files.extend(
+            glob.glob(filter, root_dir=MZ_ROOT / "test" / "pg-cdc-old-syntax")
+        )
+
+    return buildkite.shard_list(sorted(matching_files), lambda file: file)
+
+
 def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
     parser.add_argument(
         "filter",
@@ -386,33 +426,23 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
     )
     args = parser.parse_args()
 
-    matching_files = []
-    for filter in args.filter:
-        matching_files.extend(
-            glob.glob(filter, root_dir=MZ_ROOT / "test" / "pg-cdc-old-syntax")
-        )
-    sharded_files: list[str] = buildkite.shard_list(
-        sorted(matching_files), lambda file: file
-    )
+    sharded_files = get_sharded_files(args.filter)
     print(f"Files: {sharded_files}")
 
-    c.up(Service("test-certs", idle=True))
-    ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout
-    ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout
-    ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout
-    ssl_wrong_cert = c.run(
-        "test-certs", "cat", "/secrets/postgres.crt", capture=True
-    ).stdout
-    ssl_wrong_key = c.run(
-        "test-certs", "cat", "/secrets/postgres.key", capture=True
-    ).stdout
+    ssl_args_dict = get_testdrive_ssl_args(c)
+    testdrive_ssl_args = ssl_args_dict["testdrive_args"]
+    volumes_extra = ssl_args_dict["volumes_extra"]
+
+    testdrive_args = (
+        testdrive_ssl_args + get_default_testdrive_size_args() + ["--no-reset"]
+    )
 
     pg_version = get_targeted_pg_version(parser)
     for file in sharded_files:
         mz_old = Materialized(
             name="materialized",
-            volumes_extra=["secrets:/share/secrets"],
+            volumes_extra=volumes_extra,
             external_metadata_store=True,
             external_blob_store=True,
             additional_system_parameter_defaults={
@@ -423,7 +453,7 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
 
         mz_new = Materialized(
             name="materialized",
-            volumes_extra=["secrets:/share/secrets"],
+            volumes_extra=volumes_extra,
             external_metadata_store=True,
             external_blob_store=True,
             additional_system_parameter_defaults={
@@ -438,14 +468,7 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
 
             print(f"Running {file} with mz_old")
             c.run_testdrive_files(
-                f"--var=ssl-ca={ssl_ca}",
-                f"--var=ssl-cert={ssl_cert}",
-                f"--var=ssl-key={ssl_key}",
-                f"--var=ssl-wrong-cert={ssl_wrong_cert}",
-                f"--var=ssl-wrong-key={ssl_wrong_key}",
-                f"--var=default-replica-size=scale={Materialized.Size.DEFAULT_SIZE},workers={Materialized.Size.DEFAULT_SIZE}",
-                f"--var=default-storage-size=scale={Materialized.Size.DEFAULT_SIZE},workers=1",
-                "--no-reset",
+                *testdrive_args,
                 file,
             )
             c.kill("materialized", wait=True)
@@ -463,3 +486,114 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
     c.rm(METADATA_STORE)
     c.rm("postgres")
     c.rm_volumes("mzdata")
+
+
+def workflow_migration_multi_version_upgrade(
+    c: Composition, parser: WorkflowArgumentParser
+) -> None:
+    """
+    Multi-version upgrade with the source versioning migration.
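+
+    Brings up Materialize at an old version (a random compatible version or
+    the earliest one, depending on --mode), runs each testdrive file against
+    it, performs zero-downtime upgrades along the selected path with
+    force_source_table_syntax enabled on the final generation, and then
+    verifies that the migrated sources no longer use the FOR TABLE /
+    FOR ALL TABLES syntax.
+
+    Example invocations (sketch, mirroring the CI pipeline arguments):
+
+        ./mzcompose run migration-multi-version-upgrade --mode=earliest-to-current
+        ./mzcompose run migration-multi-version-upgrade --mode=random --seed=42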
+ """ + pg_version = get_targeted_pg_version(parser) + + parser.add_argument( + "--mode", + type=str, + choices=["random", "earliest-to-current"], + default="earliest-to-current", + help="Upgrade mode: 'random' for random version to upgrade from, 'earliest-to-current' for upgrading from the earliest upgradeable version to the current version.", + ) + + parser.add_argument( + "--seed", + type=str, + default=None, + help="Random seed to use for upgrade path selection", + ) + + parser.add_argument( + "filter", + nargs="*", + default=["*.td"], + help="limit to only the files matching filter", + ) + + args = parser.parse_args() + + # Get matching files and apply sharding + + sharded_files = get_sharded_files(args.filter) + print(f"Files: {sharded_files}") + + ssl_args_dict = get_testdrive_ssl_args(c) + testdrive_ssl_args = ssl_args_dict["testdrive_args"] + volumes_extra = ssl_args_dict["volumes_extra"] + + testdrive_args = ( + testdrive_ssl_args + get_default_testdrive_size_args() + ["--no-reset"] + ) + + compatible_versions = get_compatible_upgrade_from_versions() + + if args.mode == "random": + random_initial_version = Random(args.seed).choice(compatible_versions) + versions = [random_initial_version, None] + else: + versions = [compatible_versions[0], None] + + materialize_service_instances = [] + + upgrade_args_list = generate_materialized_upgrade_args(versions) + + for i, upgrade_args in enumerate(upgrade_args_list): + log_filter = "mz_storage::source::postgres=trace,debug,info,warn,error" + + # Enable source versioning migration at the end (final version) + enable_source_migration_arg = ( + {"force_source_table_syntax": "true"} + if i == len(upgrade_args_list) - 1 + else {} + ) + + materialize_service_instances.append( + Materialized( + **upgrade_args, + external_blob_store=True, + volumes_extra=volumes_extra, + additional_system_parameter_defaults={ + log_filter: log_filter, + **enable_source_migration_arg, + }, + ) + ) + + upgrade_path = Materialized0dtUpgrader(c, materialize_service_instances) + + upgrade_path.print_upgrade_path() + + for file in sharded_files: + + with c.override(create_postgres(pg_version=pg_version)): + c.up("test-certs", "postgres") + initial_materialized_service, upgrade_steps = upgrade_path.initialize() + c.run_testdrive_files( + *testdrive_args, + file, + mz_service=initial_materialized_service.name, + ) + + for step in upgrade_steps: + step.upgrade() + + # Verify the source table migration at the end. + last_materialized_service = upgrade_steps[-1].new_service + print( + f"Verifying source table migration for version {last_materialized_service.config.get('image')}" + ) + verify_sources_after_source_table_migration( + c, file, service=last_materialized_service.name + ) + upgrade_path.cleanup() + c.kill("postgres", wait=True) + c.rm("postgres") + c.rm_volumes("mzdata") diff --git a/test/pg-cdc-old-syntax/privileges.td b/test/pg-cdc-old-syntax/privileges.td index aa6a90e81968c..da2cf7d49dd50 100644 --- a/test/pg-cdc-old-syntax/privileges.td +++ b/test/pg-cdc-old-syntax/privileges.td @@ -57,12 +57,18 @@ REVOKE USAGE ON SCHEMA other FROM priv; PASSWORD SECRET pgpass ) -! 
+![version>15800] CREATE SOURCE mz_source
   FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
   FOR SCHEMAS(public, other);
 contains:insufficient privileges
 detail:user lacks USAGE privileges for schemas other
 
+![version<=15800] CREATE SOURCE mz_source
+  FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
+  FOR SCHEMAS(public, other);
+contains:insufficient privileges
+detail:user priv lacks USAGE privileges for schemas other
+
 #
 # SELECT errors
 #
 
@@ -71,23 +77,35 @@ $ postgres-execute connection=postgres://postgres:postgres@postgres
 GRANT ALL PRIVILEGES ON SCHEMA other TO priv;
 REVOKE SELECT ON TABLE other.s FROM priv;
 
-! CREATE SOURCE mz_source
+![version>15800] CREATE SOURCE mz_source
   FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
   FOR SCHEMAS(public, other);
 contains:insufficient privileges
 detail:user lacks SELECT privileges for tables other.s
 
+![version<=15800] CREATE SOURCE mz_source
+  FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
+  FOR SCHEMAS(public, other);
+contains:insufficient privileges
+detail:user priv lacks SELECT privileges for tables other.s
+
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 CREATE TABLE "select" (a INT);
 REVOKE SELECT ON public.select FROM priv;
 CREATE TABLE """select""" (a INT);
 REVOKE SELECT ON public."""select""" FROM priv;
 
-! CREATE SOURCE mz_source
+![version>15800] CREATE SOURCE mz_source
   FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
   FOR SCHEMAS(public);
 contains:insufficient privileges
 detail:user lacks SELECT privileges for tables public."""select""", public."select"
 
+![version<=15800] CREATE SOURCE mz_source
+  FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
+  FOR SCHEMAS(public);
+contains:insufficient privileges
+detail:user priv lacks SELECT privileges for tables public."""select""", public."select"
+
 $ postgres-execute connection=postgres://postgres:postgres@postgres
 DROP SCHEMA IF EXISTS other CASCADE;
diff --git a/test/pg-cdc-old-syntax/statistics.td b/test/pg-cdc-old-syntax/statistics.td
index 96a8d9cf8c3ba..4ce8bb70d5194 100644
--- a/test/pg-cdc-old-syntax/statistics.td
+++ b/test/pg-cdc-old-syntax/statistics.td
@@ -13,6 +13,10 @@ $ set-sql-timeout duration=60s
 # Test progress statistics
 #
+$ skip-if
+SELECT mz_version_num() < 15100;
+
+
 
 $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
 ALTER SYSTEM SET storage_statistics_collection_interval = 1000
 ALTER SYSTEM SET storage_statistics_interval = 2000