Skip to content

Commit 55a9398

Browse files
Manual promotion support
1 parent e490cfd commit 55a9398

File tree

3 files changed

+204
-30
lines changed

3 files changed

+204
-30
lines changed

src/cloud-resources/src/crd/materialize.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,23 @@ pub mod v1alpha1 {
6666
#[default]
6767
WaitUntilReady,
6868

69+
/// Create a new generation of pods, leaving the old generation as the serving generation
70+
/// until the user manually promotes the new generation.
71+
///
72+
/// Users can promote the new generation at any time, even if the new generation pods are
73+
/// not fully caught up, by setting `forcePromote` to the same value as `requestRollout` in
74+
/// the Materialize spec.
75+
///
76+
/// {{<warning>}}
77+
/// Do not leave new generations unpromoted indefinitely.
78+
///
79+
/// The new generation keeps open read holds which prevent compaction. Once promoted or
80+
/// cancelled, those read holds are released. If left unpromoted for an extended time, this
81+
/// data can build up, and can cause extreme deletion load on the metadata backend database
82+
/// when finally promoted or cancelled.
83+
/// {{</warning>}}
84+
ManuallyPromote,
85+
6986
/// {{<warning>}}
7087
/// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
7188
///
@@ -429,6 +446,20 @@ pub mod v1alpha1 {
429446
false
430447
}
431448

449+
pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
450+
let Some(status) = self.status.as_ref() else {
451+
return false;
452+
};
453+
if status.conditions.is_empty() {
454+
return false;
455+
}
456+
status
457+
.conditions
458+
.iter()
459+
.any(|condition| condition.reason == "ReadyToPromote")
460+
&& &status.resources_hash == resources_hash
461+
}
462+
432463
pub fn is_promoting(&self) -> bool {
433464
let Some(status) = self.status.as_ref() else {
434465
return false;

src/orchestratord/src/controller/materialize.rs

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -604,34 +604,38 @@ impl k8s_controller::Context for Context {
604604
// replace_status, but this is fine because we already
605605
// extracted all of the information we want from the spec
606606
// earlier.
607-
let mz = self
608-
.update_status(
609-
&mz_api,
610-
mz,
611-
MaterializeStatus {
612-
active_generation,
613-
// don't update the reconciliation id yet,
614-
// because the rollout hasn't yet completed. if
615-
// we fail later on, we want to ensure that the
616-
// rollout gets retried.
617-
last_completed_rollout_request: status.last_completed_rollout_request,
618-
resource_id: status.resource_id,
619-
resources_hash: String::new(),
620-
conditions: vec![Condition {
621-
type_: "UpToDate".into(),
622-
status: "Unknown".into(),
623-
last_transition_time: Time(chrono::offset::Utc::now()),
624-
message: format!(
625-
"Applying changes for generation {desired_generation}"
626-
),
627-
observed_generation: mz.meta().generation,
628-
reason: "Applying".into(),
629-
}],
630-
},
631-
active_generation != desired_generation,
632-
)
633-
.await?;
634-
let mz = &mz;
607+
let mz = if mz.is_ready_to_promote(&resources_hash) {
608+
mz
609+
} else {
610+
&self
611+
.update_status(
612+
&mz_api,
613+
mz,
614+
MaterializeStatus {
615+
active_generation,
616+
// don't update the reconciliation id yet,
617+
// because the rollout hasn't yet completed. if
618+
// we fail later on, we want to ensure that the
619+
// rollout gets retried.
620+
last_completed_rollout_request: status
621+
.last_completed_rollout_request,
622+
resource_id: status.resource_id,
623+
resources_hash: String::new(),
624+
conditions: vec![Condition {
625+
type_: "UpToDate".into(),
626+
status: "Unknown".into(),
627+
last_transition_time: Time(chrono::offset::Utc::now()),
628+
message: format!(
629+
"Applying changes for generation {desired_generation}"
630+
),
631+
observed_generation: mz.meta().generation,
632+
reason: "Applying".into(),
633+
}],
634+
},
635+
active_generation != desired_generation,
636+
)
637+
.await?
638+
};
635639
let status = mz.status();
636640

637641
if mz.spec.rollout_strategy
@@ -655,6 +659,37 @@ impl k8s_controller::Context for Context {
655659
Ok(Some(action))
656660
}
657661
Ok(None) => {
662+
if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote
663+
&& !mz.should_force_promote()
664+
{
665+
trace!(
666+
"Ready to promote, but not promoting because the instance is configured with ManuallyPromote rollout strategy."
667+
);
668+
self.update_status(
669+
&mz_api,
670+
mz,
671+
MaterializeStatus {
672+
active_generation,
673+
last_completed_rollout_request: status
674+
.last_completed_rollout_request,
675+
resource_id: status.resource_id,
676+
resources_hash,
677+
conditions: vec![Condition {
678+
type_: "UpToDate".into(),
679+
status: "Unknown".into(),
680+
last_transition_time: Time(chrono::offset::Utc::now()),
681+
message: format!(
682+
"Ready to promote generation {desired_generation}"
683+
),
684+
observed_generation: mz.meta().generation,
685+
reason: "ReadyToPromote".into(),
686+
}],
687+
},
688+
active_generation != desired_generation,
689+
)
690+
.await?;
691+
return Ok(None);
692+
}
658693
// do this last, so that we keep traffic pointing at
659694
// the previous environmentd until the new one is
660695
// fully ready

test/orchestratord/mzcompose.py

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,6 +1246,27 @@ def validate(self, mods: dict[type[Modification], Any]) -> None:
12461246
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
12471247

12481248

1249+
class RolloutStrategy(Modification):
    """Modification that writes `self.value` into the Materialize `spec.rolloutStrategy`."""

    @classmethod
    def values(cls, version: MzVersion) -> list[Any]:
        """List the rollout strategy names to exercise; `version` is not consulted."""
        strategies: list[Any] = [
            "WaitUntilReady",
            "ManuallyPromote",
            "ImmediatelyPromoteCausingDowntime",
        ]
        return strategies

    @classmethod
    def default(cls) -> Any:
        """Name of the strategy used as the default value."""
        default_strategy = "WaitUntilReady"
        return default_strategy

    def modify(self, definition: dict[str, Any]) -> None:
        """Set `rolloutStrategy` in the Materialize spec of `definition` to this value."""
        spec = definition["materialize"]["spec"]
        spec["rolloutStrategy"] = self.value

    def validate(self, mods: dict[type[Modification], Any]) -> None:
        """Intentionally a no-op for this modification."""
        # This is validated in post_run_check
        return None
1268+
1269+
12491270
class Properties(Enum):
12501271
Defaults = "defaults"
12511272
Individual = "individual"
@@ -2045,9 +2066,97 @@ def run(definition: dict[str, Any], expect_fail: bool) -> None:
20452066
except subprocess.CalledProcessError as e:
20462067
print(f"Failed to apply: {e.stdout}\nSTDERR:{e.stderr}")
20472068
raise
2069+
2070+
if definition["materialize"]["spec"]["rolloutStrategy"] == "ManuallyPromote":
2071+
# First wait for it to become ready to promote, but not yet promoted
2072+
for _ in range(900):
2073+
time.sleep(1)
2074+
if is_ready_to_manually_promote():
2075+
break
2076+
else:
2077+
spawn.runv(
2078+
[
2079+
"kubectl",
2080+
"get",
2081+
"materializes",
2082+
"-n",
2083+
"materialize-environment",
2084+
"-o",
2085+
"yaml",
2086+
],
2087+
)
2088+
raise RuntimeError("Never became ready for manual promotion")
2089+
2090+
# Wait to see that it doesn't promote
2091+
time.sleep(30)
2092+
if not is_ready_to_manually_promote():
2093+
spawn.runv(
2094+
[
2095+
"kubectl",
2096+
"get",
2097+
"materializes",
2098+
"-n",
2099+
"materialize-environment",
2100+
"-o",
2101+
"yaml",
2102+
],
2103+
)
2104+
raise RuntimeError(
2105+
"Stopped being ready for manual promotion before promoting"
2106+
)
2107+
2108+
# Manually promote it
2109+
mz = json.loads(
2110+
spawn.capture(
2111+
[
2112+
"kubectl",
2113+
"get",
2114+
"materializes",
2115+
"-n",
2116+
"materialize-environment",
2117+
"-o",
2118+
"json",
2119+
],
2120+
stderr=subprocess.DEVNULL,
2121+
)
2122+
)["items"][0]
2123+
definition["materialize"]["spec"]["forcePromote"] = mz["spec"]["requestRollout"]
2124+
try:
2125+
spawn.runv(
2126+
["kubectl", "apply", "-f", "-"],
2127+
stdin=yaml.dump(definition["materialize"]).encode(),
2128+
)
2129+
except subprocess.CalledProcessError as e:
2130+
print(f"Failed to apply: {e.stdout}\nSTDERR:{e.stderr}")
2131+
raise
2132+
20482133
post_run_check(definition, expect_fail)
20492134

20502135

2136+
def is_ready_to_manually_promote() -> bool:
    """Report whether the Materialize resource is waiting for manual promotion.

    Fetches the `materializes` resources in the `materialize-environment`
    namespace through `kubectl` and inspects the first condition of the first
    item. Returns True only when that condition has type `UpToDate`, status
    `Unknown` and reason `ReadyToPromote`.

    A missing resource, a missing status, or a missing/empty condition list is
    reported as False instead of raising, so callers that poll this function
    keep polling until their own timeout fires.
    """
    data = json.loads(
        spawn.capture(
            [
                "kubectl",
                "get",
                "materializes",
                "-n",
                "materialize-environment",
                "-o",
                "json",
            ],
            stderr=subprocess.DEVNULL,
        )
    )
    items = data.get("items")
    if not items:
        # Nothing has been created (or listed) yet.
        return False
    # `status` may be absent or null before the first reconciliation.
    conditions = (items[0].get("status") or {}).get("conditions")
    if not conditions:
        # Covers both a missing key and an empty list; indexing [0] on an
        # empty list would raise IndexError.
        return False
    first = conditions[0]
    return (
        first["type"] == "UpToDate"
        and first["status"] == "Unknown"
        and first["reason"] == "ReadyToPromote"
    )
2158+
2159+
20512160
def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None:
20522161
for i in range(900):
20532162
time.sleep(1)
@@ -2074,10 +2183,9 @@ def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None:
20742183
if (
20752184
not status["conditions"]
20762185
or status["conditions"][0]["type"] != "UpToDate"
2186+
or status["conditions"][0]["status"] != "True"
20772187
):
20782188
continue
2079-
if status["conditions"][0]["status"] != "True":
2080-
continue
20812189
if (
20822190
status["lastCompletedRolloutRequest"]
20832191
== data["items"][0]["spec"]["requestRollout"]

0 commit comments

Comments
 (0)