Skip to content

Commit 745d0b9

Browse files
committed
Port pg-cdc-old-syntax tests to use multiversion upgrades
- Adds helper class for doing 0dt upgrades - Add a multiversion test for pg-cdc-old-syntax - Branch on certain testdrive tests due to incompatible syntax - Change version list filtering for upgrade tests Filter on versions within the previous unskippable major release. This won't work for 26.0.0 since we can't upgrade from 25.1, so we keep the same conditional - Add multi-version upgrade tests for Postgres old CDC to release qualification pipeline
1 parent 9b8ee8d commit 745d0b9

File tree

9 files changed

+447
-40
lines changed

9 files changed

+447
-40
lines changed

ci/release-qualification/pipeline.template.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,30 @@ steps:
318318
agents:
319319
queue: hetzner-aarch64-4cpu-8gb
320320

321+
- id: pg-cdc-old-syntax-multi-version-upgrade-random
322+
label: Postgres CDC tests (before source versioning, multi-version upgrade, random upgrade path)
323+
depends_on: build-aarch64
324+
timeout_in_minutes: 120
325+
plugins:
326+
- ./ci/plugins/mzcompose:
327+
composition: pg-cdc-old-syntax
328+
run: migration-multi-version-upgrade
329+
args: [--mode=random, --seed=$BUILDKITE_JOB_ID]
330+
agents:
331+
queue: hetzner-aarch64-4cpu-8gb
332+
333+
- id: pg-cdc-old-syntax-multi-version-upgrade-earliest-to-current
334+
label: Postgres CDC tests (before source versioning, multi-version upgrade, earliest to current direct upgrade)
335+
depends_on: build-aarch64
336+
timeout_in_minutes: 60
337+
plugins:
338+
- ./ci/plugins/mzcompose:
339+
composition: pg-cdc-old-syntax
340+
run: migration-multi-version-upgrade
341+
args: [--mode=earliest-to-current]
342+
agents:
343+
queue: hetzner-aarch64-4cpu-8gb
344+
321345
- group: "Platform checks"
322346
key: platform-checks
323347
steps:

misc/python/materialize/checks/scenarios_upgrade.py

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@
2424
WaitReadyMz,
2525
)
2626
from materialize.checks.scenarios import Scenario
27+
from materialize.mz_0dt_upgrader import generate_random_upgrade_path
2728
from materialize.mz_version import MzVersion
2829
from materialize.mzcompose import get_default_system_parameters
2930
from materialize.mzcompose.services.materialized import LEADER_STATUS_HEALTHCHECK
3031
from materialize.version_list import (
32+
get_compatible_upgrade_from_versions,
3133
get_published_minor_mz_versions,
3234
get_self_managed_versions,
33-
get_supported_self_managed_versions,
3435
)
3536

3637
# late initialization
@@ -587,7 +588,7 @@ def __init__(
587588
features: Features,
588589
seed: str | None = None,
589590
):
590-
self.self_managed_versions = get_supported_self_managed_versions()
591+
self.self_managed_versions = get_compatible_upgrade_from_versions()
591592
super().__init__(checks, executor, features, seed)
592593

593594
def base_version(self) -> MzVersion:
@@ -643,7 +644,7 @@ def __init__(
643644
features: Features,
644645
seed: str | None = None,
645646
):
646-
self.self_managed_versions = get_supported_self_managed_versions()
647+
self.self_managed_versions = get_compatible_upgrade_from_versions()
647648
super().__init__(checks, executor, features, seed)
648649

649650
def base_version(self) -> MzVersion:
@@ -709,7 +710,7 @@ def __init__(
709710
features: Features,
710711
seed: str | None = None,
711712
):
712-
self.self_managed_versions = get_supported_self_managed_versions()
713+
self.self_managed_versions = get_compatible_upgrade_from_versions()
713714
super().__init__(checks, executor, features, seed)
714715

715716
def _generate_random_upgrade_path(
@@ -721,17 +722,7 @@ def _generate_random_upgrade_path(
721722
if self.rng is None or len(versions) == 0:
722723
return versions
723724

724-
selected_versions = []
725-
# For each version in the input list, randomly select it with a 50% chance.
726-
for v in versions:
727-
if self.rng.random() < 0.5:
728-
selected_versions.append(v)
729-
730-
# Always include at least one version to avoid empty paths.
731-
if len(selected_versions) == 0:
732-
selected_versions.append(self.rng.choice(versions))
733-
734-
return selected_versions
725+
return generate_random_upgrade_path(versions, self.rng)
735726

736727
def base_version(self) -> MzVersion:
737728
return self.self_managed_versions[0]
@@ -838,7 +829,7 @@ def __init__(
838829
features: Features,
839830
seed: str | None = None,
840831
):
841-
self.self_managed_versions = get_supported_self_managed_versions()
832+
self.self_managed_versions = get_compatible_upgrade_from_versions()
842833
super().__init__(checks, executor, features, seed)
843834

844835
def base_version(self) -> MzVersion:
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
from collections.abc import Callable
11+
from dataclasses import dataclass
12+
from random import Random
13+
from typing import TypedDict
14+
15+
from materialize.mz_version import MzVersion
16+
from materialize.mzcompose import get_default_system_parameters
17+
from materialize.mzcompose.composition import Composition
18+
from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
19+
from materialize.mzcompose.services.postgres import METADATA_STORE
20+
21+
22+
class MaterializedUpgradeArgs(TypedDict):
23+
"""Arguments for the Materialized service constructor required for 0dt upgrades."""
24+
25+
name: str
26+
image: str | None
27+
deploy_generation: int
28+
system_parameter_defaults: dict[str, str]
29+
external_metadata_store: bool
30+
restart: str
31+
32+
33+
@dataclass
34+
class UpgradeStep:
35+
"""Represents a single upgrade step with its service name and action."""
36+
37+
new_service: Materialized
38+
previous_service: Materialized
39+
upgrade: Callable[[], None]
40+
41+
42+
def generate_materialized_upgrade_args(
43+
versions: list[MzVersion | None],
44+
) -> list[MaterializedUpgradeArgs]:
45+
"""
46+
Constructs a list of required Materialized arguments for 0dt upgrades.
47+
Requires there to be an mz_1 and mz_2 service already in the composition.
48+
"""
49+
# We use the first version to get the system parameters since the defaults for
50+
# newer versions include cutting edge features than can break backwards compatibility.
51+
# TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features.
52+
system_parameter_defaults = get_default_system_parameters(versions[0])
53+
54+
return [
55+
MaterializedUpgradeArgs(
56+
image=f"materialize/materialized:{version}" if version else None,
57+
# Cycle through mz_1 and mz_2 for upgrades since spinning up services have a cost.
58+
name=f"mz_{(i % 2) + 1}",
59+
# Generation number for the service. Required to start services in read only mode.
60+
deploy_generation=i,
61+
system_parameter_defaults=system_parameter_defaults,
62+
# To share the same metadata store between services
63+
external_metadata_store=True,
64+
# To restart when container exits due to promotion
65+
restart="on-failure",
66+
)
67+
for i, version in enumerate(versions)
68+
]
69+
70+
71+
def generate_random_upgrade_path(
72+
versions: list[MzVersion],
73+
rng: Random | None = None,
74+
) -> list[MzVersion]:
75+
"""
76+
Generates a random upgrade path between the given versions.
77+
"""
78+
selected_versions = []
79+
80+
rng = rng or Random()
81+
# For each version in the input list, randomly select it with a 50% chance.
82+
for v in versions:
83+
if rng.random() < 0.5:
84+
selected_versions.append(v)
85+
86+
# Always include at least one version to avoid empty paths.
87+
if len(selected_versions) == 0:
88+
selected_versions.append(rng.choice(versions))
89+
90+
return selected_versions
91+
92+
93+
class Materialized0dtUpgrader:
94+
"""
95+
Manages a sequence of Materialized service upgrades using zero-downtime deployments.
96+
97+
Args:
98+
materialized_services: List of Materialized instances representing each upgrade step
99+
"""
100+
101+
def __init__(self, c: Composition, materialized_services: list[Materialized]):
102+
self.materialized_services = materialized_services
103+
self.c = c
104+
105+
def create_upgrade_steps_list(self) -> list[UpgradeStep]:
106+
"""
107+
Returns a list of upgrade step actions from the second service onward.
108+
109+
Each step is a closure that, when called, will perform
110+
the upgrade step to the corresponding service.
111+
"""
112+
113+
def create_upgrade_action(
114+
current_service: Materialized,
115+
previous_service: Materialized,
116+
):
117+
def upgrade() -> None:
118+
with self.c.override(current_service):
119+
current_service_image = (
120+
current_service.config.get("image") or "current"
121+
)
122+
previous_service_image = previous_service.config.get("image")
123+
124+
print(f"Bringing up {current_service_image}")
125+
self.c.up(current_service.name)
126+
print(f"Awaiting promotion of {current_service_image}")
127+
self.c.await_mz_deployment_status(
128+
DeploymentStatus.READY_TO_PROMOTE, current_service.name
129+
)
130+
self.c.promote_mz(current_service.name)
131+
print(f"Awaiting leader status of {current_service_image}")
132+
self.c.await_mz_deployment_status(
133+
DeploymentStatus.IS_LEADER, current_service.name
134+
)
135+
136+
print(f"Killing {previous_service_image}")
137+
self.c.kill(previous_service.name, wait=True)
138+
139+
return upgrade
140+
141+
services = self.materialized_services
142+
steps = []
143+
for idx in range(1, len(services)):
144+
current_service = services[idx]
145+
previous_service = services[idx - 1]
146+
147+
steps.append(
148+
UpgradeStep(
149+
new_service=current_service,
150+
previous_service=previous_service,
151+
upgrade=create_upgrade_action(current_service, previous_service),
152+
)
153+
)
154+
return steps
155+
156+
def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
157+
"""
158+
Initialize the with the first service. Returns a list where
159+
each step is a closure that, when called, will perform the upgrade step to the corresponding service.
160+
"""
161+
first_service = self.materialized_services[0]
162+
with self.c.override(first_service):
163+
print(f"Bringing up {first_service.name}")
164+
self.c.up(first_service.name)
165+
166+
return first_service, self.create_upgrade_steps_list()
167+
168+
def print_upgrade_path(self) -> None:
169+
"""
170+
Print the upgrade steps.
171+
"""
172+
173+
def image_to_string(image: str | None) -> str:
174+
return "current" if image is None else image.split(":")[-1]
175+
176+
print(
177+
f"Upgrade path: {str.join(' -> ', [image_to_string(service.config.get('image')) for service in self.materialized_services])}"
178+
)
179+
180+
def cleanup(self) -> None:
181+
"""
182+
Cleanup after upgrade.
183+
"""
184+
print("Cleaning up upgrade path")
185+
# Ensure all services are killed and removed
186+
self.c.kill(
187+
*[service.name for service in self.materialized_services], wait=True
188+
)
189+
self.c.rm(
190+
*[service.name for service in self.materialized_services],
191+
destroy_volumes=True,
192+
)
193+
self.c.kill(METADATA_STORE, wait=True)
194+
self.c.rm(
195+
METADATA_STORE,
196+
destroy_volumes=True,
197+
)

misc/python/materialize/source_table_migration.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,37 @@
1212

1313

1414
def verify_sources_after_source_table_migration(
15-
c: Composition, file: str, fail: bool = False
15+
c: Composition,
16+
file: str,
17+
fail: bool = False,
18+
service: str | None = None,
1619
) -> None:
1720
source_names_rows = c.sql_query(
18-
"SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
21+
"SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';",
22+
service=service,
1923
)
2024
source_names = [row[0] for row in source_names_rows]
2125

2226
print(f"Sources created in {file} are: {source_names}")
2327

24-
c.sql("SET statement_timeout = '20s'")
28+
c.sql("SET statement_timeout = '20s'", service=service)
2529

2630
for source_name in source_names:
27-
_verify_source(c, file, source_name, fail=fail)
31+
_verify_source(c, file, source_name, fail=fail, service=service)
2832

2933

3034
def _verify_source(
31-
c: Composition, file: str, source_name: str, fail: bool = False
35+
c: Composition,
36+
file: str,
37+
source_name: str,
38+
fail: bool = False,
39+
service: str | None = None,
3240
) -> None:
3341
try:
3442
print(f"Checking source: {source_name}")
3543

3644
statement = f"SHOW CREATE SOURCE {source_name};"
37-
result = c.sql_query(statement)
45+
result = c.sql_query(statement, service=service)
3846
sql = result[0][1]
3947
assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
4048
assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"

0 commit comments

Comments
 (0)