From 22b8dd239372824b546169190a5e9f8ea9faca4d Mon Sep 17 00:00:00 2001 From: Nico Duldhardt Date: Sat, 28 Feb 2026 11:05:04 +0100 Subject: [PATCH] Remove minio defaults from broker, pass through operator env vars --- cmd/broker/main.go | 30 ++++++++--------------- cmd/broker/main_test.go | 39 ++++++++++++++++++------------ pkg/operator/cluster_controller.go | 6 +++++ pkg/operator/etcd_resources.go | 14 ++++------- 4 files changed, 44 insertions(+), 45 deletions(-) diff --git a/cmd/broker/main.go b/cmd/broker/main.go index 2448aa3..f291c86 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -51,12 +51,7 @@ const ( defaultKafkaPort = 19092 defaultMetricsAddr = ":19093" defaultControlAddr = ":19094" - defaultMinioBucket = "kafscale" - defaultMinioRegion = "us-east-1" - defaultMinioEndpoint = "http://127.0.0.1:9000" - defaultMinioAccessKey = "minioadmin" - defaultMinioSecretKey = "minioadmin" - defaultS3Concurrency = 64 + defaultS3Concurrency = 64 brokerVersion = "dev" ) @@ -2145,7 +2140,7 @@ func main() { } func buildS3Client(ctx context.Context, logger *slog.Logger) storage.S3Client { - writeCfg, readCfg, useMemory, usingDefaultMinio, credsProvided, useReadReplica := buildS3ConfigsFromEnv() + writeCfg, readCfg, useMemory, credsProvided, useReadReplica := buildS3ConfigsFromEnv() if useMemory { logger.Info("using in-memory S3 client", "env", "KAFSCALE_USE_MEMORY_S3=1") return storage.NewMemoryS3Client() @@ -2162,7 +2157,7 @@ func buildS3Client(ctx context.Context, logger *slog.Logger) storage.S3Client { os.Exit(1) } - logger.Info("using AWS-compatible S3 client", "bucket", writeCfg.Bucket, "region", writeCfg.Region, "endpoint", writeCfg.Endpoint, "force_path_style", writeCfg.ForcePathStyle, "kms_configured", writeCfg.KMSKeyARN != "", "default_minio", usingDefaultMinio, "credentials_provided", credsProvided) + logger.Info("using AWS-compatible S3 client", "bucket", writeCfg.Bucket, "region", writeCfg.Region, "endpoint", writeCfg.Endpoint, "force_path_style", writeCfg.ForcePathStyle, "kms_configured", writeCfg.KMSKeyARN != "", "credentials_provided", credsProvided) if useReadReplica { readClient, err := storage.NewS3Client(ctx, readCfg) @@ -2177,23 +2172,18 @@ func buildS3Client(ctx context.Context, logger *slog.Logger) storage.S3Client { return client } -func buildS3ConfigsFromEnv() (storage.S3Config, storage.S3Config, bool, bool, bool, bool) { +func buildS3ConfigsFromEnv() (storage.S3Config, storage.S3Config, bool, bool, bool) { if parseEnvBool("KAFSCALE_USE_MEMORY_S3", false) { - return storage.S3Config{}, storage.S3Config{}, true, false, false, false + return storage.S3Config{}, storage.S3Config{}, true, false, false } - writeBucket := envOrDefault("KAFSCALE_S3_BUCKET", defaultMinioBucket) - writeRegion := envOrDefault("KAFSCALE_S3_REGION", defaultMinioRegion) - writeEndpoint := envOrDefault("KAFSCALE_S3_ENDPOINT", defaultMinioEndpoint) - forcePathStyle := parseEnvBool("KAFSCALE_S3_PATH_STYLE", true) + writeBucket := os.Getenv("KAFSCALE_S3_BUCKET") + writeRegion := os.Getenv("KAFSCALE_S3_REGION") + writeEndpoint := os.Getenv("KAFSCALE_S3_ENDPOINT") + forcePathStyle := parseEnvBool("KAFSCALE_S3_PATH_STYLE", writeEndpoint != "") kmsARN := os.Getenv("KAFSCALE_S3_KMS_ARN") - usingDefaultMinio := writeBucket == defaultMinioBucket && writeRegion == defaultMinioRegion && writeEndpoint == defaultMinioEndpoint accessKey := os.Getenv("KAFSCALE_S3_ACCESS_KEY") secretKey := os.Getenv("KAFSCALE_S3_SECRET_KEY") sessionToken := os.Getenv("KAFSCALE_S3_SESSION_TOKEN") - if accessKey == "" && secretKey == "" && usingDefaultMinio { - accessKey = defaultMinioAccessKey - secretKey = defaultMinioSecretKey - } credsProvided := accessKey != "" && secretKey != "" s3Concurrency := parseEnvInt("KAFSCALE_S3_CONCURRENCY", defaultS3Concurrency) writeCfg := storage.S3Config{ @@ -2232,7 +2222,7 @@ func buildS3ConfigsFromEnv() (storage.S3Config, storage.S3Config, bool, bool, bo KMSKeyARN: kmsARN, MaxConnections: s3Concurrency, } - return writeCfg, readCfg, false, usingDefaultMinio, credsProvided, useReadReplica + return writeCfg, readCfg, false, credsProvided, useReadReplica } func buildConnContextFunc(logger *slog.Logger) broker.ConnContextFunc { diff --git a/cmd/broker/main_test.go b/cmd/broker/main_test.go index a996730..224fce8 100644 --- a/cmd/broker/main_test.go +++ b/cmd/broker/main_test.go @@ -1818,13 +1818,10 @@ func TestBuildS3ConfigsFromEnv(t *testing.T) { t.Setenv("KAFSCALE_S3_READ_REGION", "us-east-2") t.Setenv("KAFSCALE_S3_READ_ENDPOINT", "http://s3-read.local") - writeCfg, readCfg, useMemory, usingDefaultMinio, credsProvided, useReadReplica := buildS3ConfigsFromEnv() + writeCfg, readCfg, useMemory, credsProvided, useReadReplica := buildS3ConfigsFromEnv() if useMemory { t.Fatalf("expected useMemory false") } - if usingDefaultMinio { - t.Fatalf("expected non-default minio") - } if !credsProvided { t.Fatalf("expected credsProvided true") } @@ -1862,38 +1859,48 @@ func TestBuildS3ConfigsFromEnvDefaults(t *testing.T) { t.Setenv("KAFSCALE_S3_READ_REGION", "") t.Setenv("KAFSCALE_S3_READ_ENDPOINT", "") - writeCfg, readCfg, useMemory, usingDefaultMinio, credsProvided, useReadReplica := buildS3ConfigsFromEnv() + writeCfg, readCfg, useMemory, credsProvided, useReadReplica := buildS3ConfigsFromEnv() if useMemory { t.Fatalf("expected useMemory false by default") } - if !usingDefaultMinio { - t.Fatalf("expected default minio true") - } - if !credsProvided { - t.Fatalf("expected default minio creds to be injected") + if credsProvided { + t.Fatalf("expected no credentials when env vars are empty") } if useReadReplica { t.Fatalf("expected read replica disabled") } - if writeCfg.Bucket != defaultMinioBucket || writeCfg.Region != defaultMinioRegion || writeCfg.Endpoint != defaultMinioEndpoint { - t.Fatalf("unexpected default write config: %+v", writeCfg) + if writeCfg.Bucket != "" || writeCfg.Region != "" || writeCfg.Endpoint != "" { + t.Fatalf("expected empty config when env vars are unset: %+v", writeCfg) + } + if writeCfg.ForcePathStyle { + t.Fatalf("expected forcePathStyle false when no endpoint is set") } - if readCfg.Bucket != defaultMinioBucket || readCfg.Region != defaultMinioRegion || readCfg.Endpoint != defaultMinioEndpoint { - t.Fatalf("unexpected default read config: %+v", readCfg) + if readCfg.Bucket != "" || readCfg.Region != "" || readCfg.Endpoint != "" { + t.Fatalf("expected empty read config when env vars are unset: %+v", readCfg) } } func TestBuildS3ConfigsFromEnvUseMemory(t *testing.T) { t.Setenv("KAFSCALE_USE_MEMORY_S3", "1") - _, _, useMemory, usingDefaultMinio, credsProvided, useReadReplica := buildS3ConfigsFromEnv() + _, _, useMemory, credsProvided, useReadReplica := buildS3ConfigsFromEnv() if !useMemory { t.Fatalf("expected useMemory true") } - if usingDefaultMinio || credsProvided || useReadReplica { + if credsProvided || useReadReplica { t.Fatalf("unexpected flags for memory mode") } } +func TestBuildS3ConfigsFromEnvPathStyleDefaultsToEndpoint(t *testing.T) { + t.Setenv("KAFSCALE_S3_ENDPOINT", "http://minio.local:9000") + t.Setenv("KAFSCALE_S3_PATH_STYLE", "") + + writeCfg, _, _, _, _ := buildS3ConfigsFromEnv() + if !writeCfg.ForcePathStyle { + t.Fatalf("expected forcePathStyle true when custom endpoint is set") + } +} + func TestStartupTimeoutFromEnv(t *testing.T) { t.Setenv("KAFSCALE_STARTUP_TIMEOUT_SEC", "12") if got := startupTimeoutFromEnv(); got != 12*time.Second { diff --git a/pkg/operator/cluster_controller.go b/pkg/operator/cluster_controller.go index 063123f..e08ac0c 100644 --- a/pkg/operator/cluster_controller.go +++ b/pkg/operator/cluster_controller.go @@ -247,6 +247,12 @@ func (r *ClusterReconciler) brokerContainer(cluster *kafscalev1alpha1.KafscaleCl if val := strings.TrimSpace(os.Getenv("KAFSCALE_PROXY_PROTOCOL")); val != "" { env = append(env, corev1.EnvVar{Name: "KAFSCALE_PROXY_PROTOCOL", Value: val}) } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_LOG_LEVEL")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_LOG_LEVEL", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_TRACE_KAFKA")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_TRACE_KAFKA", Value: val}) + } var envFrom []corev1.EnvFromSource if cluster.Spec.S3.CredentialsSecretRef != "" { envFrom = append(envFrom, corev1.EnvFromSource{ diff --git a/pkg/operator/etcd_resources.go b/pkg/operator/etcd_resources.go index e131eb0..03d4e76 100644 --- a/pkg/operator/etcd_resources.go +++ b/pkg/operator/etcd_resources.go @@ -242,9 +242,7 @@ func reconcileEtcdStatefulSet(ctx context.Context, c client.Client, scheme *runt {Name: "SNAPSHOT_BUCKET", Value: bucket}, {Name: "SNAPSHOT_PREFIX", Value: prefix}, } - if endpoint != "" { - restoreEnv = append(restoreEnv, corev1.EnvVar{Name: "AWS_ENDPOINT_URL", Value: endpoint}) - } + restoreEnv = append(restoreEnv, corev1.EnvVar{Name: "AWS_ENDPOINT_URL", Value: endpoint}) if strings.TrimSpace(cluster.Spec.S3.CredentialsSecretRef) != "" { secretRef := corev1.LocalObjectReference{Name: cluster.Spec.S3.CredentialsSecretRef} restoreEnv = append(restoreEnv, @@ -308,7 +306,7 @@ func reconcileEtcdStatefulSet(ctx context.Context, c client.Client, scheme *runt downloadScript := "set -euo pipefail\n" + "DATA_DIR=/var/lib/etcd\n" + "ENDPOINT_OPT=\"\"\n" + - "if [ -n \"$AWS_ENDPOINT_URL\" ]; then ENDPOINT_OPT=\"--endpoint-url $AWS_ENDPOINT_URL\"; fi\n" + + "if [ -n \"${AWS_ENDPOINT_URL:-}\" ]; then ENDPOINT_OPT=\"--endpoint-url $AWS_ENDPOINT_URL\"; fi\n" + "if [ -d \"$DATA_DIR/member\" ] && [ \"$(ls -A \"$DATA_DIR\")\" ]; then\n" + " echo \"etcd data dir not empty; skipping snapshot download\"\n" + " exit 0\n" + @@ -523,9 +521,7 @@ func reconcileEtcdSnapshotCronJob(ctx context.Context, c client.Client, scheme * {Name: "CREATE_BUCKET", Value: boolToString(createBucket)}, {Name: "PROTECT_BUCKET", Value: boolToString(protectBucket)}, } - if endpoint != "" { - uploadEnv = append(uploadEnv, corev1.EnvVar{Name: "AWS_ENDPOINT_URL", Value: endpoint}) - } + uploadEnv = append(uploadEnv, corev1.EnvVar{Name: "AWS_ENDPOINT_URL", Value: endpoint}) if strings.TrimSpace(cluster.Spec.S3.CredentialsSecretRef) != "" { secretRef := corev1.LocalObjectReference{Name: cluster.Spec.S3.CredentialsSecretRef} uploadEnv = append(uploadEnv, @@ -574,7 +570,7 @@ func reconcileEtcdSnapshotCronJob(ctx context.Context, c client.Client, scheme * "SNAPSHOT=/snapshots/etcd-snapshot.db\n" + "CHECKSUM=/snapshots/etcd-snapshot.db.sha256\n" + "ENDPOINT_OPT=\"\"\n" + - "if [ -n \"$AWS_ENDPOINT_URL\" ]; then ENDPOINT_OPT=\"--endpoint-url $AWS_ENDPOINT_URL\"; fi\n" + + "if [ -n \"${AWS_ENDPOINT_URL:-}\" ]; then ENDPOINT_OPT=\"--endpoint-url $AWS_ENDPOINT_URL\"; fi\n" + "if [ \"$CREATE_BUCKET\" = \"1\" ]; then\n" + " if ! aws $ENDPOINT_OPT s3api head-bucket --bucket \"$SNAPSHOT_BUCKET\" >/dev/null 2>&1; then\n" + " if [ \"$AWS_REGION\" = \"us-east-1\" ]; then\n" + @@ -714,7 +710,7 @@ func etcdReplicas() int32 { return int32(defaultEtcdReplicas) } parsed, err := strconv.ParseInt(raw, 10, 32) - if err != nil || parsed < int64(defaultEtcdReplicas) { + if err != nil || parsed < 1 { return int32(defaultEtcdReplicas) } return int32(parsed)