Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/resources/brokerconsumers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func createStackNatsConsumer(ctx core.Context, stack *v1beta1.Stack, consumer *v
core.Env("NATS_URI", fmt.Sprintf("nats://%s", broker.Status.URI.Host)),
core.Env("STREAM", stack.Name),
core.Env("NAME", consumerName),
core.Env("DELIVER", consumer.Spec.QueriedBy),
core.Env("DELIVER", consumerName),
core.Env("SUBJECTS", strings.Join(
collectionutils.Map(consumer.Spec.Services, func(from string) string {
return fmt.Sprintf("%s.%s", stack.Name, from)
Expand Down
132 changes: 123 additions & 9 deletions internal/resources/reconciliations/deployments.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package reconciliations

import (
"fmt"

"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -10,49 +13,158 @@ import (
"github.com/formancehq/operator/internal/resources/applications"
"github.com/formancehq/operator/internal/resources/authclients"
"github.com/formancehq/operator/internal/resources/auths"
"github.com/formancehq/operator/internal/resources/brokers"
"github.com/formancehq/operator/internal/resources/databases"
"github.com/formancehq/operator/internal/resources/gateways"
"github.com/formancehq/operator/internal/resources/registries"
"github.com/formancehq/operator/internal/resources/settings"
)

func createDeployment(
const (
DeploymentTypeAPI = "api"
DeploymentTypeWorkerIngestion = "worker-ingestion"
DeploymentTypeWorkerMatching = "worker-matching"
)

func commonEnvVars(
ctx core.Context,
stack *v1beta1.Stack,
reconciliation *v1beta1.Reconciliation,
database *v1beta1.Database,
authClient *v1beta1.AuthClient,
imageConfiguration *registries.ImageConfiguration,
) error {
) ([]v1.EnvVar, error) {
brokerURI, err := settings.RequireURL(ctx, stack.Name, "broker", "dsn")
if err != nil {
return nil, err
}
if brokerURI == nil {
return nil, errors.New("missing broker configuration")
}

env := make([]v1.EnvVar, 0)
otlpEnv, err := settings.GetOTELEnvVars(ctx, stack.Name, core.LowerCamelCaseKind(ctx, reconciliation), " ")
if err != nil {
return err
return nil, err
}
env = append(env, otlpEnv...)

gatewayEnv, err := gateways.EnvVarsIfEnabled(ctx, stack.Name)
if err != nil {
return err
return nil, err
}

postgresEnvVar, err := databases.GetPostgresEnvVars(ctx, stack, database)
if err != nil {
return err
return nil, err
}

brokerEnvVar, err := brokers.GetBrokerEnvVars(ctx, brokerURI, stack.Name, "reconciliation")
if err != nil {
return nil, err
}

env = append(env, gatewayEnv...)
env = append(env, core.GetDevEnvVars(stack, reconciliation)...)
env = append(env, postgresEnvVar...)
env = append(env, core.Env("POSTGRES_DATABASE_NAME", "$(POSTGRES_DATABASE)"))
env = append(env, authclients.GetEnvVars(authClient)...)
env = append(env, brokerEnvVar...)

authEnvVars, err := auths.ProtectedEnvVars(ctx, stack, "reconciliation", reconciliation.Spec.Auth)
if err != nil {
return err
return nil, err
}
env = append(env, authEnvVars...)

return env, nil
}

func createDeployments(
ctx core.Context,
stack *v1beta1.Stack,
reconciliation *v1beta1.Reconciliation,
database *v1beta1.Database,
authClient *v1beta1.AuthClient,
ingestionConsumer *v1beta1.BrokerConsumer,
matchingConsumer *v1beta1.BrokerConsumer,
imageConfiguration *registries.ImageConfiguration,
) error {
// Deploy workers first, then API
// This ensures workers are ready to process events before API starts accepting requests
if err := createDeployment(ctx, stack, reconciliation, database, authClient, ingestionConsumer, imageConfiguration, DeploymentTypeWorkerIngestion); err != nil {
return err
}

if err := createDeployment(ctx, stack, reconciliation, database, authClient, matchingConsumer, imageConfiguration, DeploymentTypeWorkerMatching); err != nil {
return err
}

if err := createDeployment(ctx, stack, reconciliation, database, authClient, nil, imageConfiguration, DeploymentTypeAPI); err != nil {
return err
}

return nil
}

func createDeployment(
ctx core.Context,
stack *v1beta1.Stack,
reconciliation *v1beta1.Reconciliation,
database *v1beta1.Database,
authClient *v1beta1.AuthClient,
consumer *v1beta1.BrokerConsumer,
imageConfiguration *registries.ImageConfiguration,
deploymentType string,
) error {
var (
containerName string
metaName string
args []string
)

switch deploymentType {
case DeploymentTypeWorkerIngestion:
containerName = "reconciliation-worker-ingestion"
metaName = "reconciliation-worker-ingestion"
args = []string{"worker", "ingestion"}
case DeploymentTypeWorkerMatching:
containerName = "reconciliation-worker-matching"
metaName = "reconciliation-worker-matching"
args = []string{"worker", "matching"}
case DeploymentTypeAPI:
containerName = "reconciliation"
metaName = "reconciliation"
args = []string{"serve"}
default:
return errors.Errorf("unknown deployment type: %s", deploymentType)
}

env, err := commonEnvVars(ctx, stack, reconciliation, database, authClient)
if err != nil {
return err
}

// Add Elasticsearch env vars for both API and Worker
esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
if err != nil {
return err
}
env = append(env, esEnvVars...)

Comment on lines +147 to +153
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Elasticsearch env vars are injected into the API deployment.

The tests in this PR assert the API deployment should not carry ELASTICSEARCH_* env vars. If that expectation is correct, gate these env vars to the worker deployments only (or update the tests/spec to match the intended behavior).

💡 Suggested fix (apply ES env vars to workers only)
-	// Add Elasticsearch env vars for both API and Worker
-	esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
-	if err != nil {
-		return err
-	}
-	env = append(env, esEnvVars...)
+	// Add Elasticsearch env vars only for worker deployments
+	if deploymentType == DeploymentTypeWorkerIngestion || deploymentType == DeploymentTypeWorkerMatching {
+		esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
+		if err != nil {
+			return err
+		}
+		env = append(env, esEnvVars...)
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Add Elasticsearch env vars for both API and Worker
esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
if err != nil {
return err
}
env = append(env, esEnvVars...)
// Add Elasticsearch env vars only for worker deployments
if deploymentType == DeploymentTypeWorkerIngestion || deploymentType == DeploymentTypeWorkerMatching {
esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
if err != nil {
return err
}
env = append(env, esEnvVars...)
}
🤖 Prompt for AI Agents
In `@internal/resources/reconciliations/deployments.go` around lines 147 - 153,
The Elasticsearch env var injection currently calls
settings.GetElasticsearchEnvVars(ctx, stack.Name) and appends esEnvVars into the
shared env used for the API deployment (variable env); move this call and the
append so ES vars are only added when constructing the worker deployment
environment (e.g., where the worker-specific env is built or where worker
container spec is created) instead of the API path, or alternatively update the
tests to expect ES vars on the API if that was intended; ensure references to
esEnvVars and settings.GetElasticsearchEnvVars remain in the worker-deployment
code paths and remove the append from the API deployment construction.

// Workers: inject KAFKA_TOPICS from their respective consumer
if (deploymentType == DeploymentTypeWorkerIngestion || deploymentType == DeploymentTypeWorkerMatching) && consumer != nil {
topics, err := brokers.GetTopicsEnvVars(ctx, stack, "KAFKA_TOPICS", consumer.Spec.Services...)
if err != nil {
return err
}
env = append(env, topics...)
}

// Ingestion worker: publish to {stack}.reconciliation
if deploymentType == DeploymentTypeWorkerIngestion {
env = append(env, core.Env("PUBLISHER_TOPIC_MAPPING", fmt.Sprintf("*:%s.reconciliation", stack.Name)))
}

serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name)
if err != nil {
return err
Expand All @@ -61,17 +173,18 @@ func createDeployment(
return applications.
New(reconciliation, &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "reconciliation",
Name: metaName,
},
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
ImagePullSecrets: imageConfiguration.PullSecrets,
ServiceAccountName: serviceAccountName,
Containers: []v1.Container{{
Name: "reconciliation",
Name: containerName,
Env: env,
Image: imageConfiguration.GetFullImageName(),
Args: args,
Ports: []v1.ContainerPort{applications.StandardHTTPPort()},
LivenessProbe: applications.DefaultLiveness("http"),
ReadinessProbe: applications.DefaultReadiness("http"),
Expand All @@ -83,3 +196,4 @@ func createDeployment(
IsEE().
Install(ctx)
}

89 changes: 74 additions & 15 deletions internal/resources/reconciliations/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ limitations under the License.
package reconciliations

import (
"fmt"

"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"

v1beta1 "github.com/formancehq/operator/api/formance.com/v1beta1"
. "github.com/formancehq/operator/internal/core"
"github.com/formancehq/operator/internal/resources/authclients"
"github.com/formancehq/operator/internal/resources/brokerconsumers"
"github.com/formancehq/operator/internal/resources/brokers"
"github.com/formancehq/operator/internal/resources/databases"
"github.com/formancehq/operator/internal/resources/gatewayhttpapis"
"github.com/formancehq/operator/internal/resources/registries"
Expand All @@ -34,42 +40,92 @@ import (
//+kubebuilder:rbac:groups=formance.com,resources=reconciliations/finalizers,verbs=update

func Reconcile(ctx Context, stack *v1beta1.Stack, reconciliation *v1beta1.Reconciliation, version string) error {
// Clean up legacy single worker consumer and deployment
if err := deleteLegacyResources(ctx, reconciliation); err != nil {
return err
}

database, err := databases.Create(ctx, stack, reconciliation)
if err != nil {
return err
}

ingestionConsumer, err := brokerconsumers.Create(ctx, reconciliation, "ingestion", "ledger", "payments")
if err != nil {
return err
}

matchingConsumer, err := brokerconsumers.Create(ctx, reconciliation, "matching", "reconciliation")
if err != nil {
return err
}

authClient, err := authclients.Create(ctx, stack, reconciliation, "reconciliation",
authclients.WithScopes("ledger:read", "payments:read"))
if err != nil {
return err
}

if database.Status.Ready {
if err := gatewayhttpapis.Create(ctx, reconciliation, gatewayhttpapis.WithHealthCheckEndpoint("_healthcheck")); err != nil {
return err
}

imageConfiguration, err := registries.GetFormanceImage(ctx, stack, "reconciliation", version)
if err != nil {
return errors.Wrap(err, "resolving image")
}
if !database.Status.Ready {
return NewPendingError().WithMessage("database not ready")
}

imageConfiguration, err := registries.GetFormanceImage(ctx, stack, "reconciliation", version)
if err != nil {
return errors.Wrap(err, "resolving image")
}

if IsGreaterOrEqual(version, "v2.0.0-rc.5") && databases.GetSavedModuleVersion(database) != version {
if IsGreaterOrEqual(version, "v2.0.0-rc.5") && databases.GetSavedModuleVersion(database) != version {

if err := databases.Migrate(ctx, stack, reconciliation, imageConfiguration, database); err != nil {
return err
}
if err := databases.Migrate(ctx, stack, reconciliation, imageConfiguration, database); err != nil {
return err
}

if err := databases.SaveModuleVersion(ctx, database, version); err != nil {
return errors.Wrap(err, "saving module version in database object")
}
if err := databases.SaveModuleVersion(ctx, database, version); err != nil {
return errors.Wrap(err, "saving module version in database object")
}
}

if err := createDeployment(ctx, stack, reconciliation, database, authClient, imageConfiguration); err != nil {
if ingestionConsumer.Status.Ready && matchingConsumer.Status.Ready {
if err := createDeployments(ctx, stack, reconciliation, database, authClient, ingestionConsumer, matchingConsumer, imageConfiguration); err != nil {
return err
}
}

if err := gatewayhttpapis.Create(ctx, reconciliation, gatewayhttpapis.WithHealthCheckEndpoint("_healthcheck")); err != nil {
return err
return nil
}

func deleteLegacyResources(ctx Context, reconciliation *v1beta1.Reconciliation) error {
// Delete legacy BrokerConsumer (name: <owner>-reconciliation, created with empty name param)
legacyConsumer := &v1beta1.BrokerConsumer{}
legacyConsumerName := fmt.Sprintf("%s-reconciliation", reconciliation.Name)
if err := ctx.GetClient().Get(ctx, types.NamespacedName{Name: legacyConsumerName}, legacyConsumer); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
} else {
if err := ctx.GetClient().Delete(ctx, legacyConsumer); err != nil && !apierrors.IsNotFound(err) {
return err
}
}

// Delete legacy Deployment (name: reconciliation-worker)
legacyDeployment := &appsv1.Deployment{}
if err := ctx.GetClient().Get(ctx, types.NamespacedName{
Name: "reconciliation-worker",
Namespace: reconciliation.Namespace,
}, legacyDeployment); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
} else {
if err := ctx.GetClient().Delete(ctx, legacyDeployment); err != nil && !apierrors.IsNotFound(err) {
return err
}
}

return nil
Expand All @@ -78,6 +134,7 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, reconciliation *v1beta1.Reconc
func init() {
Init(
WithModuleReconciler(Reconcile,
WithOwn[*v1beta1.Reconciliation](&v1beta1.BrokerConsumer{}),
WithOwn[*v1beta1.Reconciliation](&v1beta1.Database{}),
WithOwn[*v1beta1.Reconciliation](&appsv1.Deployment{}),
WithOwn[*v1beta1.Reconciliation](&v1beta1.AuthClient{}),
Expand All @@ -87,6 +144,8 @@ func init() {
WithWatchSettings[*v1beta1.Reconciliation](),
WithWatchDependency[*v1beta1.Reconciliation](&v1beta1.Ledger{}),
WithWatchDependency[*v1beta1.Reconciliation](&v1beta1.Payments{}),
databases.Watch[*v1beta1.Reconciliation](),
brokers.Watch[*v1beta1.Reconciliation](),
),
)
}
Loading
Loading