Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,30 @@ steps:
agents:
queue: hetzner-aarch64-8cpu-16gb

- id: pg-cdc-old-syntax-multi-version-upgrade-random
label: Postgres CDC tests (before source versioning, multi-version upgrade, random upgrade path)
depends_on: build-aarch64
timeout_in_minutes: 60
plugins:
- ./ci/plugins/mzcompose:
composition: pg-cdc-old-syntax
run: migration-multi-version-upgrade
args: [--mode=random, --seed=$BUILDKITE_JOB_ID]
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: pg-cdc-old-syntax-multi-version-upgrade-earliest-to-current
label: Postgres CDC tests (before source versioning, multi-version upgrade, earliest to current direct upgrade)
depends_on: build-aarch64
timeout_in_minutes: 60
plugins:
- ./ci/plugins/mzcompose:
composition: pg-cdc-old-syntax
run: migration-multi-version-upgrade
args: [--mode=earliest-to-current]
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: pg-rtr-old-syntax
label: Postgres RTR tests (before source versioning)
depends_on: build-aarch64
Expand Down
23 changes: 7 additions & 16 deletions misc/python/materialize/checks/scenarios_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
WaitReadyMz,
)
from materialize.checks.scenarios import Scenario
from materialize.mz_0dt_upgrader import generate_random_upgrade_path
from materialize.mz_version import MzVersion
from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose.services.materialized import LEADER_STATUS_HEALTHCHECK
from materialize.version_list import (
get_compatible_upgrade_from_versions,
get_published_minor_mz_versions,
get_self_managed_versions,
get_supported_self_managed_versions,
)

# late initialization
Expand Down Expand Up @@ -587,7 +588,7 @@ def __init__(
features: Features,
seed: str | None = None,
):
self.self_managed_versions = get_supported_self_managed_versions()
self.self_managed_versions = get_compatible_upgrade_from_versions()
super().__init__(checks, executor, features, seed)

def base_version(self) -> MzVersion:
Expand Down Expand Up @@ -643,7 +644,7 @@ def __init__(
features: Features,
seed: str | None = None,
):
self.self_managed_versions = get_supported_self_managed_versions()
self.self_managed_versions = get_compatible_upgrade_from_versions()
super().__init__(checks, executor, features, seed)

def base_version(self) -> MzVersion:
Expand Down Expand Up @@ -709,7 +710,7 @@ def __init__(
features: Features,
seed: str | None = None,
):
self.self_managed_versions = get_supported_self_managed_versions()
self.self_managed_versions = get_compatible_upgrade_from_versions()
super().__init__(checks, executor, features, seed)

def _generate_random_upgrade_path(
Expand All @@ -721,17 +722,7 @@ def _generate_random_upgrade_path(
if self.rng is None or len(versions) == 0:
return versions

selected_versions = []
# For each version in the input list, randomly select it with a 50% chance.
for v in versions:
if self.rng.random() < 0.5:
selected_versions.append(v)

# Always include at least one version to avoid empty paths.
if len(selected_versions) == 0:
selected_versions.append(self.rng.choice(versions))

return selected_versions
return generate_random_upgrade_path(versions, self.rng)

def base_version(self) -> MzVersion:
return self.self_managed_versions[0]
Expand Down Expand Up @@ -838,7 +829,7 @@ def __init__(
features: Features,
seed: str | None = None,
):
self.self_managed_versions = get_supported_self_managed_versions()
self.self_managed_versions = get_compatible_upgrade_from_versions()
super().__init__(checks, executor, features, seed)

def base_version(self) -> MzVersion:
Expand Down
197 changes: 197 additions & 0 deletions misc/python/materialize/mz_0dt_upgrader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from collections.abc import Callable
from dataclasses import dataclass
from random import Random
from typing import TypedDict

from materialize.mz_version import MzVersion
from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
from materialize.mzcompose.services.postgres import METADATA_STORE


class MaterializedUpgradeArgs(TypedDict):
    """Arguments for the Materialized service constructor required for 0dt upgrades."""

    # Service name; generate_materialized_upgrade_args alternates between
    # "mz_1" and "mz_2" so that only two services are ever needed.
    name: str
    # Image reference of the form "materialize/materialized:<version>", or
    # None when no version is given (printed as "current" in the upgrade path).
    image: str | None
    # Generation number of the service; the position of its version in the
    # upgrade path. Required to start services in read only mode.
    deploy_generation: int
    # System parameter defaults applied to the service.
    system_parameter_defaults: dict[str, str]
    # Set so that all services of an upgrade path share one metadata store.
    external_metadata_store: bool
    # Restart policy, e.g. "on-failure" to restart when the container exits
    # due to promotion.
    restart: str


@dataclass
class UpgradeStep:
    """Represents a single upgrade step: the two services involved and the action performing it."""

    # Service that is brought up and promoted when `upgrade` is called.
    new_service: Materialized
    # Service that is killed by `upgrade` once the new service is the leader.
    previous_service: Materialized
    # Zero-argument callable that performs the upgrade to `new_service`.
    upgrade: Callable[[], None]


def generate_materialized_upgrade_args(
    versions: list[MzVersion | None],
) -> list[MaterializedUpgradeArgs]:
    """
    Constructs a list of required Materialized arguments for 0dt upgrades.
    Requires there to be an mz_1 and mz_2 service already in the composition.

    Args:
        versions: The upgrade path in order. A falsy entry (None) produces an
            entry whose image is None.

    Returns:
        One MaterializedUpgradeArgs per entry of `versions`, in the same order.
        An empty `versions` list yields an empty list.
    """
    # Nothing to construct; also avoids indexing into an empty list below.
    if not versions:
        return []

    # We use the first version to get the system parameters since the defaults for
    # newer versions include cutting edge features that can break backwards compatibility.
    # TODO (multiversion1): Get minimal system parameters by default to avoid cutting edge features.
    system_parameter_defaults = get_default_system_parameters(versions[0])

    return [
        MaterializedUpgradeArgs(
            image=f"materialize/materialized:{version}" if version else None,
            # Cycle through mz_1 and mz_2 for upgrades since spinning up services have a cost.
            name=f"mz_{(i % 2) + 1}",
            # Generation number for the service. Required to start services in read only mode.
            deploy_generation=i,
            system_parameter_defaults=system_parameter_defaults,
            # To share the same metadata store between services
            external_metadata_store=True,
            # To restart when container exits due to promotion
            restart="on-failure",
        )
        for i, version in enumerate(versions)
    ]


def generate_random_upgrade_path(
    versions: list[MzVersion],
    rng: Random | None = None,
) -> list[MzVersion]:
    """
    Generates a random upgrade path between the given versions.

    Every version is kept independently with a 50% chance and the relative
    order of the input is preserved. If nothing was kept, a single randomly
    chosen version is returned so that the path is never empty for a
    non-empty input.

    Args:
        versions: Candidate versions, in upgrade order.
        rng: Random number generator to draw from. Pass a seeded instance for
            a reproducible path; a fresh unseeded one is used when omitted.

    Returns:
        The selected versions in input order; an empty list if `versions`
        is empty.
    """
    # No candidates means no path; rng.choice() would raise IndexError here.
    if not versions:
        return []

    if rng is None:
        rng = Random()

    # Exactly one draw per version, in input order, so that a seeded rng
    # always yields the same path.
    selected_versions = [v for v in versions if rng.random() < 0.5]

    # Always include at least one version to avoid empty paths.
    if not selected_versions:
        selected_versions.append(rng.choice(versions))

    return selected_versions


class Materialized0dtUpgrader:
    """
    Walks an ordered list of Materialized services through zero-downtime upgrades.

    Args:
        c: Composition used to start, promote, kill and remove the services
        materialized_services: List of Materialized instances representing each upgrade step
    """

    def __init__(self, c: Composition, materialized_services: list[Materialized]):
        self.c = c
        self.materialized_services = materialized_services

    def create_upgrade_steps_list(self) -> list[UpgradeStep]:
        """
        Build one UpgradeStep for every service after the first.

        The `upgrade` callable of a step does nothing until it is invoked; when
        invoked it brings up the step's service, promotes it and then kills
        the service that preceded it.
        """

        def build_action(
            target: Materialized,
            predecessor: Materialized,
        ):
            # A factory is used so every closure is bound to its own pair of services.
            def upgrade() -> None:
                with self.c.override(target):
                    target_image = target.config.get("image") or "current"
                    predecessor_image = predecessor.config.get("image")

                    print(f"Bringing up {target_image}")
                    self.c.up(target.name)

                    print(f"Awaiting promotion of {target_image}")
                    self.c.await_mz_deployment_status(
                        DeploymentStatus.READY_TO_PROMOTE, target.name
                    )
                    self.c.promote_mz(target.name)

                    print(f"Awaiting leader status of {target_image}")
                    self.c.await_mz_deployment_status(
                        DeploymentStatus.IS_LEADER, target.name
                    )

                    print(f"Killing {predecessor_image}")
                    self.c.kill(predecessor.name, wait=True)

            return upgrade

        chain = self.materialized_services
        # Pair every service with the one directly before it in the path.
        return [
            UpgradeStep(
                new_service=target,
                previous_service=predecessor,
                upgrade=build_action(target, predecessor),
            )
            for predecessor, target in zip(chain, chain[1:])
        ]

    def initialize(self) -> tuple[Materialized, list[UpgradeStep]]:
        """
        Bring up the first service of the path.

        Returns that service together with the list of upgrade steps for all
        remaining services (see create_upgrade_steps_list).
        """
        initial = self.materialized_services[0]
        with self.c.override(initial):
            print(f"Bringing up {initial.name}")
            self.c.up(initial.name)

        steps = self.create_upgrade_steps_list()
        return initial, steps

    def print_upgrade_path(self) -> None:
        """
        Print the upgrade path as the image tags of all services, in order.
        """
        labels = []
        for service in self.materialized_services:
            image = service.config.get("image")
            # A service without an image is shown as "current"; otherwise
            # only the part after the last ":" is shown.
            labels.append("current" if image is None else image.split(":")[-1])

        print(f"Upgrade path: {' -> '.join(labels)}")

    def cleanup(self) -> None:
        """
        Kill and remove all services of the path and the metadata store,
        destroying their volumes.
        """
        print("Cleaning up upgrade path")
        service_names = [service.name for service in self.materialized_services]

        # Ensure all services are killed and removed
        self.c.kill(*service_names, wait=True)
        self.c.rm(*service_names, destroy_volumes=True)

        self.c.kill(METADATA_STORE, wait=True)
        self.c.rm(METADATA_STORE, destroy_volumes=True)
20 changes: 14 additions & 6 deletions misc/python/materialize/source_table_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,37 @@


def verify_sources_after_source_table_migration(
    c: Composition,
    file: str,
    fail: bool = False,
    service: str | None = None,
) -> None:
    """
    Verify every source returned by the catalog query after the source table migration.

    Args:
        c: Composition used to run the SQL statements.
        file: Name of the file that created the sources; used in the log output
            and passed on to the per-source check.
        fail: Passed on unchanged to the per-source check.
        service: Service to run the SQL statements against. None leaves the
            choice of service to the composition.
    """
    # Schema-qualified names of all sources whose id starts with "u"
    # (presumably user-created sources - confirm against the catalog docs).
    source_names_rows = c.sql_query(
        "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';",
        service=service,
    )
    source_names = [row[0] for row in source_names_rows]

    print(f"Sources created in {file} are: {source_names}")

    c.sql("SET statement_timeout = '20s'", service=service)

    for source_name in source_names:
        _verify_source(c, file, source_name, fail=fail, service=service)


def _verify_source(
c: Composition, file: str, source_name: str, fail: bool = False
c: Composition,
file: str,
source_name: str,
fail: bool = False,
service: str | None = None,
) -> None:
try:
print(f"Checking source: {source_name}")

statement = f"SHOW CREATE SOURCE {source_name};"
result = c.sql_query(statement)
result = c.sql_query(statement, service=service)
sql = result[0][1]
assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"
Expand Down
Loading