diff --git a/README.md b/README.md index 1e9c7516..9750805a 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,7 @@ Please note this table only reports end-to-end tests suite coverage, others vers - [x] Automatic mTLS certificates management (using cert-manager). - [x] Support for integration in meshes: istio & linkerd. - [x] Namespace management using CRDs. +- [x] Custom search attribute management. - [x] Cluster version upgrades. - [x] Cluster monitoring. - [x] Complete end2end test suite. diff --git a/api/v1beta1/temporalnamespace_types.go b/api/v1beta1/temporalnamespace_types.go index 1ab2d45a..9b57a372 100644 --- a/api/v1beta1/temporalnamespace_types.go +++ b/api/v1beta1/temporalnamespace_types.go @@ -55,6 +55,11 @@ type TemporalNamespaceSpec struct { // Only applicable if the namespace is a global namespace. // +optional Clusters []string `json:"clusters,omitempty"` + // Search attributes are key-value pairs of metadata objects included in a workflow + // execution's visibility information. Temporal uses some default search attributes + // but also supports custom search attributes. + // +optional + CustomSearchAttributes map[string]string `json:"customSearchAttributes,omitempty"` // The name of active Temporal Cluster. // Only applicable if the namespace is a global namespace. // +optional diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 4ae28587..53a484a0 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -1835,6 +1835,13 @@ func (in *TemporalNamespaceSpec) DeepCopyInto(out *TemporalNamespaceSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.CustomSearchAttributes != nil { + in, out := &in.CustomSearchAttributes, &out.CustomSearchAttributes + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.Archival != nil { in, out := &in.Archival, &out.Archival *out = new(TemporalNamespaceArchivalSpec) diff --git a/charts/temporal-operator/crds/temporal-operator.crds.yaml b/charts/temporal-operator/crds/temporal-operator.crds.yaml index 662fc639..08a210e5 100644 --- a/charts/temporal-operator/crds/temporal-operator.crds.yaml +++ b/charts/temporal-operator/crds/temporal-operator.crds.yaml @@ -4479,6 +4479,14 @@ spec: items: type: string type: array + customSearchAttributes: + additionalProperties: + type: string + description: |- + Search attributes are key-value pairs of metadata objects included in a workflow + execution's visibility information. Temporal uses some default search attributes + but also supports custom search attributes. + type: object data: additionalProperties: type: string diff --git a/config/crd/bases/temporal.io_temporalnamespaces.yaml b/config/crd/bases/temporal.io_temporalnamespaces.yaml index 37ec3aaa..ebbdfb0f 100644 --- a/config/crd/bases/temporal.io_temporalnamespaces.yaml +++ b/config/crd/bases/temporal.io_temporalnamespaces.yaml @@ -130,6 +130,14 @@ spec: items: type: string type: array + customSearchAttributes: + additionalProperties: + type: string + description: |- + Search attributes are key-value pairs of metadata objects included in a workflow + execution's visibility information. Temporal uses some default search attributes + but also supports custom search attributes. + type: object data: additionalProperties: type: string diff --git a/controllers/temporalnamespace_controller.go b/controllers/temporalnamespace_controller.go index cfae9fd2..0fe53692 100644 --- a/controllers/temporalnamespace_controller.go +++ b/controllers/temporalnamespace_controller.go @@ -21,9 +21,13 @@ import ( "context" "errors" "fmt" + "strings" "time" "github.com/alexandrevilain/controller-tools/pkg/patch" + "github.com/go-logr/logr" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/serviceerror" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -58,16 +62,19 @@ type TemporalNamespaceReconciler struct { func (r *TemporalNamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) { logger := log.FromContext(ctx) - logger.Info("Starting reconciliation") + logger.Info("=== (NS) STARTING RECONCILIATION") + // logger.Info("Starting reconciliation") namespace := &v1beta1.TemporalNamespace{} err := r.Get(ctx, req.NamespacedName, namespace) if err != nil { if apierrors.IsNotFound(err) { + logger.Info("=== (NS) COULD NOT GET NAMESPACE") return reconcile.Result{}, nil } return reconcile.Result{}, err } + logger.Info("=== (NS) GOT NAMESPACE") patchHelper, err := patch.NewHelper(namespace, r.Client) if err != nil { @@ -80,6 +87,7 @@ func (r *TemporalNamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Re if err != nil { reterr = kerrors.NewAggregate([]error{reterr, err}) } + logger.Info("=== (NS) PATCHING CLUSTER") }() cluster := &v1beta1.TemporalCluster{} @@ -90,20 +98,25 @@ func (r *TemporalNamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Re // - TemporalCluster has not been created yet. In this case, if the TemporalNamespace is deleted, no point in waiting for the TemporalCluster to be healthy. // - TemporalCluster existed at some point, but now is deleted. In this case, the underlying namespace in the Temporal server is already gone. controllerutil.RemoveFinalizer(namespace, deletionFinalizer) + logger.Info("=== (NS) ERROR GETTING CLUSTER FROM k8s") return reconcile.Result{}, nil } + logger.Info("=== (NS) ERROR GETTING CLUSTER FROM k8s") return r.handleError(namespace, v1beta1.ReconcileErrorReason, err) } + logger.Info("=== (NS) GOT CLUSTER FROM k8s") if !cluster.IsReady() { - logger.Info("Skipping namespace reconciliation until referenced cluster is ready") + logger.Info("=== (NS) SKIPPING NAMESPACE RECONCILIATION UNTIL REFERENCED CLUSTER IS READY, REQUEUEING") + // logger.Info("Skipping namespace reconciliation until referenced cluster is ready") return reconcile.Result{RequeueAfter: 10 * time.Second}, nil } // Check if the resource has been marked for deletion if !namespace.ObjectMeta.DeletionTimestamp.IsZero() { - logger.Info("Deleting namespace") + logger.Info("=== (NS) DELETING NAMESPACE") + // logger.Info("Deleting namespace") err := r.ensureNamespaceDeleted(ctx, namespace, cluster) if err != nil { @@ -131,18 +144,167 @@ func (r *TemporalNamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Re return r.handleError(namespace, v1beta1.ReconcileErrorReason, err) } err = client.Update(ctx, temporal.NamespaceToUpdateNamespaceRequest(cluster, namespace)) + logger.Info("=== (NS) UPDATING NAMESPACE") if err != nil { return r.handleError(namespace, v1beta1.ReconcileErrorReason, err) } } - logger.Info("Successfully reconciled namespace", "namespace", namespace.GetName()) + logger.Info("=== (CSA) BEFORE RECONCILING SEARCH ATTRIBUTES") + err = r.reconcileCustomSearchAttributes(ctx, &logger, namespace, cluster) + if err != nil { + logger.Info(fmt.Sprintf("=== (CSA) FAILED TO RECONCILE CUSTOM SEARCH ATTRIBUTES: %v", err)) + //logger.Info(fmt.Sprintf("Failed to reconcile custom search attributes: %v", err)) + return r.handleError(namespace, v1beta1.ReconcileErrorReason, err) + } + logger.Info("=== (CSA) AFTER RECONCILING SEARCH ATTRIBUTES") + + logger.Info("=== (NS) SUCCESSFULLY RECONCILED NAMESPACE", "namespace", namespace.GetName()) + // logger.Info("Successfully reconciled namespace", "namespace", namespace.GetName()) v1beta1.SetTemporalNamespaceReady(namespace, metav1.ConditionTrue, v1beta1.TemporalNamespaceCreatedReason, "Namespace successfully created") return r.handleSuccess(namespace) } +// reconcileCustomSearchAttributes ensures that custom search attributes on the Temporal server exactly match those defined in the spec. +// +// NOTE: Reconciliation of custom search attributes is accomplished using these steps: +// +// 1. Retrieve the custom search attributes which are currently on the Temporal server. +// 2. Determine which custom search attributes need to be removed, if any. +// 3. Determine which custom search attributes need to be added, if any. +// 4. If needed, request the Temporal server to remove the necessary custom search attributes. +// 5. If needed, request the Temporal server to add the necessary custom search attributes. +// +// Some of these steps may fail if some Temporal search attribute constraint is violated; in which case this function will return early with a helpful error message. +func (r *TemporalNamespaceReconciler) reconcileCustomSearchAttributes(ctx context.Context, logger *logr.Logger, namespace *v1beta1.TemporalNamespace, cluster *v1beta1.TemporalCluster) error { + // Construct a client to talk to the Temporal server + client, err := temporal.GetClusterClient(ctx, r.Client, cluster) + if err != nil { + logger.Info("=== (CSA) COULDN'T CREATE CLIENT TO TALK TO TEMPORAL SERVER") + return err + } + logger.Info("=== (CSA) CREATED CLIENT TO TALK TO TEMPORAL SERVER") + // Requests to Temporal's OperatorService API need to specify the namespace name, so capture it here for future use. + ns := namespace.GetName() + + // List all search attributes that are currently on the Temporal server + listRequest := &operatorservice.ListSearchAttributesRequest{Namespace: ns} + logger.Info("=== (CSA) CONSTRUCTED LIST SEARCH ATTR REQUEST") + serverSearchAttributes, err := client.OperatorService().ListSearchAttributes(ctx, listRequest) + if err != nil { + logger.Info("=== (CSA) COULDN'T RECEIVE SEARCH ATTRIBUTES ON SERVER") + return err + } + logger.Info("=== (CSA) RECEIVED SEARCH ATTRIBUTES ON SERVER") + + // Narrow the focus to custom search attributes only. + serverCustomSearchAttributes := &serverSearchAttributes.CustomAttributes // use a pointer to avoid unnecessary copying + logger.Info(fmt.Sprintf("=== (CSA) CUSTOM SEARCH ATTRIBUTES ON SERVER: %v", serverCustomSearchAttributes)) + + // Note that the CustomSearchAttributes map data structure that is built using the Spec merely maps string->string. + // To rigorously compare search attributes between the spec and the Temporal server, the types need to be consistent. + // We therefore construct a string->enums.IndexedValueType map from the "weaker" string->string map. + specCustomSearchAttributes, err := createIndexedValueTypeMap(&namespace.Spec.CustomSearchAttributes) + if err != nil { + logger.Info(fmt.Sprintf("=== (CSA) ERROR READING CUSTOM SEARCH ATTRIBUTES IN SPEC: %v", specCustomSearchAttributes)) + return err + } + logger.Info(fmt.Sprintf("=== (CSA) CUSTOM SEARCH ATTRIBUTES IN SPEC: %v", specCustomSearchAttributes)) + + // Remove those custom search attributes from the Temporal server whose name does not exist in the Spec. + customSearchAttributesToRemove := make([]string, 0) + for serverSearchAttributeName := range *serverCustomSearchAttributes { + _, serverSearchAttributeNameExistsInSpec := specCustomSearchAttributes[serverSearchAttributeName] + if !serverSearchAttributeNameExistsInSpec { + customSearchAttributesToRemove = append(customSearchAttributesToRemove, serverSearchAttributeName) + } + } + logger.Info(fmt.Sprintf("=== (CSA) CUSTOM SEARCH ATTRIBUTES TO REMOVE: %v", customSearchAttributesToRemove)) + + // Add custom search attributes from the Spec which don't yet exist on the Temporal server. + // If the Temporal server already has a custom search attribute with the same name but a different type, then return an error. + customSearchAttributesToAdd := make(map[string]enums.IndexedValueType) + for specSearchAttributeName, specSearchAttributeType := range specCustomSearchAttributes { + serverSearchAttributeType, specSearchAttributeNameExistsOnServer := (*serverCustomSearchAttributes)[specSearchAttributeName] + if !specSearchAttributeNameExistsOnServer { + customSearchAttributesToAdd[specSearchAttributeName] = specSearchAttributeType + } else if specSearchAttributeType != serverSearchAttributeType { + logger.Info(fmt.Sprintf("=== (CSA) CUSTOM SEARCH ATTRIBUTE %s ALREADY EXISTS AND HAS DIFFERENT TYPE %s", specSearchAttributeName, serverSearchAttributeType.String())) + return fmt.Errorf("search attribute %s already exists and has different type %s", specSearchAttributeName, serverSearchAttributeType.String()) + } + } + logger.Info(fmt.Sprintf("=== (CSA) CUSTOM SEARCH ATTRIBUTES TO ADD: %v", customSearchAttributesToAdd)) + + // If there are search attributes that should be removed, then make a request to the Temporal server to remove them. + if len(customSearchAttributesToRemove) > 0 { + removeRequest := &operatorservice.RemoveSearchAttributesRequest{ + Namespace: ns, + SearchAttributes: customSearchAttributesToRemove, + } + logger.Info("=== (CSA) CONSTRUCTED REMOVE ATTR REQUEST") + _, err = client.OperatorService().RemoveSearchAttributes(ctx, removeRequest) + logger.Info("=== (CSA) SERVER RESPONDED TO REMOVE REQUEST") + if err != nil { + return fmt.Errorf("failed to remove search attributes: %w", err) + } + logger.Info(fmt.Sprintf("=== (CSA) REMOVED CUSTOM SEARCH ATTRIBUTES: %v", customSearchAttributesToRemove)) + // logger.Info(fmt.Sprintf("removed custom search attributes: %v", customSearchAttributesToRemove)) + } else { + logger.Info(fmt.Sprintf("=== (CSA) NO SEARCH ATTRIBUTES TO REMOVE %v", customSearchAttributesToRemove)) + } + + // If there are search attributes that should be added, then make a request the Temporal server to create them. + if len(customSearchAttributesToAdd) > 0 { + addRequest := &operatorservice.AddSearchAttributesRequest{ + Namespace: ns, + SearchAttributes: customSearchAttributesToAdd, + } + logger.Info("=== (CSA) CONSTRUCTED ADD ATTR REQUEST") + _, err = client.OperatorService().AddSearchAttributes(ctx, addRequest) + logger.Info("=== (CSA) SERVER RESPONDED TO ADD REQUEST") + if err != nil { + return fmt.Errorf("failed to add search attributes: %w", err) + } + logger.Info(fmt.Sprintf("=== (CSA) ADDED CUSTOM SEARCH ATTRIBUTES: %v", customSearchAttributesToAdd)) + // logger.Info(fmt.Sprintf("added custom search attributes: %v", customSearchAttributesToAdd)) + } else { + logger.Info(fmt.Sprintf("=== (CSA) NO SEARCH ATTRIBUTES TO ADD %v", customSearchAttributesToAdd)) + } + + + logger.Info("=== (CSA) CUSTOM SEARCH ATTRIBUTE RECONCILIATION LOOP FINISHED") + return nil +} + +// createIndexedValueTypeMap is a helper function takes in a map[string]string and returns another +// map where each entry value from the old map is translated from a plain string into an enums.IndexedValueType. +func createIndexedValueTypeMap(m *map[string]string) (map[string]enums.IndexedValueType, error) { + specCustomSearchAttributes := make(map[string]enums.IndexedValueType, len(*m)) + for searchAttributeNameString, searchAttributeTypeString := range *m { + indexedValueType, err := searchAttributeTypeStringToEnum(searchAttributeTypeString) + if err != nil { + return nil, fmt.Errorf("failed to parse search attribute %s because its type is %s: %w", searchAttributeNameString, searchAttributeTypeString, err) + } + specCustomSearchAttributes[searchAttributeNameString] = indexedValueType + } + return specCustomSearchAttributes, nil +} + +// searchAttributeTypeStringToEnum retrieves the actual IndexedValueType for a given string. +// It expects searchAttributeTypeString to be a string representation of the valid Go type. +// Returns the IndexedValueType if parsing is successful, otherwise an error. +// See https://docs.temporal.io/visibility#supported-types for supported types. +func searchAttributeTypeStringToEnum(searchAttributeTypeString string) (enums.IndexedValueType, error) { + for k, v := range enums.IndexedValueType_shorthandValue { + if strings.EqualFold(searchAttributeTypeString, k) { + return enums.IndexedValueType(v), nil + } + } + return enums.INDEXED_VALUE_TYPE_UNSPECIFIED, fmt.Errorf("unsupported search attribute type: %v", searchAttributeTypeString) +} + // ensureFinalizer ensures the deletion finalizer is set on the object if the user allowed namespace deletion using the CRD. func (r *TemporalNamespaceReconciler) ensureFinalizer(namespace *v1beta1.TemporalNamespace) { if namespace.ObjectMeta.DeletionTimestamp.IsZero() && namespace.Spec.AllowDeletion { diff --git a/docs/api/v1beta1.md b/docs/api/v1beta1.md index 9da6cc91..8a0e103b 100644 --- a/docs/api/v1beta1.md +++ b/docs/api/v1beta1.md @@ -5232,6 +5232,20 @@ Only applicable if the namespace is a global namespace.
customSearchAttributesSearch attributes are key-value pairs of metadata objects included in a workflow +execution’s visibility information. Temporal uses some default search attributes +but also supports custom search attributes.
+activeClusterNamecustomSearchAttributesSearch attributes are key-value pairs of metadata objects included in a workflow +execution’s visibility information. Temporal uses some default search attributes +but also supports custom search attributes.
+activeClusterName