From 0f3ce93587a74211ecc455badd68734e323e0e71 Mon Sep 17 00:00:00 2001
From: Alexander Lyon
Date: Mon, 4 Sep 2023 11:30:36 +0100
Subject: [PATCH 1/5] feat: add basic support for multi-cluster replication
---
README.md | 2 +-
api/v1beta1/temporalcluster_types.go | 17 +++++
api/v1beta1/zz_generated.deepcopy.go | 20 +++++
.../temporal.io_temporalclusters.yaml | 11 +++
.../bases/temporal.io_temporalclusters.yaml | 11 +++
docs/api/v1beta1.md | 75 +++++++++++++++++++
docs/features/multi-cluster-replication.md | 64 ++++++++++++++++
internal/resource/config/configmap_builder.go | 40 ++++++----
8 files changed, 226 insertions(+), 14 deletions(-)
create mode 100644 docs/features/multi-cluster-replication.md
diff --git a/README.md b/README.md
index 16052645..35bd18a2 100644
--- a/README.md
+++ b/README.md
@@ -93,8 +93,8 @@ Please note this table only reports end-to-end tests suite coverage, others vers
- [x] Cluster monitoring.
- [x] Complete end2end test suite.
- [x] Archival.
+- [x] Multi cluster replication.
- [ ] Auto scaling.
-- [ ] Multi cluster replication.
## Contributing
diff --git a/api/v1beta1/temporalcluster_types.go b/api/v1beta1/temporalcluster_types.go
index 010aec4b..a0322ff1 100644
--- a/api/v1beta1/temporalcluster_types.go
+++ b/api/v1beta1/temporalcluster_types.go
@@ -854,6 +854,20 @@ func (s *ClusterArchivalSpec) IsEnabled() bool {
return s != nil && s.Enabled
}
+// ClusterReplicationSpec defines the specifications for replication in the temporal cluster. Using these
+// the operator will comfigure the cluster to replicate data to other clusters. If this object is defined,
+// then both properties below must be set.
+type ClusterReplicationSpec struct {
+ // EnableGlobalNamespace signifies to this node that global namespaces should be enabled.
+ // Namespaces that are marked global are replicated across nodes in the cluster.
+ // +kubebuilder:validation:Required
+ EnableGlobalNamespace bool `json:"enableGlobalNamespace"`
+ // InitialFailoverVersion is used to determine the leadership order between nodes in the cluster. The node with the
+ // lowest initial failover version will be the primary. This must be unique across the cluster.
+ // +kubebuilder:validation:Required
+ InitialFailoverVersion int64 `json:"initialFailoverVersion"`
+}
+
type ArchivalProviderKind string
const (
@@ -1055,6 +1069,9 @@ type TemporalClusterSpec struct {
// Authorization allows authorization configuration for the temporal cluster.
// +optional
Authorization *AuthorizationSpec `json:"authorization,omitempty"`
+ // Replication allows configuration of multi-cluster replication.
+ // +optional
+ Replication *ClusterReplicationSpec `json:"replication,omitempty"`
}
// ServiceStatus reports a service status.
diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go
index d42b3b6c..d26be269 100644
--- a/api/v1beta1/zz_generated.deepcopy.go
+++ b/api/v1beta1/zz_generated.deepcopy.go
@@ -244,6 +244,21 @@ func (in *ClusterArchivalSpec) DeepCopy() *ClusterArchivalSpec {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ClusterReplicationSpec) DeepCopyInto(out *ClusterReplicationSpec) {
+ *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterReplicationSpec.
+func (in *ClusterReplicationSpec) DeepCopy() *ClusterReplicationSpec {
+ if in == nil {
+ return nil
+ }
+ out := new(ClusterReplicationSpec)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ConstrainedValue) DeepCopyInto(out *ConstrainedValue) {
*out = *in
@@ -1688,6 +1703,11 @@ func (in *TemporalClusterSpec) DeepCopyInto(out *TemporalClusterSpec) {
*out = new(AuthorizationSpec)
(*in).DeepCopyInto(*out)
}
+ if in.Replication != nil {
+ in, out := &in.Replication, &out.Replication
+ *out = new(ClusterReplicationSpec)
+ **out = **in
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalClusterSpec.
diff --git a/bundle/manifests/temporal.io_temporalclusters.yaml b/bundle/manifests/temporal.io_temporalclusters.yaml
index 9c9baf2f..34f8092f 100644
--- a/bundle/manifests/temporal.io_temporalclusters.yaml
+++ b/bundle/manifests/temporal.io_temporalclusters.yaml
@@ -2941,6 +2941,17 @@ spec:
- defaultStore
- visibilityStore
type: object
+ replication:
+ properties:
+ enableGlobalNamespace:
+ type: boolean
+ initialFailoverVersion:
+ format: int64
+ type: integer
+ required:
+ - enableGlobalNamespace
+ - initialFailoverVersion
+ type: object
services:
description: Services allows customizations for each temporal services
deployment.
diff --git a/config/crd/bases/temporal.io_temporalclusters.yaml b/config/crd/bases/temporal.io_temporalclusters.yaml
index d14fc287..79d66fe2 100644
--- a/config/crd/bases/temporal.io_temporalclusters.yaml
+++ b/config/crd/bases/temporal.io_temporalclusters.yaml
@@ -2692,6 +2692,17 @@ spec:
- defaultStore
- visibilityStore
type: object
+ replication:
+ properties:
+ enableGlobalNamespace:
+ type: boolean
+ initialFailoverVersion:
+ format: int64
+ type: integer
+ required:
+ - enableGlobalNamespace
+ - initialFailoverVersion
+ type: object
services:
description: Services allows customizations for each temporal services deployment.
properties:
diff --git a/docs/api/v1beta1.md b/docs/api/v1beta1.md
index 50822bd3..ea9017ab 100644
--- a/docs/api/v1beta1.md
+++ b/docs/api/v1beta1.md
@@ -294,6 +294,20 @@ AuthorizationSpec
Authorization allows authorization configuration for the temporal cluster.
+
+
+replication
+
+
+ClusterReplicationSpec
+
+
+ |
+
+(Optional)
+ Replication allows configuration of multi-cluster replication.
+ |
+
@@ -923,6 +937,53 @@ ArchivalSpec
+ClusterReplicationSpec
+
+
+(Appears on:
+TemporalClusterSpec)
+
+ClusterReplicationSpec defines the specifications for replication in the temporal cluster. Using these
+the operator will comfigure the cluster to replicate data to other clusters. If this object is defined,
+then both properties below must be set.
+
ConstrainedValue
@@ -5005,6 +5066,20 @@ AuthorizationSpec
Authorization allows authorization configuration for the temporal cluster.
+
+
+replication
+
+
+ClusterReplicationSpec
+
+
+ |
+
+(Optional)
+ Replication allows configuration of multi-cluster replication.
+ |
+
diff --git a/docs/features/multi-cluster-replication.md b/docs/features/multi-cluster-replication.md
new file mode 100644
index 00000000..721129cd
--- /dev/null
+++ b/docs/features/multi-cluster-replication.md
@@ -0,0 +1,64 @@
+# Multi-cluster replication
+
+Temporal supports multi-cluster replication. This feature allows you to replicate specific temporal namespaces to a different temporal cluster. This is useful for disaster recovery, or to have a temporal cluster in a different region for latency reasons, or if you want to upgrade the temporal history shard count.
+
+## How it works
+
+To set up multi-cluster replication using the temporal operator, you must first enable global namespaces on the clusters you wish to support, and then assign them a unique failover version.
+This can be configured via the `spec.replicaton` of the `TemporalCluster` resource. Temporal operator automatically configures the remaining fields, and currently hard-codes the failover
+increment to 100. If a cluster fails, the remaining clusters will elect a new primary cluster based with the lowest failover version. The original cluster, if it comes back online, will
+be assigned a new failover version, which is always the lowest multiple of the failover increment (+ initialFailoverVersion) that is greater than the leader cluster's failover version.
+
+```yaml
+apiVersion: temporal.io/v1beta1
+kind: TemporalCluster
+metadata:
+ name: prod
+spec:
+ replication:
+ enableGlobalNamespace: true
+ initialFailoverVersion: 1
+```
+
+## Starting replication
+
+Once the two clusters are configured, simply set up connections between them using the temporal CLI. In the future, there may be a way to do this via the operator.
+
+```bash
+# port forward to the frontend of the primary cluster
+kubectl port-forward primary-frontend 7233:7233
+
+temporal operator cluster upsert --frontend-address secondary-cluster.namespace.svc.cluster.local:7233 --enable-connection true
+
+# port forward to the frontend of the secondary cluster
+kubectl port-forward secondary-frontend 7233:7233
+
+tepmoral operator cluster upsert --frontend-address primary-cluster.namespace.svc.cluster.local:7233 --enable-connection true
+```
+
+## Replicating namespaces
+
+Once the clusters are connected, you can replicate namespaces between them. This can be done via the temporal CLI, or via the operator. Simply create a new namespace resource for just the primary clusterRef, and add the secondary cluster to the list of clusters. The namespace will be added to the secondary cluster automatically, with all the same settings, and start receiving updates.
+
+```yaml
+apiVersion: temporal.io/v1beta1
+kind: TemporalNamespace
+metadata:
+ name: primary-namespace
+spec:
+ clusterRef:
+ name: primary
+ clusters:
+ - primary
+ - secondary
+ activeClusterName: secondary
+ isGlobalNamespace: true
+```
+
+| **🚨 Note**: Replication does not eagerly push records to the other cluster. If you set up replication with existing workflows on the primary node, make sure to query every workflow on the original node so that they are propagated to prevent data loss.
+
+## A mechanism for increasing the history shard count
+
+Since temporal 1.20, replicated clusters do not require the same number of history shards. This means it is a viable method for migrating a cluster that has outgrown its shard count. To do this, simply have your secondary cluster use a higher shard count than the primary cluster. The only requirement is that the shard count on the secondary cluster is an even multiple of the first. So if you have 512 shards on the primary cluster, you can have 1024 shards (or any other multiple) on the secondary cluster, but not 1023.
+
+When replication is complete, simply take down the old cluster, and flip your clients over to the new cluster. This can all be achieved with very little downtime.
diff --git a/internal/resource/config/configmap_builder.go b/internal/resource/config/configmap_builder.go
index 1be97a63..7649a7bf 100644
--- a/internal/resource/config/configmap_builder.go
+++ b/internal/resource/config/configmap_builder.go
@@ -199,6 +199,30 @@ func (b *ConfigmapBuilder) buildArchivalConfig() (*config.Archival, *config.Arch
return cfg, namespaceDefaults
}
+func (b *ConfigmapBuilder) buildClusterMetadataConfig() *cluster.Config {
+ failoverVersion := int64(1)
+ enableGlobalNamespace := false
+
+ if b.instance.Spec.Replication != nil {
+ failoverVersion = b.instance.Spec.Replication.InitialFailoverVersion
+ enableGlobalNamespace = b.instance.Spec.Replication.EnableGlobalNamespace
+ }
+
+ return &cluster.Config{
+ EnableGlobalNamespace: enableGlobalNamespace,
+ FailoverVersionIncrement: 10,
+ MasterClusterName: b.instance.Name,
+ CurrentClusterName: b.instance.Name,
+ ClusterInformation: map[string]cluster.ClusterInformation{
+ b.instance.Name: {
+ Enabled: true,
+ InitialFailoverVersion: failoverVersion,
+ RPCAddress: "127.0.0.1:7233",
+ },
+ },
+ }
+}
+
func (b *ConfigmapBuilder) Update(object client.Object) error {
configMap := object.(*corev1.ConfigMap)
@@ -209,6 +233,8 @@ func (b *ConfigmapBuilder) Update(object client.Object) error {
archivalConfig, archivalNamespaceDefaults := b.buildArchivalConfig()
+ clusterMetadata := b.buildClusterMetadataConfig()
+
temporalCfg := config.Config{
Global: config.Global{
Membership: config.Membership{
@@ -223,19 +249,7 @@ func (b *ConfigmapBuilder) Update(object client.Object) error {
NamespaceDefaults: config.NamespaceDefaults{
Archival: *archivalNamespaceDefaults,
},
- ClusterMetadata: &cluster.Config{
- EnableGlobalNamespace: false,
- FailoverVersionIncrement: 10,
- MasterClusterName: b.instance.Name,
- CurrentClusterName: b.instance.Name,
- ClusterInformation: map[string]cluster.ClusterInformation{
- b.instance.Name: {
- Enabled: true,
- InitialFailoverVersion: 1,
- RPCAddress: "127.0.0.1:7233",
- },
- },
- },
+ ClusterMetadata: clusterMetadata,
Services: map[string]config.Service{
string(primitives.FrontendService): {
RPC: config.RPC{
From b22f6aa5c08c76513f097da9b814b9074e2a98ce Mon Sep 17 00:00:00 2001
From: Alexander Lyon
Date: Thu, 14 Mar 2024 11:13:52 +0000
Subject: [PATCH 2/5] Update multi-cluster-replication.md
---
docs/features/multi-cluster-replication.md | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/docs/features/multi-cluster-replication.md b/docs/features/multi-cluster-replication.md
index 721129cd..c18316b2 100644
--- a/docs/features/multi-cluster-replication.md
+++ b/docs/features/multi-cluster-replication.md
@@ -6,7 +6,7 @@ Temporal supports multi-cluster replication. This feature allows you to replicat
To set up multi-cluster replication using the temporal operator, you must first enable global namespaces on the clusters you wish to support, and then assign them a unique failover version.
This can be configured via the `spec.replicaton` of the `TemporalCluster` resource. Temporal operator automatically configures the remaining fields, and currently hard-codes the failover
-increment to 100. If a cluster fails, the remaining clusters will elect a new primary cluster based with the lowest failover version. The original cluster, if it comes back online, will
+increment to 10, meaning you can have at most one leader and 9 followers. If a cluster fails, the remaining clusters will elect a new primary cluster based with the lowest failover version. The original cluster, if it comes back online, will
be assigned a new failover version, which is always the lowest multiple of the failover increment (+ initialFailoverVersion) that is greater than the leader cluster's failover version.
```yaml
@@ -20,6 +20,8 @@ spec:
initialFailoverVersion: 1
```
+For example, in a setup with a leader with `initialFailoverVersion` 1, and a follower with `initialFailoverVersion` 2, since the increment is set to 10 a failure in the leader will flip control to the follower, and increment the leader's failover version to 11.
+
## Starting replication
Once the two clusters are configured, simply set up connections between them using the temporal CLI. In the future, there may be a way to do this via the operator.
@@ -55,7 +57,7 @@ spec:
isGlobalNamespace: true
```
-| **🚨 Note**: Replication does not eagerly push records to the other cluster. If you set up replication with existing workflows on the primary node, make sure to query every workflow on the original node so that they are propagated to prevent data loss.
+| **🚨 Note**: Replication happens at workflow evaluation time, meaning that adding replication to a running cluster requires special care. If you set up replication with existing workflows on the primary node, make sure to query every workflow on the original node so that they are propagated or risk data loss.
## A mechanism for increasing the history shard count
From 5b620a875cf5e56f3c703ac877843fe9c02f86d5 Mon Sep 17 00:00:00 2001
From: Alexander Lyon
Date: Thu, 25 Jul 2024 10:54:23 +0100
Subject: [PATCH 3/5] Apply suggestions from code review
Co-authored-by: Alexandre Vilain
---
api/v1beta1/temporalcluster_types.go | 2 +-
docs/features/multi-cluster-replication.md | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/api/v1beta1/temporalcluster_types.go b/api/v1beta1/temporalcluster_types.go
index a0322ff1..200b87f4 100644
--- a/api/v1beta1/temporalcluster_types.go
+++ b/api/v1beta1/temporalcluster_types.go
@@ -855,7 +855,7 @@ func (s *ClusterArchivalSpec) IsEnabled() bool {
}
// ClusterReplicationSpec defines the specifications for replication in the temporal cluster. Using these
-// the operator will comfigure the cluster to replicate data to other clusters. If this object is defined,
+// the operator will configure the cluster to replicate data to other clusters. If this object is defined,
// then both properties below must be set.
type ClusterReplicationSpec struct {
// EnableGlobalNamespace signifies to this node that global namespaces should be enabled.
diff --git a/docs/features/multi-cluster-replication.md b/docs/features/multi-cluster-replication.md
index c18316b2..52f39258 100644
--- a/docs/features/multi-cluster-replication.md
+++ b/docs/features/multi-cluster-replication.md
@@ -35,7 +35,7 @@ temporal operator cluster upsert --frontend-address secondary-cluster.namespace.
# port forward to the frontend of the secondary cluster
kubectl port-forward secondary-frontend 7233:7233
-tepmoral operator cluster upsert --frontend-address primary-cluster.namespace.svc.cluster.local:7233 --enable-connection true
+temporal operator cluster upsert --frontend-address primary-cluster.namespace.svc.cluster.local:7233 --enable-connection true
```
## Replicating namespaces
@@ -57,7 +57,7 @@ spec:
isGlobalNamespace: true
```
-| **🚨 Note**: Replication happens at workflow evaluation time, meaning that adding replication to a running cluster requires special care. If you set up replication with existing workflows on the primary node, make sure to query every workflow on the original node so that they are propagated or risk data loss.
+| **🚨 Note**: Enabling replication will not automatically replicate old workflows. It only replicates workflows as they are interacted with. For cases like trying to increase the shard count, this is important as you need to make sure each workflow has been evaluated at least once after replication has been set up.
## A mechanism for increasing the history shard count
From 6c35eff37d17cb53113d367773eb8f30fd4108b0 Mon Sep 17 00:00:00 2001
From: Alexander Lyon
Date: Thu, 25 Jul 2024 11:03:11 +0100
Subject: [PATCH 4/5] run `make manifests`
---
config/crd/bases/temporal.io_temporalclusters.yaml | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/config/crd/bases/temporal.io_temporalclusters.yaml b/config/crd/bases/temporal.io_temporalclusters.yaml
index 79d66fe2..84bbeaaa 100644
--- a/config/crd/bases/temporal.io_temporalclusters.yaml
+++ b/config/crd/bases/temporal.io_temporalclusters.yaml
@@ -2693,10 +2693,17 @@ spec:
- visibilityStore
type: object
replication:
+ description: Replication allows configuration of multi-cluster replication.
properties:
enableGlobalNamespace:
+ description: |-
+ EnableGlobalNamespace signifies to this node that global namespaces should be enabled.
+ Namespaces that are marked global are replicated across nodes in the cluster.
type: boolean
initialFailoverVersion:
+ description: |-
+ InitialFailoverVersion is used to determine the leadership order between nodes in the cluster. The node with the
+ lowest initial failover version will be the primary. This must be unique across the cluster.
format: int64
type: integer
required:
From 4892e879cc0d81d8e8c197b03b083d7b79fbe5dc Mon Sep 17 00:00:00 2001
From: Alexander Lyon
Date: Thu, 25 Jul 2024 11:06:21 +0100
Subject: [PATCH 5/5] run `make generate`
---
api/v1beta1/zz_generated.deepcopy.go | 10 ++++++----
docs/api/v1beta1.md | 2 +-
pkg/version/zz_generated.deepcopy.go | 1 -
3 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go
index d26be269..4b65b7e6 100644
--- a/api/v1beta1/zz_generated.deepcopy.go
+++ b/api/v1beta1/zz_generated.deepcopy.go
@@ -1,5 +1,4 @@
//go:build !ignore_autogenerated
-// +build !ignore_autogenerated
// Licensed to Alexandre VILAIN under one or more contributor
// license agreements. See the NOTICE file distributed with
@@ -446,7 +445,8 @@ func (in *DynamicConfigSpec) DeepCopyInto(out *DynamicConfigSpec) {
if val == nil {
(*out)[key] = nil
} else {
- in, out := &val, &outVal
+ inVal := (*in)[key]
+ in, out := &inVal, &outVal
*out = make([]ConstrainedValue, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
@@ -656,7 +656,8 @@ func (in *MetricsSpec) DeepCopyInto(out *MetricsSpec) {
if val == nil {
(*out)[key] = nil
} else {
- in, out := &val, &outVal
+ inVal := (*in)[key]
+ in, out := &inVal, &outVal
*out = make([]string, len(*in))
copy(*out, *in)
}
@@ -671,7 +672,8 @@ func (in *MetricsSpec) DeepCopyInto(out *MetricsSpec) {
if val == nil {
(*out)[key] = nil
} else {
- in, out := &val, &outVal
+ inVal := (*in)[key]
+ in, out := &inVal, &outVal
*out = make([]string, len(*in))
copy(*out, *in)
}
diff --git a/docs/api/v1beta1.md b/docs/api/v1beta1.md
index ea9017ab..022277fa 100644
--- a/docs/api/v1beta1.md
+++ b/docs/api/v1beta1.md
@@ -944,7 +944,7 @@ ArchivalSpec
TemporalClusterSpec)
ClusterReplicationSpec defines the specifications for replication in the temporal cluster. Using these
-the operator will comfigure the cluster to replicate data to other clusters. If this object is defined,
+the operator will configure the cluster to replicate data to other clusters. If this object is defined,
then both properties below must be set.