From 3e1b05ecf8cea523bda523f3523178af8ad8b8a3 Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Thu, 29 Jan 2026 16:28:05 +0100 Subject: [PATCH] feat(reconciliation): split worker into ingestion and matching workers - Split single reconciliation worker into two dedicated workers: - Ingestion worker (`worker ingestion`): consumes ledger/payments topics - Matching worker (`worker matching`): consumes reconciliation topic - Each worker has its own BrokerConsumer with distinct NATS deliver queue groups - Ingestion worker publishes to `{stack}.reconciliation` topic - Add cleanup of legacy single worker consumer and deployment - Add Elasticsearch env vars support for reconciliation deployments - Fix duplicate POD_NAME env var when both OTEL traces and metrics are enabled --- .../resources/brokerconsumers/controller.go | 2 +- .../resources/reconciliations/deployments.go | 132 ++++++++++- internal/resources/reconciliations/init.go | 89 +++++-- internal/resources/settings/elasticsearch.go | 56 +++++ internal/resources/settings/opentelemetry.go | 21 +- .../tests/reconciliation_controller_test.go | 220 ++++++++++++++++++ 6 files changed, 486 insertions(+), 34 deletions(-) create mode 100644 internal/resources/settings/elasticsearch.go create mode 100644 internal/tests/reconciliation_controller_test.go diff --git a/internal/resources/brokerconsumers/controller.go b/internal/resources/brokerconsumers/controller.go index 99e2d693..718b2c94 100644 --- a/internal/resources/brokerconsumers/controller.go +++ b/internal/resources/brokerconsumers/controller.go @@ -254,7 +254,7 @@ func createStackNatsConsumer(ctx core.Context, stack *v1beta1.Stack, consumer *v core.Env("NATS_URI", fmt.Sprintf("nats://%s", broker.Status.URI.Host)), core.Env("STREAM", stack.Name), core.Env("NAME", consumerName), - core.Env("DELIVER", consumer.Spec.QueriedBy), + core.Env("DELIVER", consumerName), core.Env("SUBJECTS", strings.Join( collectionutils.Map(consumer.Spec.Services, func(from string) string { return fmt.Sprintf("%s.%s", stack.Name, from) diff --git a/internal/resources/reconciliations/deployments.go b/internal/resources/reconciliations/deployments.go index 825409b3..94643915 100644 --- a/internal/resources/reconciliations/deployments.go +++ b/internal/resources/reconciliations/deployments.go @@ -1,6 +1,9 @@ package reconciliations import ( + "fmt" + + "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -10,35 +13,54 @@ import ( "github.com/formancehq/operator/internal/resources/applications" "github.com/formancehq/operator/internal/resources/authclients" "github.com/formancehq/operator/internal/resources/auths" + "github.com/formancehq/operator/internal/resources/brokers" "github.com/formancehq/operator/internal/resources/databases" "github.com/formancehq/operator/internal/resources/gateways" "github.com/formancehq/operator/internal/resources/registries" "github.com/formancehq/operator/internal/resources/settings" ) -func createDeployment( +const ( + DeploymentTypeAPI = "api" + DeploymentTypeWorkerIngestion = "worker-ingestion" + DeploymentTypeWorkerMatching = "worker-matching" +) + +func commonEnvVars( ctx core.Context, stack *v1beta1.Stack, reconciliation *v1beta1.Reconciliation, database *v1beta1.Database, authClient *v1beta1.AuthClient, - imageConfiguration *registries.ImageConfiguration, -) error { +) ([]v1.EnvVar, error) { + brokerURI, err := settings.RequireURL(ctx, stack.Name, "broker", "dsn") + if err != nil { + return nil, err + } + if brokerURI == nil { + return nil, errors.New("missing broker configuration") + } + env := make([]v1.EnvVar, 0) otlpEnv, err := settings.GetOTELEnvVars(ctx, stack.Name, core.LowerCamelCaseKind(ctx, reconciliation), " ") if err != nil { - return err + return nil, err } env = append(env, otlpEnv...) gatewayEnv, err := gateways.EnvVarsIfEnabled(ctx, stack.Name) if err != nil { - return err + return nil, err } postgresEnvVar, err := databases.GetPostgresEnvVars(ctx, stack, database) if err != nil { - return err + return nil, err + } + + brokerEnvVar, err := brokers.GetBrokerEnvVars(ctx, brokerURI, stack.Name, "reconciliation") + if err != nil { + return nil, err } env = append(env, gatewayEnv...) @@ -46,13 +68,103 @@ func createDeployment( env = append(env, postgresEnvVar...) env = append(env, core.Env("POSTGRES_DATABASE_NAME", "$(POSTGRES_DATABASE)")) env = append(env, authclients.GetEnvVars(authClient)...) + env = append(env, brokerEnvVar...) authEnvVars, err := auths.ProtectedEnvVars(ctx, stack, "reconciliation", reconciliation.Spec.Auth) if err != nil { - return err + return nil, err } env = append(env, authEnvVars...) + return env, nil +} + +func createDeployments( + ctx core.Context, + stack *v1beta1.Stack, + reconciliation *v1beta1.Reconciliation, + database *v1beta1.Database, + authClient *v1beta1.AuthClient, + ingestionConsumer *v1beta1.BrokerConsumer, + matchingConsumer *v1beta1.BrokerConsumer, + imageConfiguration *registries.ImageConfiguration, +) error { + // Deploy workers first, then API + // This ensures workers are ready to process events before API starts accepting requests + if err := createDeployment(ctx, stack, reconciliation, database, authClient, ingestionConsumer, imageConfiguration, DeploymentTypeWorkerIngestion); err != nil { + return err + } + + if err := createDeployment(ctx, stack, reconciliation, database, authClient, matchingConsumer, imageConfiguration, DeploymentTypeWorkerMatching); err != nil { + return err + } + + if err := createDeployment(ctx, stack, reconciliation, database, authClient, nil, imageConfiguration, DeploymentTypeAPI); err != nil { + return err + } + + return nil +} + +func createDeployment( + ctx core.Context, + stack *v1beta1.Stack, + reconciliation *v1beta1.Reconciliation, + database *v1beta1.Database, + authClient *v1beta1.AuthClient, + consumer *v1beta1.BrokerConsumer, + imageConfiguration *registries.ImageConfiguration, + deploymentType string, +) error { + var ( + containerName string + metaName string + args []string + ) + + switch deploymentType { + case DeploymentTypeWorkerIngestion: + containerName = "reconciliation-worker-ingestion" + metaName = "reconciliation-worker-ingestion" + args = []string{"worker", "ingestion"} + case DeploymentTypeWorkerMatching: + containerName = "reconciliation-worker-matching" + metaName = "reconciliation-worker-matching" + args = []string{"worker", "matching"} + case DeploymentTypeAPI: + containerName = "reconciliation" + metaName = "reconciliation" + args = []string{"serve"} + default: + return errors.Errorf("unknown deployment type: %s", deploymentType) + } + + env, err := commonEnvVars(ctx, stack, reconciliation, database, authClient) + if err != nil { + return err + } + + // Add Elasticsearch env vars for both API and Worker + esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name) + if err != nil { + return err + } + env = append(env, esEnvVars...) + + // Workers: inject KAFKA_TOPICS from their respective consumer + if (deploymentType == DeploymentTypeWorkerIngestion || deploymentType == DeploymentTypeWorkerMatching) && consumer != nil { + topics, err := brokers.GetTopicsEnvVars(ctx, stack, "KAFKA_TOPICS", consumer.Spec.Services...) + if err != nil { + return err + } + env = append(env, topics...) + } + + // Ingestion worker: publish to {stack}.reconciliation + if deploymentType == DeploymentTypeWorkerIngestion { + env = append(env, core.Env("PUBLISHER_TOPIC_MAPPING", fmt.Sprintf("*:%s.reconciliation", stack.Name))) + } + serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name) if err != nil { return err @@ -61,7 +173,7 @@ func createDeployment( return applications. New(reconciliation, &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: "reconciliation", + Name: metaName, }, Spec: appsv1.DeploymentSpec{ Template: v1.PodTemplateSpec{ @@ -69,9 +181,10 @@ func createDeployment( ImagePullSecrets: imageConfiguration.PullSecrets, ServiceAccountName: serviceAccountName, Containers: []v1.Container{{ - Name: "reconciliation", + Name: containerName, Env: env, Image: imageConfiguration.GetFullImageName(), + Args: args, Ports: []v1.ContainerPort{applications.StandardHTTPPort()}, LivenessProbe: applications.DefaultLiveness("http"), ReadinessProbe: applications.DefaultReadiness("http"), @@ -83,3 +196,4 @@ func createDeployment( IsEE(). Install(ctx) } + diff --git a/internal/resources/reconciliations/init.go b/internal/resources/reconciliations/init.go index 6a0c080b..4be45676 100644 --- a/internal/resources/reconciliations/init.go +++ b/internal/resources/reconciliations/init.go @@ -17,13 +17,19 @@ limitations under the License. package reconciliations import ( + "fmt" + "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" v1beta1 "github.com/formancehq/operator/api/formance.com/v1beta1" . "github.com/formancehq/operator/internal/core" "github.com/formancehq/operator/internal/resources/authclients" + "github.com/formancehq/operator/internal/resources/brokerconsumers" + "github.com/formancehq/operator/internal/resources/brokers" "github.com/formancehq/operator/internal/resources/databases" "github.com/formancehq/operator/internal/resources/gatewayhttpapis" "github.com/formancehq/operator/internal/resources/registries" @@ -34,42 +40,92 @@ import ( //+kubebuilder:rbac:groups=formance.com,resources=reconciliations/finalizers,verbs=update func Reconcile(ctx Context, stack *v1beta1.Stack, reconciliation *v1beta1.Reconciliation, version string) error { + // Clean up legacy single worker consumer and deployment + if err := deleteLegacyResources(ctx, reconciliation); err != nil { + return err + } + database, err := databases.Create(ctx, stack, reconciliation) if err != nil { return err } + ingestionConsumer, err := brokerconsumers.Create(ctx, reconciliation, "ingestion", "ledger", "payments") + if err != nil { + return err + } + + matchingConsumer, err := brokerconsumers.Create(ctx, reconciliation, "matching", "reconciliation") + if err != nil { + return err + } + authClient, err := authclients.Create(ctx, stack, reconciliation, "reconciliation", authclients.WithScopes("ledger:read", "payments:read")) if err != nil { return err } - if database.Status.Ready { + if err := gatewayhttpapis.Create(ctx, reconciliation, gatewayhttpapis.WithHealthCheckEndpoint("_healthcheck")); err != nil { + return err + } - imageConfiguration, err := registries.GetFormanceImage(ctx, stack, "reconciliation", version) - if err != nil { - return errors.Wrap(err, "resolving image") - } + if !database.Status.Ready { + return NewPendingError().WithMessage("database not ready") + } + + imageConfiguration, err := registries.GetFormanceImage(ctx, stack, "reconciliation", version) + if err != nil { + return errors.Wrap(err, "resolving image") + } - if IsGreaterOrEqual(version, "v2.0.0-rc.5") && databases.GetSavedModuleVersion(database) != version { + if IsGreaterOrEqual(version, "v2.0.0-rc.5") && databases.GetSavedModuleVersion(database) != version { - if err := databases.Migrate(ctx, stack, reconciliation, imageConfiguration, database); err != nil { - return err - } + if err := databases.Migrate(ctx, stack, reconciliation, imageConfiguration, database); err != nil { + return err + } - if err := databases.SaveModuleVersion(ctx, database, version); err != nil { - return errors.Wrap(err, "saving module version in database object") - } + if err := databases.SaveModuleVersion(ctx, database, version); err != nil { + return errors.Wrap(err, "saving module version in database object") } + } - if err := createDeployment(ctx, stack, reconciliation, database, authClient, imageConfiguration); err != nil { + if ingestionConsumer.Status.Ready && matchingConsumer.Status.Ready { + if err := createDeployments(ctx, stack, reconciliation, database, authClient, ingestionConsumer, matchingConsumer, imageConfiguration); err != nil { return err } } - if err := gatewayhttpapis.Create(ctx, reconciliation, gatewayhttpapis.WithHealthCheckEndpoint("_healthcheck")); err != nil { - return err + return nil +} + +func deleteLegacyResources(ctx Context, reconciliation *v1beta1.Reconciliation) error { + // Delete legacy BrokerConsumer (name: -reconciliation, created with empty name param) + legacyConsumer := &v1beta1.BrokerConsumer{} + legacyConsumerName := fmt.Sprintf("%s-reconciliation", reconciliation.Name) + if err := ctx.GetClient().Get(ctx, types.NamespacedName{Name: legacyConsumerName}, legacyConsumer); err != nil { + if !apierrors.IsNotFound(err) { + return err + } + } else { + if err := ctx.GetClient().Delete(ctx, legacyConsumer); err != nil && !apierrors.IsNotFound(err) { + return err + } + } + + // Delete legacy Deployment (name: reconciliation-worker) + legacyDeployment := &appsv1.Deployment{} + if err := ctx.GetClient().Get(ctx, types.NamespacedName{ + Name: "reconciliation-worker", + Namespace: reconciliation.Namespace, + }, legacyDeployment); err != nil { + if !apierrors.IsNotFound(err) { + return err + } + } else { + if err := ctx.GetClient().Delete(ctx, legacyDeployment); err != nil && !apierrors.IsNotFound(err) { + return err + } } return nil @@ -78,6 +134,7 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, reconciliation *v1beta1.Reconc func init() { Init( WithModuleReconciler(Reconcile, + WithOwn[*v1beta1.Reconciliation](&v1beta1.BrokerConsumer{}), WithOwn[*v1beta1.Reconciliation](&v1beta1.Database{}), WithOwn[*v1beta1.Reconciliation](&appsv1.Deployment{}), WithOwn[*v1beta1.Reconciliation](&v1beta1.AuthClient{}), @@ -87,6 +144,8 @@ func init() { WithWatchSettings[*v1beta1.Reconciliation](), WithWatchDependency[*v1beta1.Reconciliation](&v1beta1.Ledger{}), WithWatchDependency[*v1beta1.Reconciliation](&v1beta1.Payments{}), + databases.Watch[*v1beta1.Reconciliation](), + brokers.Watch[*v1beta1.Reconciliation](), ), ) } diff --git a/internal/resources/settings/elasticsearch.go b/internal/resources/settings/elasticsearch.go new file mode 100644 index 00000000..35aa020a --- /dev/null +++ b/internal/resources/settings/elasticsearch.go @@ -0,0 +1,56 @@ +package settings + +import ( + v1 "k8s.io/api/core/v1" + + "github.com/formancehq/operator/api/formance.com/v1beta1" + "github.com/formancehq/operator/internal/core" +) + +// GetElasticsearchEnvVars returns environment variables for Elasticsearch configuration. +// Returns an error if elasticsearch.dsn is not configured (required setting). +func GetElasticsearchEnvVars(ctx core.Context, stackName string) ([]v1.EnvVar, error) { + esURL, err := RequireURL(ctx, stackName, "elasticsearch", "dsn") + if err != nil { + return nil, err + } + + env := []v1.EnvVar{ + core.Env("ELASTICSEARCH_URL", esURL.WithoutQuery().String()), + } + + // Username/Password from query params (optional) + if username := esURL.Query().Get("username"); username != "" { + env = append(env, core.Env("ELASTICSEARCH_USERNAME", username)) + } + + if password := esURL.Query().Get("password"); password != "" { + env = append(env, core.Env("ELASTICSEARCH_PASSWORD", password)) + } + + // ILM Configuration with defaults + env = append(env, core.Env("ELASTICSEARCH_ILM_ENABLED", + getQueryParamOrDefault(esURL, "ilmEnabled", "true"))) + + env = append(env, core.Env("ELASTICSEARCH_ILM_HOT_PHASE_DAYS", + getQueryParamOrDefault(esURL, "ilmHotPhaseDays", "90"))) + + env = append(env, core.Env("ELASTICSEARCH_ILM_WARM_PHASE_ROLLOVER_DAYS", + getQueryParamOrDefault(esURL, "ilmWarmPhaseRolloverDays", "365"))) + + env = append(env, core.Env("ELASTICSEARCH_ILM_DELETE_PHASE_ENABLED", + getQueryParamOrDefault(esURL, "ilmDeletePhaseEnabled", "false"))) + + env = append(env, core.Env("ELASTICSEARCH_ILM_DELETE_PHASE_DAYS", + getQueryParamOrDefault(esURL, "ilmDeletePhaseDays", "0"))) + + return env, nil +} + +// getQueryParamOrDefault extracts a query parameter from the URI or returns the default value. +func getQueryParamOrDefault(uri *v1beta1.URI, key, defaultValue string) string { + if value := uri.Query().Get(key); value != "" { + return value + } + return defaultValue +} diff --git a/internal/resources/settings/opentelemetry.go b/internal/resources/settings/opentelemetry.go index d8e88a70..f5cd035e 100644 --- a/internal/resources/settings/opentelemetry.go +++ b/internal/resources/settings/opentelemetry.go @@ -31,7 +31,18 @@ func GetOTELEnvVars(ctx core.Context, stack, serviceName string, sliceStringSepa metrics = append(metrics, core.Env("OTEL_METRICS_RUNTIME", "true")) } - return append(traces, metrics...), nil + ret := append(traces, metrics...) + if len(ret) > 0 { + ret = append(ret, v1.EnvVar{ + Name: "POD_NAME", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }) + } + return ret, nil } func HasOpenTelemetryTracesEnabled(ctx core.Context, stack string) (bool, error) { @@ -64,14 +75,6 @@ func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, core.EnvFromBool(fmt.Sprintf("OTEL_%s_EXPORTER_OTLP_INSECURE", string(monitoringType)), IsTrue(otlp.Query().Get("insecure"))), core.Env("OTEL_SERVICE_NAME", serviceName), core.Env(fmt.Sprintf("OTEL_%s_EXPORTER_OTLP_MODE", string(monitoringType)), otlp.Scheme), - { - Name: "POD_NAME", - ValueFrom: &v1.EnvVarSource{ - FieldRef: &v1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, - }, } // If the path is not empty, we use the full URL as the endpoint. diff --git a/internal/tests/reconciliation_controller_test.go b/internal/tests/reconciliation_controller_test.go new file mode 100644 index 00000000..0e468d5b --- /dev/null +++ b/internal/tests/reconciliation_controller_test.go @@ -0,0 +1,220 @@ +package tests_test + +import ( + "fmt" + + "github.com/google/uuid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + + v1beta1 "github.com/formancehq/operator/api/formance.com/v1beta1" + core "github.com/formancehq/operator/internal/core" + "github.com/formancehq/operator/internal/resources/settings" + . "github.com/formancehq/operator/internal/tests/internal" +) + +var _ = Describe("ReconciliationController", func() { + Context("When creating a Reconciliation object", func() { + var ( + stack *v1beta1.Stack + gateway *v1beta1.Gateway + auth *v1beta1.Auth + ledger *v1beta1.Ledger + payments *v1beta1.Payments + reconciliation *v1beta1.Reconciliation + databaseSettings *v1beta1.Settings + brokerDSNSettings *v1beta1.Settings + elasticsearchSettings *v1beta1.Settings + ) + BeforeEach(func() { + stack = &v1beta1.Stack{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.StackSpec{}, + } + databaseSettings = settings.New(uuid.NewString(), "postgres.*.uri", "postgresql://localhost", stack.Name) + brokerDSNSettings = settings.New(uuid.NewString(), "broker.dsn", "nats://localhost:1234", stack.Name) + elasticsearchSettings = settings.New(uuid.NewString(), "elasticsearch.dsn", "https://elasticsearch:9200?username=elastic&password=changeme&ilmEnabled=true&ilmHotPhaseDays=30", stack.Name) + gateway = &v1beta1.Gateway{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.GatewaySpec{ + StackDependency: v1beta1.StackDependency{ + Stack: stack.Name, + }, + Ingress: &v1beta1.GatewayIngress{}, + }, + } + auth = &v1beta1.Auth{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.AuthSpec{ + StackDependency: v1beta1.StackDependency{ + Stack: stack.Name, + }, + }, + } + ledger = &v1beta1.Ledger{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.LedgerSpec{ + StackDependency: v1beta1.StackDependency{ + Stack: stack.Name, + }, + }, + } + payments = &v1beta1.Payments{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.PaymentsSpec{ + StackDependency: v1beta1.StackDependency{ + Stack: stack.Name, + }, + }, + } + reconciliation = &v1beta1.Reconciliation{ + ObjectMeta: RandObjectMeta(), + Spec: v1beta1.ReconciliationSpec{ + StackDependency: v1beta1.StackDependency{ + Stack: stack.Name, + }, + }, + } + }) + JustBeforeEach(func() { + Expect(Create(stack)).To(Succeed()) + Expect(Create(databaseSettings)).To(Succeed()) + Expect(Create(brokerDSNSettings)).To(BeNil()) + Expect(Create(elasticsearchSettings)).To(Succeed()) + Expect(Create(gateway)).To(Succeed()) + Expect(Create(auth)).To(Succeed()) + Expect(Create(ledger)).To(Succeed()) + Expect(Create(payments)).To(Succeed()) + Expect(Create(reconciliation)).To(Succeed()) + }) + AfterEach(func() { + Expect(Delete(stack)).To(Succeed()) + Expect(Delete(databaseSettings)).To(Succeed()) + Expect(Delete(brokerDSNSettings)).To(Succeed()) + Expect(Delete(elasticsearchSettings)).To(Succeed()) + }) + It("Should create appropriate components", func() { + By("Should set the status to ready", func() { + Eventually(func(g Gomega) bool { + g.Expect(LoadResource("", reconciliation.Name, reconciliation)).To(Succeed()) + return reconciliation.Status.Ready + }).Should(BeTrue()) + }) + By("Should add an owner reference on the stack", func() { + Eventually(func(g Gomega) bool { + g.Expect(LoadResource("", reconciliation.Name, reconciliation)).To(Succeed()) + reference, err := core.HasOwnerReference(TestContext(), stack, reconciliation) + g.Expect(err).To(BeNil()) + return reference + }).Should(BeTrue()) + }) + By("Should create an API deployment", func() { + deployment := &appsv1.Deployment{} + Eventually(func() error { + return LoadResource(stack.Name, "reconciliation", deployment) + }).Should(Succeed()) + Expect(deployment).To(BeControlledBy(reconciliation)) + Expect(deployment.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation")) + Expect(deployment.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"serve"})) + }) + By("Should create a worker deployment", func() { + deployment := &appsv1.Deployment{} + Eventually(func() error { + return LoadResource(stack.Name, "reconciliation-worker", deployment) + }).Should(Succeed()) + Expect(deployment).To(BeControlledBy(reconciliation)) + Expect(deployment.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation-worker")) + Expect(deployment.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"worker"})) + }) + By("API deployment should have broker environment variables", func() { + deployment := &appsv1.Deployment{} + Eventually(func() error { + return LoadResource(stack.Name, "reconciliation", deployment) + }).Should(Succeed()) + Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElements( + core.Env("BROKER", "nats"), + core.Env("PUBLISHER_NATS_ENABLED", "true"), + )) + }) + By("Worker deployment should have broker environment variables", func() { + deployment := &appsv1.Deployment{} + Eventually(func() error { + return LoadResource(stack.Name, "reconciliation-worker", deployment) + }).Should(Succeed()) + Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElements( + core.Env("BROKER", "nats"), + core.Env("PUBLISHER_NATS_ENABLED", "true"), + )) + }) + By("Worker deployment should have topics environment variable", func() { + deployment := &appsv1.Deployment{} + Eventually(func() error { + return LoadResource(stack.Name, "reconciliation-worker", deployment) + }).Should(Succeed()) + Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElement( + core.Env("KAFKA_TOPICS", fmt.Sprintf("%s.ledger %s.payments", stack.Name, stack.Name)), + )) + }) + By("API deployment should NOT have topics environment variable", func() { + deployment := &appsv1.Deployment{} + Eventually(func() error { + return LoadResource(stack.Name, "reconciliation", deployment) + }).Should(Succeed()) + for _, envVar := range deployment.Spec.Template.Spec.Containers[0].Env { + Expect(envVar.Name).NotTo(Equal("KAFKA_TOPICS")) + } + }) + By("Should create a new GatewayHTTPAPI object", func() { + httpService := &v1beta1.GatewayHTTPAPI{} + Eventually(func() error { + return LoadResource("", core.GetObjectName(stack.Name, "reconciliation"), httpService) + }).Should(Succeed()) + }) + By("Should create a new AuthClient object", func() { + authClient := &v1beta1.AuthClient{} + Eventually(func() error { + return LoadResource("", core.GetObjectName(stack.Name, "reconciliation"), authClient) + }).Should(Succeed()) + }) + By("Should create a new BrokerConsumer object", func() { + consumer := &v1beta1.BrokerConsumer{} + Eventually(func() error { + return LoadResource("", reconciliation.Name+"-reconciliation", consumer) + }).Should(Succeed()) + }) + By("BrokerConsumer should have correct services", func() { + consumer := &v1beta1.BrokerConsumer{} + Eventually(func(g Gomega) []string { + g.Expect(LoadResource("", reconciliation.Name+"-reconciliation", consumer)).To(Succeed()) + return consumer.Spec.Services + }).Should(ContainElements("ledger", "payments")) + }) + By("Worker deployment should have Elasticsearch environment variables", func() { + deployment := &appsv1.Deployment{} + Eventually(func() error { + return LoadResource(stack.Name, "reconciliation-worker", deployment) + }).Should(Succeed()) + Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElements( + core.Env("ELASTICSEARCH_URL", "https://elasticsearch:9200"), + core.Env("ELASTICSEARCH_USERNAME", "elastic"), + core.Env("ELASTICSEARCH_PASSWORD", "changeme"), + core.Env("ELASTICSEARCH_ILM_ENABLED", "true"), + core.Env("ELASTICSEARCH_ILM_HOT_PHASE_DAYS", "30"), + core.Env("ELASTICSEARCH_ILM_WARM_PHASE_ROLLOVER_DAYS", "365"), + core.Env("ELASTICSEARCH_ILM_DELETE_PHASE_ENABLED", "false"), + core.Env("ELASTICSEARCH_ILM_DELETE_PHASE_DAYS", "0"), + )) + }) + By("API deployment should NOT have Elasticsearch environment variables", func() { + deployment := &appsv1.Deployment{} + Eventually(func() error { + return LoadResource(stack.Name, "reconciliation", deployment) + }).Should(Succeed()) + for _, envVar := range deployment.Spec.Template.Spec.Containers[0].Env { + Expect(envVar.Name).NotTo(Equal("ELASTICSEARCH_URL")) + } + }) + }) + }) +})