-
-
Notifications
You must be signed in to change notification settings - Fork 53
codify custom search attributes for TemporalNamespace #776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
408f89c
895333f
c6d6336
49126d7
80f3935
511d1e6
7e2f787
a9eb384
18964c6
db0a0ba
8452659
0a188b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,9 +21,13 @@ import ( | |
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/alexandrevilain/controller-tools/pkg/patch" | ||
| "github.com/go-logr/logr" | ||
| "go.temporal.io/api/enums/v1" | ||
| "go.temporal.io/api/operatorservice/v1" | ||
| "go.temporal.io/api/serviceerror" | ||
| apierrors "k8s.io/apimachinery/pkg/api/errors" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
|
|
@@ -58,16 +62,19 @@ type TemporalNamespaceReconciler struct { | |
| func (r *TemporalNamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) { | ||
| logger := log.FromContext(ctx) | ||
|
|
||
| logger.Info("Starting reconciliation") | ||
| logger.Info("=== (NS) STARTING RECONCILIATION") | ||
| // logger.Info("Starting reconciliation") | ||
|
|
||
| namespace := &v1beta1.TemporalNamespace{} | ||
| err := r.Get(ctx, req.NamespacedName, namespace) | ||
| if err != nil { | ||
| if apierrors.IsNotFound(err) { | ||
| logger.Info("=== (NS) COULD NOT GET NAMESPACE") | ||
| return reconcile.Result{}, nil | ||
| } | ||
| return reconcile.Result{}, err | ||
| } | ||
| logger.Info("=== (NS) GOT NAMESPACE") | ||
|
|
||
| patchHelper, err := patch.NewHelper(namespace, r.Client) | ||
| if err != nil { | ||
|
|
@@ -80,6 +87,7 @@ func (r *TemporalNamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Re | |
| if err != nil { | ||
| reterr = kerrors.NewAggregate([]error{reterr, err}) | ||
| } | ||
| logger.Info("=== (NS) PATCHING CLUSTER") | ||
| }() | ||
|
|
||
| cluster := &v1beta1.TemporalCluster{} | ||
|
|
@@ -90,20 +98,25 @@ func (r *TemporalNamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Re | |
| // - TemporalCluster has not been created yet. In this case, if the TemporalNamespace is deleted, no point in waiting for the TemporalCluster to be healthy. | ||
| // - TemporalCluster existed at some point, but now is deleted. In this case, the underlying namespace in the Temporal server is already gone. | ||
| controllerutil.RemoveFinalizer(namespace, deletionFinalizer) | ||
| logger.Info("=== (NS) ERROR GETTING CLUSTER FROM k8s") | ||
| return reconcile.Result{}, nil | ||
| } | ||
| logger.Info("=== (NS) ERROR GETTING CLUSTER FROM k8s") | ||
| return r.handleError(namespace, v1beta1.ReconcileErrorReason, err) | ||
| } | ||
| logger.Info("=== (NS) GOT CLUSTER FROM k8s") | ||
|
|
||
| if !cluster.IsReady() { | ||
| logger.Info("Skipping namespace reconciliation until referenced cluster is ready") | ||
| logger.Info("=== (NS) SKIPPING NAMESPACE RECONCILIATION UNTIL REFERENCED CLUSTER IS READY, REQUEUEING") | ||
| // logger.Info("Skipping namespace reconciliation until referenced cluster is ready") | ||
|
|
||
| return reconcile.Result{RequeueAfter: 10 * time.Second}, nil | ||
| } | ||
|
|
||
| // Check if the resource has been marked for deletion | ||
| if !namespace.ObjectMeta.DeletionTimestamp.IsZero() { | ||
| logger.Info("Deleting namespace") | ||
| logger.Info("=== (NS) DELETING NAMESPACE") | ||
| // logger.Info("Deleting namespace") | ||
|
|
||
| err := r.ensureNamespaceDeleted(ctx, namespace, cluster) | ||
| if err != nil { | ||
|
|
@@ -131,18 +144,167 @@ func (r *TemporalNamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Re | |
| return r.handleError(namespace, v1beta1.ReconcileErrorReason, err) | ||
| } | ||
| err = client.Update(ctx, temporal.NamespaceToUpdateNamespaceRequest(cluster, namespace)) | ||
| logger.Info("=== (NS) UPDATING NAMESPACE") | ||
| if err != nil { | ||
| return r.handleError(namespace, v1beta1.ReconcileErrorReason, err) | ||
| } | ||
| } | ||
|
|
||
| logger.Info("Successfully reconciled namespace", "namespace", namespace.GetName()) | ||
| logger.Info("=== (CSA) BEFORE RECONCILING SEARCH ATTRIBUTES") | ||
| err = r.reconcileCustomSearchAttributes(ctx, &logger, namespace, cluster) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @alexandrevilain is this the right place to start reconciling custom search attribtues?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I should wait for the namespace to be marked ready, before reconciling custom search attributes, right? But where should I put that waiting logic?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think my current implementation is causing the
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like this is the good place for this :) |
||
| if err != nil { | ||
| logger.Info(fmt.Sprintf("=== (CSA) FAILED TO RECONCILE CUSTOM SEARCH ATTRIBUTES: %v", err)) | ||
| //logger.Info(fmt.Sprintf("Failed to reconcile custom search attributes: %v", err)) | ||
| return r.handleError(namespace, v1beta1.ReconcileErrorReason, err) | ||
| } | ||
| logger.Info("=== (CSA) AFTER RECONCILING SEARCH ATTRIBUTES") | ||
|
|
||
| logger.Info("=== (NS) SUCCESSFULLY RECONCILED NAMESPACE", "namespace", namespace.GetName()) | ||
| // logger.Info("Successfully reconciled namespace", "namespace", namespace.GetName()) | ||
|
|
||
| v1beta1.SetTemporalNamespaceReady(namespace, metav1.ConditionTrue, v1beta1.TemporalNamespaceCreatedReason, "Namespace successfully created") | ||
|
|
||
| return r.handleSuccess(namespace) | ||
| } | ||
|
|
||
| // reconcileCustomSearchAttributes ensures that custom search attributes on the Temporal server exactly match those defined in the spec. | ||
| // | ||
| // NOTE: Reconciliation of custom search attributes is accomplished using these steps: | ||
| // | ||
| // 1. Retrieve the custom search attributes which are currently on the Temporal server. | ||
| // 2. Determine which custom search attributes need to be removed, if any. | ||
| // 3. Determine which custom search attributes need to be added, if any. | ||
| // 4. If needed, request the Temporal server to remove the necessary custom search attributes. | ||
| // 5. If needed, request the Temporal server to add the necessary custom search attributes. | ||
| // | ||
| // Some of these steps may fail if some Temporal search attribute constraint is violated; in which case this function will return early with a helpful error message. | ||
| func (r *TemporalNamespaceReconciler) reconcileCustomSearchAttributes(ctx context.Context, logger *logr.Logger, namespace *v1beta1.TemporalNamespace, cluster *v1beta1.TemporalCluster) error { | ||
| // Construct a client to talk to the Temporal server | ||
| client, err := temporal.GetClusterClient(ctx, r.Client, cluster) | ||
| if err != nil { | ||
| logger.Info("=== (CSA) COULDN'T CREATE CLIENT TO TALK TO TEMPORAL SERVER") | ||
| return err | ||
| } | ||
| logger.Info("=== (CSA) CREATED CLIENT TO TALK TO TEMPORAL SERVER") | ||
| // Requests to Temporal's OperatorService API need to specify the namespace name, so capture it here for future use. | ||
| ns := namespace.GetName() | ||
|
|
||
| // List all search attributes that are currently on the Temporal server | ||
| listRequest := &operatorservice.ListSearchAttributesRequest{Namespace: ns} | ||
| logger.Info("=== (CSA) CONSTRUCTED LIST SEARCH ATTR REQUEST") | ||
| serverSearchAttributes, err := client.OperatorService().ListSearchAttributes(ctx, listRequest) | ||
| if err != nil { | ||
| logger.Info("=== (CSA) COULDN'T RECEIVE SEARCH ATTRIBUTES ON SERVER") | ||
| return err | ||
| } | ||
| logger.Info("=== (CSA) RECEIVED SEARCH ATTRIBUTES ON SERVER") | ||
|
|
||
| // Narrow the focus to custom search attributes only. | ||
| serverCustomSearchAttributes := &serverSearchAttributes.CustomAttributes // use a pointer to avoid unnecessary copying | ||
| logger.Info(fmt.Sprintf("=== (CSA) CUSTOM SEARCH ATTRIBUTES ON SERVER: %v", serverCustomSearchAttributes)) | ||
|
|
||
| // Note that the CustomSearchAttributes map data structure that is built using the Spec merely maps string->string. | ||
| // To rigorously compare search attributes between the spec and the Temporal server, the types need to be consistent. | ||
| // We therefore construct a string->enums.IndexedValueType map from the "weaker" string->string map. | ||
| specCustomSearchAttributes, err := createIndexedValueTypeMap(&namespace.Spec.CustomSearchAttributes) | ||
| if err != nil { | ||
| logger.Info(fmt.Sprintf("=== (CSA) ERROR READING CUSTOM SEARCH ATTRIBUTES IN SPEC: %v", specCustomSearchAttributes)) | ||
| return err | ||
| } | ||
| logger.Info(fmt.Sprintf("=== (CSA) CUSTOM SEARCH ATTRIBUTES IN SPEC: %v", specCustomSearchAttributes)) | ||
|
|
||
| // Remove those custom search attributes from the Temporal server whose name does not exist in the Spec. | ||
| customSearchAttributesToRemove := make([]string, 0) | ||
| for serverSearchAttributeName := range *serverCustomSearchAttributes { | ||
| _, serverSearchAttributeNameExistsInSpec := specCustomSearchAttributes[serverSearchAttributeName] | ||
| if !serverSearchAttributeNameExistsInSpec { | ||
| customSearchAttributesToRemove = append(customSearchAttributesToRemove, serverSearchAttributeName) | ||
| } | ||
| } | ||
| logger.Info(fmt.Sprintf("=== (CSA) CUSTOM SEARCH ATTRIBUTES TO REMOVE: %v", customSearchAttributesToRemove)) | ||
|
|
||
| // Add custom search attributes from the Spec which don't yet exist on the Temporal server. | ||
| // If the Temporal server already has a custom search attribute with the same name but a different type, then return an error. | ||
| customSearchAttributesToAdd := make(map[string]enums.IndexedValueType) | ||
| for specSearchAttributeName, specSearchAttributeType := range specCustomSearchAttributes { | ||
| serverSearchAttributeType, specSearchAttributeNameExistsOnServer := (*serverCustomSearchAttributes)[specSearchAttributeName] | ||
| if !specSearchAttributeNameExistsOnServer { | ||
| customSearchAttributesToAdd[specSearchAttributeName] = specSearchAttributeType | ||
| } else if specSearchAttributeType != serverSearchAttributeType { | ||
| logger.Info(fmt.Sprintf("=== (CSA) CUSTOM SEARCH ATTRIBUTE %s ALREADY EXISTS AND HAS DIFFERENT TYPE %s", specSearchAttributeName, serverSearchAttributeType.String())) | ||
| return fmt.Errorf("search attribute %s already exists and has different type %s", specSearchAttributeName, serverSearchAttributeType.String()) | ||
| } | ||
| } | ||
| logger.Info(fmt.Sprintf("=== (CSA) CUSTOM SEARCH ATTRIBUTES TO ADD: %v", customSearchAttributesToAdd)) | ||
|
|
||
| // If there are search attributes that should be removed, then make a request to the Temporal server to remove them. | ||
| if len(customSearchAttributesToRemove) > 0 { | ||
| removeRequest := &operatorservice.RemoveSearchAttributesRequest{ | ||
| Namespace: ns, | ||
| SearchAttributes: customSearchAttributesToRemove, | ||
| } | ||
| logger.Info("=== (CSA) CONSTRUCTED REMOVE ATTR REQUEST") | ||
| _, err = client.OperatorService().RemoveSearchAttributes(ctx, removeRequest) | ||
| logger.Info("=== (CSA) SERVER RESPONDED TO REMOVE REQUEST") | ||
| if err != nil { | ||
| return fmt.Errorf("failed to remove search attributes: %w", err) | ||
| } | ||
| logger.Info(fmt.Sprintf("=== (CSA) REMOVED CUSTOM SEARCH ATTRIBUTES: %v", customSearchAttributesToRemove)) | ||
| // logger.Info(fmt.Sprintf("removed custom search attributes: %v", customSearchAttributesToRemove)) | ||
| } else { | ||
| logger.Info(fmt.Sprintf("=== (CSA) NO SEARCH ATTRIBUTES TO REMOVE %v", customSearchAttributesToRemove)) | ||
| } | ||
|
|
||
| // If there are search attributes that should be added, then make a request the Temporal server to create them. | ||
| if len(customSearchAttributesToAdd) > 0 { | ||
| addRequest := &operatorservice.AddSearchAttributesRequest{ | ||
| Namespace: ns, | ||
| SearchAttributes: customSearchAttributesToAdd, | ||
| } | ||
| logger.Info("=== (CSA) CONSTRUCTED ADD ATTR REQUEST") | ||
| _, err = client.OperatorService().AddSearchAttributes(ctx, addRequest) | ||
| logger.Info("=== (CSA) SERVER RESPONDED TO ADD REQUEST") | ||
| if err != nil { | ||
| return fmt.Errorf("failed to add search attributes: %w", err) | ||
| } | ||
| logger.Info(fmt.Sprintf("=== (CSA) ADDED CUSTOM SEARCH ATTRIBUTES: %v", customSearchAttributesToAdd)) | ||
| // logger.Info(fmt.Sprintf("added custom search attributes: %v", customSearchAttributesToAdd)) | ||
| } else { | ||
| logger.Info(fmt.Sprintf("=== (CSA) NO SEARCH ATTRIBUTES TO ADD %v", customSearchAttributesToAdd)) | ||
| } | ||
|
|
||
|
|
||
| logger.Info("=== (CSA) CUSTOM SEARCH ATTRIBUTE RECONCILIATION LOOP FINISHED") | ||
| return nil | ||
| } | ||
|
|
||
| // createIndexedValueTypeMap is a helper function takes in a map[string]string and returns another | ||
| // map where each entry value from the old map is translated from a plain string into an enums.IndexedValueType. | ||
| func createIndexedValueTypeMap(m *map[string]string) (map[string]enums.IndexedValueType, error) { | ||
| specCustomSearchAttributes := make(map[string]enums.IndexedValueType, len(*m)) | ||
| for searchAttributeNameString, searchAttributeTypeString := range *m { | ||
| indexedValueType, err := searchAttributeTypeStringToEnum(searchAttributeTypeString) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to parse search attribute %s because its type is %s: %w", searchAttributeNameString, searchAttributeTypeString, err) | ||
| } | ||
| specCustomSearchAttributes[searchAttributeNameString] = indexedValueType | ||
| } | ||
| return specCustomSearchAttributes, nil | ||
| } | ||
|
|
||
| // searchAttributeTypeStringToEnum retrieves the actual IndexedValueType for a given string. | ||
| // It expects searchAttributeTypeString to be a string representation of the valid Go type. | ||
| // Returns the IndexedValueType if parsing is successful, otherwise an error. | ||
| // See https://docs.temporal.io/visibility#supported-types for supported types. | ||
| func searchAttributeTypeStringToEnum(searchAttributeTypeString string) (enums.IndexedValueType, error) { | ||
| for k, v := range enums.IndexedValueType_shorthandValue { | ||
| if strings.EqualFold(searchAttributeTypeString, k) { | ||
| return enums.IndexedValueType(v), nil | ||
| } | ||
| } | ||
| return enums.INDEXED_VALUE_TYPE_UNSPECIFIED, fmt.Errorf("unsupported search attribute type: %v", searchAttributeTypeString) | ||
| } | ||
|
|
||
| // ensureFinalizer ensures the deletion finalizer is set on the object if the user allowed namespace deletion using the CRD. | ||
| func (r *TemporalNamespaceReconciler) ensureFinalizer(namespace *v1beta1.TemporalNamespace) { | ||
| if namespace.ObjectMeta.DeletionTimestamp.IsZero() && namespace.Spec.AllowDeletion { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're ok on the fast that those kind of logging will be removed once the code is working, right ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Certainly. These logging statements are for my debugging. I pushed them up to keep my branch up to date. Unfortunately, they show up in the PR.