From 3f6bf315000c19db57b535ea8474e1e7f9ab4b04 Mon Sep 17 00:00:00 2001 From: Alex Hunt Date: Wed, 19 Nov 2025 14:36:44 +0000 Subject: [PATCH] Manual promotion support --- src/cloud-resources/src/crd/materialize.rs | 31 +++++ .../src/controller/materialize.rs | 97 ++++++++++----- test/orchestratord/mzcompose.py | 112 +++++++++++++++++- 3 files changed, 208 insertions(+), 32 deletions(-) diff --git a/src/cloud-resources/src/crd/materialize.rs b/src/cloud-resources/src/crd/materialize.rs index 1a9ed3da75e6b..06c6d1c48a834 100644 --- a/src/cloud-resources/src/crd/materialize.rs +++ b/src/cloud-resources/src/crd/materialize.rs @@ -39,6 +39,23 @@ pub mod v1alpha1 { #[default] WaitUntilReady, + /// Create a new generation of pods, leaving the old generation as the serving generation + /// until the user manually promotes the new generation. + /// + /// Users can promote the new generation at any time, even if the new generation pods are + /// not fully caught up, by setting `forcePromote` to the same value as `requestRollout` in + /// the Materialize spec. + /// + /// {{<warning>}} + /// Do not leave new generations unpromoted indefinitely. + /// + /// The new generation keeps open read holds which prevent compaction. Once promoted or + /// cancelled, those read holds are released. If left unpromoted for an extended time, this + /// data can build up, and can cause extreme deletion load on the metadata backend database + /// when finally promoted or cancelled. + /// {{</warning>}} + ManuallyPromote, + /// {{<warning>}} /// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!! 
/// @@ -386,6 +403,20 @@ pub mod v1alpha1 { false } + pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool { + let Some(status) = self.status.as_ref() else { + return false; + }; + if status.conditions.is_empty() { + return false; + } + status + .conditions + .iter() + .any(|condition| condition.reason == "ReadyToPromote") + && &status.resources_hash == resources_hash + } + pub fn is_promoting(&self) -> bool { let Some(status) = self.status.as_ref() else { return false; diff --git a/src/orchestratord/src/controller/materialize.rs b/src/orchestratord/src/controller/materialize.rs index cb33b59c2ef76..e6dfcdd73ff41 100644 --- a/src/orchestratord/src/controller/materialize.rs +++ b/src/orchestratord/src/controller/materialize.rs @@ -418,36 +418,40 @@ impl k8s_controller::Context for Context { // replace_status, but this is fine because we already // extracted all of the information we want from the spec // earlier. - let mz = self - .update_status( - &mz_api, - mz, - MaterializeStatus { - active_generation, - // don't update the reconciliation id yet, - // because the rollout hasn't yet completed. if - // we fail later on, we want to ensure that the - // rollout gets retried. 
- last_completed_rollout_request: status.last_completed_rollout_request, - last_completed_rollout_environmentd_image_ref: status - .last_completed_rollout_environmentd_image_ref, - resource_id: status.resource_id, - resources_hash: String::new(), - conditions: vec![Condition { - type_: "UpToDate".into(), - status: "Unknown".into(), - last_transition_time: Time(chrono::offset::Utc::now()), - message: format!( - "Applying changes for generation {desired_generation}" - ), - observed_generation: mz.meta().generation, - reason: "Applying".into(), - }], - }, - active_generation != desired_generation, - ) - .await?; - let mz = &mz; + let mz = if mz.is_ready_to_promote(&resources_hash) { + mz + } else { + &self + .update_status( + &mz_api, + mz, + MaterializeStatus { + active_generation, + // don't update the reconciliation id yet, + // because the rollout hasn't yet completed. if + // we fail later on, we want to ensure that the + // rollout gets retried. + last_completed_rollout_request: status + .last_completed_rollout_request, + last_completed_rollout_environmentd_image_ref: status + .last_completed_rollout_environmentd_image_ref, + resource_id: status.resource_id, + resources_hash: String::new(), + conditions: vec![Condition { + type_: "UpToDate".into(), + status: "Unknown".into(), + last_transition_time: Time(chrono::offset::Utc::now()), + message: format!( + "Applying changes for generation {desired_generation}" + ), + observed_generation: mz.meta().generation, + reason: "Applying".into(), + }], + }, + active_generation != desired_generation, + ) + .await? + }; let status = mz.status(); if !mz.within_upgrade_window() { @@ -503,6 +507,39 @@ impl k8s_controller::Context for Context { Ok(Some(action)) } Ok(None) => { + if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote + && !mz.should_force_promote() + { + trace!( + "Ready to promote, but not promoting because the instance is configured with ManuallyPromote rollout strategy." 
+ ); + self.update_status( + &mz_api, + mz, + MaterializeStatus { + active_generation, + last_completed_rollout_request: status + .last_completed_rollout_request, + last_completed_rollout_environmentd_image_ref: status + .last_completed_rollout_environmentd_image_ref, + resource_id: status.resource_id, + resources_hash, + conditions: vec![Condition { + type_: "UpToDate".into(), + status: "Unknown".into(), + last_transition_time: Time(chrono::offset::Utc::now()), + message: format!( + "Ready to promote generation {desired_generation}" + ), + observed_generation: mz.meta().generation, + reason: "ReadyToPromote".into(), + }], + }, + active_generation != desired_generation, + ) + .await?; + return Ok(None); + } // do this last, so that we keep traffic pointing at // the previous environmentd until the new one is // fully ready diff --git a/test/orchestratord/mzcompose.py b/test/orchestratord/mzcompose.py index 1377c4d1dffc5..0950c70ff1711 100644 --- a/test/orchestratord/mzcompose.py +++ b/test/orchestratord/mzcompose.py @@ -1256,6 +1256,27 @@ def validate(self, mods: dict[type[Modification], Any]) -> None: os.killpg(os.getpgid(process.pid), signal.SIGTERM) +class RolloutStrategy(Modification): + @classmethod + def values(cls, version: MzVersion) -> list[Any]: + return [ + "WaitUntilReady", + "ManuallyPromote", + "ImmediatelyPromoteCausingDowntime", + ] + + @classmethod + def default(cls) -> Any: + return "WaitUntilReady" + + def modify(self, definition: dict[str, Any]) -> None: + definition["materialize"]["spec"]["rolloutStrategy"] = self.value + + def validate(self, mods: dict[type[Modification], Any]) -> None: + # This is validated in post_run_check + return + + class Properties(Enum): Defaults = "defaults" Individual = "individual" @@ -2233,9 +2254,97 @@ def run(definition: dict[str, Any], expect_fail: bool) -> None: except subprocess.CalledProcessError as e: print(f"Failed to apply: {e.stdout}\nSTDERR:{e.stderr}") raise + + if 
definition["materialize"]["spec"].get("rolloutStrategy") == "ManuallyPromote": + # First wait for it to become ready to promote, but not yet promoted + for _ in range(900): + time.sleep(1) + if is_ready_to_manually_promote(): + break + else: + spawn.runv( + [ + "kubectl", + "get", + "materializes", + "-n", + "materialize-environment", + "-o", + "yaml", + ], + ) + raise RuntimeError("Never became ready for manual promotion") + + # Wait to see that it doesn't promote + time.sleep(30) + if not is_ready_to_manually_promote(): + spawn.runv( + [ + "kubectl", + "get", + "materializes", + "-n", + "materialize-environment", + "-o", + "yaml", + ], + ) + raise RuntimeError( + "Stopped being ready for manual promotion before promoting" + ) + + # Manually promote it + mz = json.loads( + spawn.capture( + [ + "kubectl", + "get", + "materializes", + "-n", + "materialize-environment", + "-o", + "json", + ], + stderr=subprocess.DEVNULL, + ) + )["items"][0] + definition["materialize"]["spec"]["forcePromote"] = mz["spec"]["requestRollout"] + try: + spawn.runv( + ["kubectl", "apply", "-f", "-"], + stdin=yaml.dump(definition["materialize"]).encode(), + ) + except subprocess.CalledProcessError as e: + print(f"Failed to apply: {e.stdout}\nSTDERR:{e.stderr}") + raise + post_run_check(definition, expect_fail) +def is_ready_to_manually_promote(): + data = json.loads( + spawn.capture( + [ + "kubectl", + "get", + "materializes", + "-n", + "materialize-environment", + "-o", + "json", + ], + stderr=subprocess.DEVNULL, + ) + ) + conditions = data["items"][0].get("status", {}).get("conditions") + return ( + bool(conditions)  # missing or empty conditions means "not ready yet"; never index an empty list + and conditions[0]["type"] == "UpToDate" + and conditions[0]["status"] == "Unknown" + and conditions[0]["reason"] == "ReadyToPromote" + ) + + def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None: for i in range(900): time.sleep(1) @@ -2262,10 +2371,9 @@ def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None: if ( not 
status["conditions"] or status["conditions"][0]["type"] != "UpToDate" + or status["conditions"][0]["status"] != "True" ): continue - if status["conditions"][0]["status"] != "True": - continue if ( status["lastCompletedRolloutRequest"] == data["items"][0]["spec"]["requestRollout"]