From 7f358d6c0fa8fc0f29e4d91ffcb42bcb0cfa7d97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 9 Sep 2025 10:44:44 +0200 Subject: [PATCH 1/9] Add auth-proxy --- cmd/auth_proxy/main.go | 343 +++++++++++++++++++++++++ config/200-auth-proxy-role.yaml | 1 + config/core/roles/auth-proxy-role.yaml | 39 +++ pkg/auth/event_policy.go | 6 +- pkg/auth/event_policy_test.go | 50 ++-- pkg/auth/verifier.go | 93 +++++-- pkg/eventingtls/eventingtls.go | 12 + 7 files changed, 489 insertions(+), 55 deletions(-) create mode 100644 cmd/auth_proxy/main.go create mode 120000 config/200-auth-proxy-role.yaml create mode 100644 config/core/roles/auth-proxy-role.yaml diff --git a/cmd/auth_proxy/main.go b/cmd/auth_proxy/main.go new file mode 100644 index 00000000000..2149066a2f9 --- /dev/null +++ b/cmd/auth_proxy/main.go @@ -0,0 +1,343 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package main implements an authentication and authorization proxy that sits as a sidecar +// container alongside application pods. It performs OIDC-based authentication and policy-based +// authorization before forwarding requests to the target service. The proxy supports both HTTP +// and HTTPS traffic with configurable TLS settings. +package main + +import ( + "context" + "encoding/json" + "net" + "os" + + //nolint:gosec + "crypto/tls" + "fmt" + "log" + "net/http" + "net/http/httputil" + "net/url" + + "github.com/kelseyhightower/envconfig" + "go.uber.org/zap" + "k8s.io/client-go/kubernetes" + "k8s.io/utils/ptr" + cmdbroker "knative.dev/eventing/cmd/broker" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/auth" + "knative.dev/eventing/pkg/eventingtls" + "knative.dev/eventing/pkg/kncloudevents" + "knative.dev/pkg/apis" + kubeclient "knative.dev/pkg/client/injection/kube/client" + filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + configmap "knative.dev/pkg/configmap/informer" + "knative.dev/pkg/controller" + "knative.dev/pkg/injection" + "knative.dev/pkg/logging" + "knative.dev/pkg/network" + "knative.dev/pkg/signals" + "knative.dev/pkg/system" +) + +const component = "auth-proxy" + +// envConfig holds all environment configuration for the auth proxy +type envConfig struct { + TargetHost string `envconfig:"TARGET_HOST" default:"localhost"` + TargetHTTPPort int `envconfig:"TARGET_HTTP_PORT" default:"8080"` + TargetHTTPSPort int `envconfig:"TARGET_HTTPS_PORT" default:"8443"` + ProxyHTTPPort int `envconfig:"PROXY_HTTP_PORT" default:"3128"` + ProxyHTTPSPort int `envconfig:"PROXY_HTTPS_PORT" default:"3129"` + + AuthPolicies string `envconfig:"AUTH_POLICIES" default:""` + SinkNamespace string `envconfig:"SINK_NAMESPACE"` + SinkTLSCertPath *string `envconfig:"SINK_TLS_CERT_FILE"` + SinkTLSKeyPath *string `envconfig:"SINK_TLS_KEY_FILE"` + SinkCACertsPath *string `envconfig:"SINK_TLS_CA_FILE"` + + SinkURI string `envconfig:"SINK_URI"` + SinkAudience *string `envconfig:"SINK_AUDIENCE"` +} + +// ProxyHandler handles HTTP requests and performs authentication/authorization +// before forwarding to the target service +type ProxyHandler struct { + kubeClient kubernetes.Interface + withContext func(ctx context.Context) context.Context + authVerifier *auth.Verifier + httpProxy *httputil.ReverseProxy + httpsProxy *httputil.ReverseProxy + config envConfig + authSubjects []auth.SubjectsWithFilters +} + +func main() { + ctx := signals.NewContext() + + config, err := loadConfig() + if err != nil { + log.Fatal("Failed to load configuration:", err) + } + + ctx, informers := setupInformers(ctx) + configMapWatcher := configmap.NewInformedWatcher(kubeclient.Get(ctx), system.Namespace()) + logger := setupLogging(ctx, configMapWatcher) + defer logger.Sync() + + featureStore := setupFeatureStore(ctx, logger, configMapWatcher) + + handler, err := createProxyHandler(ctx, config, logger, featureStore, configMapWatcher) + if err != nil { + logger.Fatalw("Failed to create proxy handler", zap.Error(err)) + } + + serverManager, err := createServerManager(ctx, config, handler, logger, configMapWatcher) + if err != nil { + logger.Fatalw("Failed to create server manager", zap.Error(err)) + } + + if err := startServices(ctx, informers, configMapWatcher, logger); err != nil { + logger.Fatalw("Failed to start services", zap.Error(err)) + } + + logger.Info("Starting auth proxy servers...") + if err = serverManager.StartServers(ctx); err != nil { + logger.Fatalw("StartServers() returned an error", zap.Error(err)) + } + + logger.Info("Exiting...") +} + +// loadConfig loads and validates environment configuration +func loadConfig() (envConfig, error) { + var config envConfig + if err := envconfig.Process("", &config); err != nil { + return config, fmt.Errorf("failed to process environment variables: %w", err) + } + return config, nil +} + +// setupInformers initializes Kubernetes client and informers +func setupInformers(ctx context.Context) (context.Context, []controller.Informer) { + cfg := injection.ParseAndGetRESTConfigOrDie() + ctx = injection.WithConfig(ctx, cfg) + ctx = filteredFactory.WithSelectors(ctx, eventingtls.TrustBundleLabelSelector) + + ctx, informers := injection.Default.SetupInformers(ctx, cfg) + return ctx, informers +} + +// setupLogging initializes logging configuration and returns the logger +func setupLogging(ctx context.Context, cmw *configmap.InformedWatcher) *zap.SugaredLogger { + loggingConfig, err := cmdbroker.GetLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName()) + if err != nil { + log.Fatal("Error loading/parsing logging configuration:", err) + } + + logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, component) + ctx = logging.WithLogger(ctx, logger) + + cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component)) + + return logger +} + +// setupFeatureStore initializes feature flag store +func setupFeatureStore(_ context.Context, logger *zap.SugaredLogger, configMapWatcher *configmap.InformedWatcher) *feature.Store { + featureStore := feature.NewStore(logger.Named("feature-config-store")) + featureStore.WatchConfigs(configMapWatcher) + return featureStore +} + +// createProxyHandler creates and configures the proxy handler +func createProxyHandler(ctx context.Context, config envConfig, logger *zap.SugaredLogger, featureStore *feature.Store, configMapWatcher *configmap.InformedWatcher) (*ProxyHandler, error) { + var authSubjects []auth.SubjectsWithFilters + + if len(config.AuthPolicies) > 0 { + if err := json.Unmarshal([]byte(config.AuthPolicies), &authSubjects); err != nil { + return nil, fmt.Errorf("failed to parse policies: %w", err) + } + } + + handler := &ProxyHandler{ + kubeClient: kubeclient.Get(ctx), + authVerifier: auth.NewVerifier(ctx, nil, nil, configMapWatcher), + config: config, + authSubjects: authSubjects, + } + + handler.withContext = func(ctx context.Context) context.Context { + return logging.WithLogger(featureStore.ToContext(ctx), logger) + } + + httpProxy, err := httpReverseProxy(config) + if err != nil { + return nil, fmt.Errorf("failed to create HTTP proxy: %w", err) + } + + httpsProxy, err := httpsReverseProxy(config) + if err != nil { + return nil, fmt.Errorf("failed to create HTTPS proxy: %w", err) + } + + handler.httpProxy = httpProxy + handler.httpsProxy = httpsProxy + + return handler, nil +} + +// createServerManager creates the TLS-enabled server manager +func createServerManager(ctx context.Context, config envConfig, handler *ProxyHandler, logger *zap.SugaredLogger, configMapWatcher *configmap.InformedWatcher) (*eventingtls.ServerManager, error) { + var tlsConfig *tls.Config + if handler.config.SinkTLSCertPath != nil && handler.config.SinkTLSKeyPath != nil { + var err error + tlsConfig, err = getServerTLSConfig(*handler.config.SinkTLSCertPath, *handler.config.SinkTLSKeyPath) + if err != nil { + return nil, fmt.Errorf("failed to get TLS config: %w", err) + } + logger.Info("TLS config loaded successfully") + } + + serverManager, err := eventingtls.NewServerManager(ctx, + kncloudevents.NewHTTPEventReceiver(config.ProxyHTTPPort), + kncloudevents.NewHTTPEventReceiver(config.ProxyHTTPSPort, + kncloudevents.WithTLSConfig(tlsConfig)), + handler, + configMapWatcher, + ) + if err != nil { + return nil, fmt.Errorf("failed to create server manager: %w", err) + } + + return serverManager, nil +} + +// startServices starts all background services (configmap watcher and informers) +func startServices(ctx context.Context, informers []controller.Informer, configMapWatcher *configmap.InformedWatcher, logger *zap.SugaredLogger) error { + logger.Debug("Starting ConfigMap watcher") + if err := configMapWatcher.Start(ctx.Done()); err != nil { + return fmt.Errorf("failed to start ConfigMap watcher: %w", err) + } + + logger.Info("Starting informers") + if err := controller.StartInformers(ctx.Done(), informers...); err != nil { + return fmt.Errorf("failed to start informers: %w", err) + } + + return nil +} + +func (h *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := h.withContext(r.Context()) + logger := logging.FromContext(ctx) + features := feature.FromContext(ctx) + + logger.Debugf("Handling request to %s", r.RequestURI) + + err := h.authVerifier.VerifyRequestFromSubjectsWithFilters(ctx, features, h.config.SinkAudience, h.authSubjects, h.config.SinkNamespace, r, w) + if err != nil { + logger.Debugw("Failed to verify AuthN and AuthZ", zap.Error(err)) + return + } + + if r.TLS == nil { + logger.Debug("Forwarding to HTTP target") + h.httpProxy.ServeHTTP(w, r) + } else { + logger.Debug("Forwarding to HTTPS target") + h.httpsProxy.ServeHTTP(w, r) + } +} + +// httpReverseProxy creates a reverse proxy for HTTP traffic to the target service +func httpReverseProxy(config envConfig) (*httputil.ReverseProxy, error) { + httpTarget := fmt.Sprintf("http://%s:%d", config.TargetHost, config.TargetHTTPPort) + + httpTargetURL, err := url.Parse(httpTarget) + if err != nil { + return nil, fmt.Errorf("failed to parse http target URL: %v", err) + } + + return httputil.NewSingleHostReverseProxy(httpTargetURL), nil +} + +// httpsReverseProxy creates a reverse proxy for HTTPS traffic with TLS configuration +func httpsReverseProxy(config envConfig) (*httputil.ReverseProxy, error) { + sinkUrl, err := apis.ParseURL(config.SinkURI) + if err != nil { + return nil, fmt.Errorf("failed to parse sink URL: %v", err) + } + + httpsTarget := fmt.Sprintf("https://%s:%d", config.TargetHost, config.TargetHTTPSPort) + + httpsTargetURL, err := url.Parse(httpsTarget) + if err != nil { + return nil, fmt.Errorf("failed to parse https target URL: %v", err) + } + + httpsProxy := httputil.NewSingleHostReverseProxy(httpsTargetURL) + httpsProxy.Director = func(req *http.Request) { + // in case of https requests, we need to rewrite the request URL/host, as otherwise, we get a certificate validation error + req.URL.Scheme = "https" + req.URL.Host = httpsTargetURL.Host + req.Host = sinkUrl.Host + } + + var caCerts *string + if config.SinkCACertsPath != nil { + caCertsB, err := os.ReadFile(*config.SinkCACertsPath) + if err != nil { + return nil, fmt.Errorf("failed to read CA certificates from %s: %w", *config.SinkCACertsPath, err) + } + caCerts = ptr.To(string(caCertsB)) + } + + var base = http.DefaultTransport.(*http.Transport).Clone() + clientConfig := eventingtls.ClientConfig{ + CACerts: caCerts, + TrustBundleConfigMapLister: nil, + } + + base.DialTLSContext = func(ctx context.Context, net, addr string) (net.Conn, error) { + tlsConfig, err := eventingtls.GetTLSClientConfig(clientConfig) + if err != nil { + return nil, err + } + tlsConfig.ServerName = sinkUrl.Host + + return network.DialTLSWithBackOff(ctx, net, fmt.Sprintf("%s:%d", config.TargetHost, config.TargetHTTPSPort), tlsConfig) + } + httpsProxy.Transport = base + + return httpsProxy, nil +} + +// getServerTLSConfig creates TLS configuration for the server using certificate files +func getServerTLSConfig(serverTLSCertificatePath, serverTLSCertificateKeyPath string) (*tls.Config, error) { + serverTLSConfig := eventingtls.NewDefaultServerConfig() + serverTLSConfig.GetCertificate = func(info *tls.ClientHelloInfo) (*tls.Certificate, error) { + cert, err := tls.LoadX509KeyPair(serverTLSCertificatePath, serverTLSCertificateKeyPath) + if err != nil { + return nil, err + } + + return &cert, nil + } + return eventingtls.GetTLSServerConfig(serverTLSConfig) +} diff --git a/config/200-auth-proxy-role.yaml b/config/200-auth-proxy-role.yaml new file mode 120000 index 00000000000..5c5e00ede0b --- /dev/null +++ b/config/200-auth-proxy-role.yaml @@ -0,0 +1 @@ +core/roles/auth-proxy-role.yaml \ No newline at end of file diff --git a/config/core/roles/auth-proxy-role.yaml b/config/core/roles/auth-proxy-role.yaml new file mode 100644 index 00000000000..8263c4075eb --- /dev/null +++ b/config/core/roles/auth-proxy-role.yaml @@ -0,0 +1,39 @@ +# Copyright 2020 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: knative-eventing-auth-proxy + namespace: knative-eventing + labels: + app.kubernetes.io/version: devel + app.kubernetes.io/name: knative-eventing +rules: + - apiGroups: + - "" + resources: + - "configmaps" + resourceNames: + - "config-logging" + - "config-features" + verbs: + - "get" + - apiGroups: + - "" + resources: + - "configmaps" + verbs: + - "list" + - "watch" \ No newline at end of file diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index a9f1be751a2..56879b6c42d 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -219,15 +219,15 @@ func resolveSubjectsFromReference(resolver *resolver.AuthenticatableResolver, re // SubjectAndFiltersPass checks if the given sub is contained in the list of allowedSubs // or if it matches a prefix pattern in subs (e.g. system:serviceaccounts:my-ns:*), as // well as if the event passes any filters associated with the subjects for an event policy -func SubjectAndFiltersPass(ctx context.Context, sub string, allowedSubsWithFilters []subjectsWithFilters, event *cloudevents.Event, logger *zap.SugaredLogger) bool { +func SubjectAndFiltersPass(ctx context.Context, sub string, allowedSubsWithFilters []SubjectsWithFilters, event *cloudevents.Event, logger *zap.SugaredLogger) bool { if event == nil { return false } for _, swf := range allowedSubsWithFilters { - for _, s := range swf.subjects { + for _, s := range swf.Subjects { if strings.EqualFold(s, sub) || (strings.HasSuffix(s, "*") && strings.HasPrefix(sub, strings.TrimSuffix(s, "*"))) { - return subscriptionsapi.CreateSubscriptionsAPIFilters(logger.Desugar(), swf.filters).Filter(ctx, *event) != eventfilter.FailFilter + return subscriptionsapi.CreateSubscriptionsAPIFilters(logger.Desugar(), swf.Filters).Filter(ctx, *event) != eventfilter.FailFilter } } } diff --git a/pkg/auth/event_policy_test.go b/pkg/auth/event_policy_test.go index c7396869b63..e4ca78e0804 100644 --- a/pkg/auth/event_policy_test.go +++ b/pkg/auth/event_policy_test.go @@ -700,24 +700,24 @@ func TestSubjectAndFiltersContained(t *testing.T) { tests := []struct { name string sub string - allowedSubsAndFilters []subjectsWithFilters + allowedSubsAndFilters []SubjectsWithFilters want bool }{ { name: "simple 1:1 match", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []subjectsWithFilters{ + allowedSubsAndFilters: []SubjectsWithFilters{ { - subjects: []string{"system:serviceaccounts:my-ns:my-sa"}, + Subjects: []string{"system:serviceaccounts:my-ns:my-sa"}, }, }, want: true, }, { name: "simple 1:n match", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []subjectsWithFilters{ + allowedSubsAndFilters: []SubjectsWithFilters{ { - subjects: []string{ + Subjects: []string{ "system:serviceaccounts:my-ns:another-sa", "system:serviceaccounts:my-ns:my-sa", "system:serviceaccounts:my-ns:yet-another-sa"}, @@ -727,9 +727,9 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (all)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []subjectsWithFilters{ + allowedSubsAndFilters: []SubjectsWithFilters{ { - subjects: []string{ + Subjects: []string{ "*"}, }, }, @@ -737,9 +737,9 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (namespace)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []subjectsWithFilters{ + allowedSubsAndFilters: []SubjectsWithFilters{ { - subjects: []string{ + Subjects: []string{ "system:serviceaccounts:my-ns:*", }, }, @@ -748,9 +748,9 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (different namespace)", sub: "system:serviceaccounts:my-ns-2:my-sa", - allowedSubsAndFilters: []subjectsWithFilters{ + allowedSubsAndFilters: []SubjectsWithFilters{ { - subjects: []string{ + Subjects: []string{ "system:serviceaccounts:my-ns:*", }, }, @@ -759,9 +759,9 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (namespace prefix)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []subjectsWithFilters{ + allowedSubsAndFilters: []SubjectsWithFilters{ { - subjects: []string{ + Subjects: []string{ "system:serviceaccounts:my-ns*", }, }, @@ -770,9 +770,9 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (namespace prefix 2)", sub: "system:serviceaccounts:my-ns-2:my-sa", - allowedSubsAndFilters: []subjectsWithFilters{ + allowedSubsAndFilters: []SubjectsWithFilters{ { - subjects: []string{ + Subjects: []string{ "system:serviceaccounts:my-ns*", }, }, @@ -781,9 +781,9 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (middle)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []subjectsWithFilters{ + allowedSubsAndFilters: []SubjectsWithFilters{ { - subjects: []string{ + Subjects: []string{ "system:serviceaccounts:*:my-sa", }, }, @@ -792,12 +792,12 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (namespace prefix) and failing event filter", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []subjectsWithFilters{ + allowedSubsAndFilters: []SubjectsWithFilters{ { - subjects: []string{ + Subjects: []string{ "system:serviceaccounts:my-ns*", }, - filters: []eventingv1.SubscriptionsAPIFilter{ + Filters: []eventingv1.SubscriptionsAPIFilter{ { CESQL: "false", }, @@ -808,22 +808,22 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "only check filter if subject matches", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []subjectsWithFilters{ + allowedSubsAndFilters: []SubjectsWithFilters{ { - subjects: []string{ + Subjects: []string{ "system:serviceaccounts:not-my-ns*", }, - filters: []eventingv1.SubscriptionsAPIFilter{ + Filters: []eventingv1.SubscriptionsAPIFilter{ { CESQL: "true", }, }, }, { - subjects: []string{ + Subjects: []string{ "system:serviceaccounts:my-ns*", }, - filters: []eventingv1.SubscriptionsAPIFilter{ + Filters: []eventingv1.SubscriptionsAPIFilter{ { CESQL: "false", }, diff --git a/pkg/auth/verifier.go b/pkg/auth/verifier.go index 1f7800bd5c9..672d7380086 100644 --- a/pkg/auth/verifier.go +++ b/pkg/auth/verifier.go @@ -136,6 +136,30 @@ func (v *Verifier) VerifyRequestFromSubject(ctx context.Context, features featur return nil } +// VerifyRequestFromSubjectsWithFilters verifies AuthN and AuthZ in the request. +// In the AuthZ part it checks if the request comes from the given allowedSubject. +// On verification errors, it sets the responses HTTP status and returns an error. +// This method is similar to VerifyRequestFromSubject() except that +// VerifyRequestFromSubjectsWithFilters() allows to check based on a list of +// subjects with filters. +func (v *Verifier) VerifyRequestFromSubjectsWithFilters(ctx context.Context, features feature.Flags, requiredOIDCAudience *string, allowedSubjectsWithFilters []SubjectsWithFilters, resourceNamespace string, req *http.Request, resp http.ResponseWriter) error { + if !features.IsOIDCAuthentication() { + return nil + } + + idToken, err := v.verifyAuthN(ctx, requiredOIDCAudience, req, resp) + if err != nil { + return fmt.Errorf("authentication of request could not be verified: %w", err) + } + + err = v.verifyAuthZBySubjectsWithFilters(ctx, features, idToken, resourceNamespace, allowedSubjectsWithFilters, req, resp) + if err != nil { + return fmt.Errorf("authorization of request could not be verified: %w", err) + } + + return nil +} + // verifyAuthN verifies if the incoming request contains a correct JWT token func (v *Verifier) verifyAuthN(ctx context.Context, audience *string, req *http.Request, resp http.ResponseWriter) (*IDToken, error) { token := GetJWTFromHeader(req.Header) @@ -160,7 +184,19 @@ func (v *Verifier) verifyAuthN(ctx context.Context, audience *string, req *http. // verifyAuthZ verifies if the given idToken is allowed by the resources eventPolicyStatus func (v *Verifier) verifyAuthZ(ctx context.Context, features feature.Flags, idToken *IDToken, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef, req *http.Request, resp http.ResponseWriter) error { - if len(policyRefs) > 0 { + subjectsWithFiltersFromApplyingPolicies, err := SubjectWithFiltersFromPolicyRef(v.eventPolicyLister, resourceNamespace, policyRefs) + if err != nil { + resp.WriteHeader(http.StatusInternalServerError) + return fmt.Errorf("could not get subjects with filters from policy: %w", err) + } + + return v.verifyAuthZBySubjectsWithFilters(ctx, features, idToken, resourceNamespace, subjectsWithFiltersFromApplyingPolicies, req, resp) +} + +// verifyAuthZBySubjectsWithFilters verifies if the given idToken is allowed by the resources eventPolicyStatus +// it does the same as verifyAuthZ but taking a subjectWithFilters slice instead +func (v *Verifier) verifyAuthZBySubjectsWithFilters(ctx context.Context, features feature.Flags, idToken *IDToken, resourceNamespace string, subjectsWithFiltersFromApplyingPolicies []SubjectsWithFilters, req *http.Request, resp http.ResponseWriter) error { + if len(subjectsWithFiltersFromApplyingPolicies) > 0 { req, err := utils.CopyRequest(req) if err != nil { resp.WriteHeader(http.StatusInternalServerError) @@ -176,38 +212,26 @@ func (v *Verifier) verifyAuthZ(ctx context.Context, features feature.Flags, idTo return fmt.Errorf("failed to decode event from request: %w", err) } - subjectsWithFiltersFromApplyingPolicies := []subjectsWithFilters{} - for _, p := range policyRefs { - policy, err := v.eventPolicyLister.EventPolicies(resourceNamespace).Get(p.Name) - if err != nil { - resp.WriteHeader(http.StatusInternalServerError) - return fmt.Errorf("failed to get eventPolicy: %w", err) - } - - subjectsWithFiltersFromApplyingPolicies = append(subjectsWithFiltersFromApplyingPolicies, subjectsWithFilters{subjects: policy.Status.From, filters: policy.Spec.Filters}) - } - if !SubjectAndFiltersPass(ctx, idToken.Subject, subjectsWithFiltersFromApplyingPolicies, event, v.logger) { resp.WriteHeader(http.StatusForbidden) return fmt.Errorf("token is from subject %q, but only %#v are part of applying event policies", idToken.Subject, subjectsWithFiltersFromApplyingPolicies) } return nil - } else { - if features.IsAuthorizationDefaultModeDenyAll() { - resp.WriteHeader(http.StatusForbidden) - return fmt.Errorf("no event policies apply for resource and %s is set to %s", feature.AuthorizationDefaultMode, feature.AuthorizationDenyAll) - - } else if features.IsAuthorizationDefaultModeSameNamespace() { - if !strings.HasPrefix(idToken.Subject, fmt.Sprintf("%s:%s:", kubernetesServiceAccountPrefix, resourceNamespace)) { - resp.WriteHeader(http.StatusForbidden) - return fmt.Errorf("no policies apply for resource. %s is set to %s, but token is from subject %q, which is not part of %q namespace", feature.AuthorizationDefaultMode, feature.AuthorizationDenyAll, idToken.Subject, resourceNamespace) - } + } - return nil + if features.IsAuthorizationDefaultModeDenyAll() { + resp.WriteHeader(http.StatusForbidden) + return fmt.Errorf("no event policies apply for resource and %s is set to %s", feature.AuthorizationDefaultMode, feature.AuthorizationDenyAll) + } else if features.IsAuthorizationDefaultModeSameNamespace() { + if !strings.HasPrefix(idToken.Subject, fmt.Sprintf("%s:%s:", kubernetesServiceAccountPrefix, resourceNamespace)) { + resp.WriteHeader(http.StatusForbidden) + return fmt.Errorf("no policies apply for resource. %s is set to %s, but token is from subject %q, which is not part of %q namespace", feature.AuthorizationDefaultMode, feature.AuthorizationDenyAll, idToken.Subject, resourceNamespace) } - // else: allow all + + return nil } + // else: allow all return nil } @@ -343,7 +367,22 @@ type openIDMetadata struct { SigningAlgs []string `json:"id_token_signing_alg_values_supported"` } -type subjectsWithFilters struct { - filters []eventingv1.SubscriptionsAPIFilter - subjects []string +func SubjectWithFiltersFromPolicyRef(eventPolicyLister listerseventingv1alpha1.EventPolicyLister, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef) ([]SubjectsWithFilters, error) { + var subjectsWithFiltersFromApplyingPolicies []SubjectsWithFilters + + for _, p := range policyRefs { + policy, err := eventPolicyLister.EventPolicies(resourceNamespace).Get(p.Name) + if err != nil { + return nil, fmt.Errorf("failed to get eventPolicy: %w", err) + } + + subjectsWithFiltersFromApplyingPolicies = append(subjectsWithFiltersFromApplyingPolicies, SubjectsWithFilters{Subjects: policy.Status.From, Filters: policy.Spec.Filters}) + } + + return subjectsWithFiltersFromApplyingPolicies, nil +} + +type SubjectsWithFilters struct { + Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"` + Subjects []string `json:"subjects,omitempty"` } diff --git a/pkg/eventingtls/eventingtls.go b/pkg/eventingtls/eventingtls.go index 718a744c197..6e5cd4de16c 100644 --- a/pkg/eventingtls/eventingtls.go +++ b/pkg/eventingtls/eventingtls.go @@ -36,6 +36,7 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/controller" "knative.dev/pkg/logging" ) @@ -195,6 +196,17 @@ func IsHttpsSink(sink string) bool { return strings.EqualFold(s.Scheme, "https") } +// GetHttpsAddress returns the (first) https address out of the list of addresses +func GetHttpsAddress(addresses []duckv1.Addressable) *duckv1.Addressable { + for _, address := range addresses { + if IsHttpsSink(address.URL.String()) { + return &address + } + } + + return nil +} + // certPool returns a x509.CertPool with the combined certs from: // - the system cert pool // - the knative trust bundle in TrustBundleMountPath From fa061e3c38a3d9668dead14c385a1c57d0b39df9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 9 Sep 2025 10:45:02 +0200 Subject: [PATCH 2/9] Add auth-proxy to IntegrationSink --- config/core/deployments/controller.yaml | 4 + .../v1alpha1/integration_sink_lifecycle.go | 16 ++ pkg/reconciler/integration/sink/controller.go | 68 +++++- .../integration/sink/integrationsink.go | 121 +++++++++- .../integration/sink/integrationsink_test.go | 13 +- .../sink/resources/container_image.go | 209 +++++++++++++++++- .../integration/sink/resources/names.go | 6 +- 7 files changed, 407 insertions(+), 30 deletions(-) diff --git a/config/core/deployments/controller.yaml b/config/core/deployments/controller.yaml index 657e6db6cc3..599e148302b 100644 --- a/config/core/deployments/controller.yaml +++ b/config/core/deployments/controller.yaml @@ -72,6 +72,10 @@ spec: # APIServerSource - name: APISERVER_RA_IMAGE value: ko://knative.dev/eventing/cmd/apiserver_receive_adapter + + - name: AUTH_PROXY_IMAGE + value: ko://knative.dev/eventing/cmd/auth_proxy + - name: POD_NAME valueFrom: fieldRef: diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go index 7392cfa7a8d..076a5d5bdcf 100644 --- a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go +++ b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go @@ -43,12 +43,17 @@ const ( // Certificate related condition reasons IntegrationSinkCertificateNotReady string = "CertificateNotReady" + + // IntegrationSinkTrustBundlePropagated is configured to indicate whether the + // TLS trust bundle has been properly propagated. + IntegrationSinkTrustBundlePropagated apis.ConditionType = "TrustBundlePropagated" ) var IntegrationSinkCondSet = apis.NewLivingConditionSet( IntegrationSinkConditionAddressable, IntegrationSinkConditionDeploymentReady, IntegrationSinkConditionEventPoliciesReady, + IntegrationSinkTrustBundlePropagated, ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. @@ -160,3 +165,14 @@ func (s *IntegrationSinkStatus) SetAddress(address *duckv1.Addressable) { } } + +// MarkFailedTrustBundlePropagation marks the IntegrationSink's SinkBindingTrustBundlePropagated condition to False with +// the provided reason and message. +func (s *IntegrationSinkStatus) MarkFailedTrustBundlePropagation(reason, message string) { + IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkTrustBundlePropagated, reason, message) +} + +// MarkTrustBundlePropagated marks the IntegrationSink's SinkBindingTrustBundlePropagated condition to True. +func (s *IntegrationSinkStatus) MarkTrustBundlePropagated() { + IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkTrustBundlePropagated) +} diff --git a/pkg/reconciler/integration/sink/controller.go b/pkg/reconciler/integration/sink/controller.go index afabd918be2..bebb02ed1be 100644 --- a/pkg/reconciler/integration/sink/controller.go +++ b/pkg/reconciler/integration/sink/controller.go @@ -19,7 +19,14 @@ package sink import ( "context" + "github.com/kelseyhightower/envconfig" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "knative.dev/eventing/pkg/eventingtls" + configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered" + "knative.dev/pkg/kmeta" + "knative.dev/pkg/system" "k8s.io/client-go/tools/cache" "knative.dev/pkg/injection" @@ -40,11 +47,18 @@ import ( integrationsinkreconciler "knative.dev/eventing/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink" kubeclient "knative.dev/pkg/client/injection/kube/client" secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/filtered" + rolebindinginformer "knative.dev/pkg/client/injection/kube/informers/rbac/v1/rolebinding" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" ) +// envConfig will be used to extract the required environment variables. +// If this configuration cannot be extracted, then NewController will panic. +type envConfig struct { + AuthProxyImage string `envconfig:"AUTH_PROXY_IMAGE" required:"true"` +} + func NewController( ctx context.Context, cmw configmap.Watcher, @@ -54,16 +68,27 @@ func NewController( eventPolicyInformer := eventpolicy.Get(ctx) deploymentInformer := deploymentinformer.Get(ctx) serviceInformer := service.Get(ctx) - + trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector) + rolebindingInformer := rolebindinginformer.Get(ctx) dynamicCertificateInformer := certificates.NewDynamicCertificatesInformer() + + env := &envConfig{} + if err := envconfig.Process("", env); err != nil { + logging.FromContext(ctx).Panicf("unable to process IntegrationSink's required environment variables: %v", err) + } + r := &Reconciler{ - secretLister: secretInformer.Lister(), - eventPolicyLister: eventPolicyInformer.Lister(), - kubeClientSet: kubeclient.Get(ctx), - deploymentLister: deploymentInformer.Lister(), - serviceLister: serviceInformer.Lister(), - cmCertificateLister: dynamicCertificateInformer.Lister(), - certManagerClient: certmanagerclientset.NewForConfigOrDie(injection.GetConfig(ctx)), + secretLister: secretInformer.Lister(), + eventPolicyLister: eventPolicyInformer.Lister(), + kubeClientSet: kubeclient.Get(ctx), + deploymentLister: deploymentInformer.Lister(), + serviceLister: serviceInformer.Lister(), + cmCertificateLister: dynamicCertificateInformer.Lister(), + certManagerClient: certmanagerclientset.NewForConfigOrDie(injection.GetConfig(ctx)), + trustBundleConfigMapLister: trustBundleConfigMapInformer.Lister(), + integrationSinkLister: integrationSinkInformer.Lister(), + rolebindingLister: rolebindingInformer.Lister(), + authProxyImage: env.AuthProxyImage, } logging.FromContext(ctx).Info("Creating IntegrationSink controller") @@ -108,6 +133,11 @@ func NewController( Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + serviceInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterControllerGVK(v1alpha1.SchemeGroupVersion.WithKind("IntegrationSink")), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) + integrationSinkGK := v1alpha1.SchemeGroupVersion.WithKind("IntegrationSink").GroupKind() eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler( integrationSinkInformer.Informer().GetIndexer(), @@ -115,5 +145,27 @@ func NewController( impl.EnqueueKey, )) + trustBundleConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(func(i interface{}) { + obj, err := kmeta.DeletionHandlingAccessor(i) + if err != nil { + return + } + if obj.GetNamespace() == system.Namespace() { + globalResync(i) + return + } + + sinks, err := integrationSinkInformer.Lister().IntegrationSinks(obj.GetNamespace()).List(labels.Everything()) + if err != nil { + return + } + for _, sink := range sinks { + impl.EnqueueKey(types.NamespacedName{ + Namespace: sink.Namespace, + Name: sink.Name, + }) + } + })) + return impl } diff --git a/pkg/reconciler/integration/sink/integrationsink.go b/pkg/reconciler/integration/sink/integrationsink.go index 37a93e65472..e0afb0495b9 100644 --- a/pkg/reconciler/integration/sink/integrationsink.go +++ b/pkg/reconciler/integration/sink/integrationsink.go @@ -21,7 +21,12 @@ import ( "fmt" "sync/atomic" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + rbacv1listers "k8s.io/client-go/listers/rbac/v1" "knative.dev/eventing/pkg/certificates" + sinksv1alpha1 "knative.dev/eventing/pkg/client/listers/sinks/v1alpha1" + "knative.dev/pkg/system" cmv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" @@ -78,6 +83,11 @@ type Reconciler struct { cmCertificateLister *atomic.Pointer[certmanagerlisters.CertificateLister] certManagerClient certmanagerclientset.Interface + + trustBundleConfigMapLister corev1listers.ConfigMapLister + integrationSinkLister sinksv1alpha1.IntegrationSinkLister + rolebindingLister rbacv1listers.RoleBindingLister + authProxyImage string } // newReconciledNormal makes a new reconciler event with event type Normal, and @@ -90,6 +100,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, sink *sinks.IntegrationS featureFlags := feature.FromContext(ctx) logger := logging.FromContext(ctx) + logger.Debugw("Reconciling Trust Bundles") + if err := r.reconcileIntegrationSinkTrustBundles(ctx, sink); err != nil { + logger.Errorw("Error reconciling Trust Bundles", zap.Error(err)) + return err + } + logger.Debugw("Reconciling IntegrationSink Certificate") _, err := r.reconcileIntegrationSinkCertificate(ctx, sink) if err != nil { @@ -97,8 +113,15 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, sink *sinks.IntegrationS return err } + logger.Debugw("Reconciling IntegrationSink auth-proxy RBAC") + _, err = r.reconcileAuthProxyRBAC(ctx, sink) + if err != nil { + logging.FromContext(ctx).Errorw("Error reconciling auth-proxy RBAC", zap.Error(err)) + return err + } + logger.Debugw("Reconciling IntegrationSink Deployment") - _, err = r.reconcileDeployment(ctx, sink, featureFlags) + _, err = r.reconcileDeployment(ctx, sink, r.authProxyImage, featureFlags) if err != nil { logging.FromContext(ctx).Errorw("Error reconciling Pod", zap.Error(err)) return err @@ -125,9 +148,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, sink *sinks.IntegrationS return newReconciledNormal(sink.Namespace, sink.Name) } -func (r *Reconciler) reconcileDeployment(ctx context.Context, sink *sinks.IntegrationSink, featureFlags feature.Flags) (*v1.Deployment, error) { +func (r *Reconciler) reconcileDeployment(ctx context.Context, sink *sinks.IntegrationSink, authProxyImage string, featureFlags feature.Flags) (*v1.Deployment, error) { + expected, err := resources.MakeDeploymentSpec(sink, authProxyImage, featureFlags, r.trustBundleConfigMapLister, r.eventPolicyLister) + if err != nil { + return nil, fmt.Errorf("failed to create deployment template: %w", err) + } - expected := resources.MakeDeploymentSpec(sink, featureFlags) deployment, err := r.deploymentLister.Deployments(sink.Namespace).Get(expected.Name) if apierrors.IsNotFound(err) { deployment, err = r.kubeClientSet.AppsV1().Deployments(sink.Namespace).Create(ctx, expected, metav1.CreateOptions{}) @@ -169,6 +195,12 @@ func (r *Reconciler) reconcileService(ctx context.Context, sink *sinks.Integrati return nil, fmt.Errorf("getting Service : %v", err) } else if !metav1.IsControlledBy(svc, sink) { return nil, fmt.Errorf("Service %q is not owned by IntegrationSink %q", svc.Name, sink.Name) + } else if r.serviceChanged(svc.Spec, expected.Spec) { + svc.Spec = expected.Spec + svc, err = r.kubeClientSet.CoreV1().Services(sink.Namespace).Update(ctx, svc, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("updating Service: %v", err) + } } else { logging.FromContext(ctx).Debugw("Reusing existing Service", zap.Any("Service", svc)) } @@ -250,6 +282,22 @@ func (r *Reconciler) deleteIntegrationSinkCertificate(ctx context.Context, sink return nil } +func (r *Reconciler) reconcileIntegrationSinkTrustBundles(ctx context.Context, sink *sinks.IntegrationSink) error { + gvk := schema.GroupVersionKind{ + Group: sinks.SchemeGroupVersion.Group, + Version: sinks.SchemeGroupVersion.Version, + Kind: "IntegrationSink", + } + + if _, err := eventingtls.PropagateTrustBundles(ctx, r.kubeClientSet, r.trustBundleConfigMapLister, gvk, sink); err != nil { + sink.Status.MarkFailedTrustBundlePropagation("FailedTrustBundlePropagation", err.Error()) + return err + } + sink.Status.MarkTrustBundlePropagated() + + return nil +} + func (r *Reconciler) reconcileAddress(ctx context.Context, sink *sinks.IntegrationSink) error { featureFlags := feature.FromContext(ctx) @@ -284,6 +332,7 @@ func (r *Reconciler) reconcileAddress(ctx context.Context, sink *sinks.Integrati } else { httpAddress := r.httpAddress(sink) sink.Status.Address = &httpAddress + sink.Status.Addresses = []duckv1.Addressable{httpAddress} } if featureFlags.IsOIDCAuthentication() { @@ -333,6 +382,7 @@ func (r *Reconciler) httpAddress(sink *sinks.IntegrationSink) duckv1.Addressable func (r *Reconciler) httpsAddress(certs *string, sink *sinks.IntegrationSink) duckv1.Addressable { addr := r.httpAddress(sink) + addr.Name = ptr.To("https") addr.URL.Scheme = "https" addr.CACerts = certs return addr @@ -379,3 +429,68 @@ func (r *Reconciler) updateCertificate(ctx context.Context, sink *sinks.Integrat controller.GetEventRecorder(ctx).Event(sink, corev1.EventTypeNormal, certificateUpdated, expected.GetName()) return updated, nil } + +func (r *Reconciler) serviceChanged(existingSvc corev1.ServiceSpec, wantSvc corev1.ServiceSpec) bool { + return !equality.Semantic.DeepDerivative(wantSvc, existingSvc) +} + +func (r *Reconciler) reconcileAuthProxyRBAC(ctx context.Context, sink *sinks.IntegrationSink) (*rbacv1.RoleBinding, error) { + features := feature.FromContext(ctx) + + expected, err := resources.MakeAuthProxyRoleBindings(sink, r.integrationSinkLister, features) + if err != nil { + return nil, fmt.Errorf("creating auth proxy rolebinding: %w", err) + } + + if !features.IsOIDCAuthentication() { + return nil, r.deleteIntegrationSinkRBAC(ctx, expected) + } + + rolebinding, err := r.rolebindingLister.RoleBindings(expected.Namespace).Get(expected.Name) + if apierrors.IsNotFound(err) { + created, err := r.kubeClientSet.RbacV1().RoleBindings(system.Namespace()).Create(ctx, expected, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + + return created, nil + } + if err != nil { + return nil, fmt.Errorf("failed to get rolebinding %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + + if !r.rolebindingChanged(rolebinding, expected) { + return rolebinding, nil + } + + expected.ResourceVersion = rolebinding.ResourceVersion + updated, err := r.kubeClientSet.RbacV1().RoleBindings(expected.Namespace).Update(ctx, expected, metav1.UpdateOptions{}) + if err != nil { + return nil, err + } + return updated, nil +} + +func (r *Reconciler) deleteIntegrationSinkRBAC(ctx context.Context, rolebinding *rbacv1.RoleBinding) error { + _, err := r.rolebindingLister.RoleBindings(rolebinding.Namespace).Get(rolebinding.Name) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return fmt.Errorf("failed to get rolebinding %s/%s: %w", rolebinding.Namespace, rolebinding.Name, err) + } + + err = r.kubeClientSet.RbacV1().RoleBindings(rolebinding.Namespace).Delete(ctx, rolebinding.Name, metav1.DeleteOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return fmt.Errorf("failed to delete auth-proxy rolebinding %s/%s: %w", rolebinding.Namespace, rolebinding.Name, err) + } + + return nil +} + +func (r *Reconciler) rolebindingChanged(existingRB *rbacv1.RoleBinding, wantRB *rbacv1.RoleBinding) bool { + return !equality.Semantic.DeepDerivative(wantRB, existingRB) +} diff --git a/pkg/reconciler/integration/sink/integrationsink_test.go b/pkg/reconciler/integration/sink/integrationsink_test.go index e4bb2cac4f5..3b6f0ed48ac 100644 --- a/pkg/reconciler/integration/sink/integrationsink_test.go +++ b/pkg/reconciler/integration/sink/integrationsink_test.go @@ -160,12 +160,13 @@ func TestReconcile(t *testing.T) { cmCertificatesListerAtomic.Store(&cmCertificatesLister) r := &Reconciler{ - kubeClientSet: fakekubeclient.Get(ctx), - deploymentLister: listers.GetDeploymentLister(), - serviceLister: listers.GetServiceLister(), - secretLister: listers.GetSecretLister(), - cmCertificateLister: cmCertificatesListerAtomic, - eventPolicyLister: listers.GetEventPolicyLister(), + kubeClientSet: fakekubeclient.Get(ctx), + deploymentLister: listers.GetDeploymentLister(), + serviceLister: listers.GetServiceLister(), + secretLister: listers.GetSecretLister(), + cmCertificateLister: cmCertificatesListerAtomic, + eventPolicyLister: listers.GetEventPolicyLister(), + trustBundleConfigMapLister: listers.GetConfigMapLister(), } return integrationsink.NewReconciler(ctx, logging.FromContext(ctx), fakeeventingclient.Get(ctx), listers.GetIntegrationSinkLister(), controller.GetEventRecorder(ctx), r) diff --git a/pkg/reconciler/integration/sink/resources/container_image.go b/pkg/reconciler/integration/sink/resources/container_image.go index ff2e7b04c11..52b7528acca 100644 --- a/pkg/reconciler/integration/sink/resources/container_image.go +++ b/pkg/reconciler/integration/sink/resources/container_image.go @@ -17,23 +17,37 @@ limitations under the License. package resources import ( + "encoding/json" + "fmt" "os" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/utils/ptr" commonv1a1 "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/apis/sinks/v1alpha1" + "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/certificates" + alpha1 "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" + v1alpha1listers "knative.dev/eventing/pkg/client/listers/sinks/v1alpha1" + "knative.dev/eventing/pkg/eventingtls" "knative.dev/eventing/pkg/reconciler/integration" "knative.dev/pkg/kmeta" + "knative.dev/pkg/system" ) -func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink, featureFlags feature.Flags) *appsv1.Deployment { - t := true +const ( + AuthProxyRolebindingName = "eventing-auth-proxy" +) +func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink, authProxyImage string, featureFlags feature.Flags, trustBundleConfigmapLister corev1listers.ConfigMapLister, eventPolicyLister alpha1.EventPolicyLister) (*appsv1.Deployment, error) { deploy := &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{ APIVersion: "apps/v1", @@ -62,7 +76,7 @@ func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink, featureFlags feature.Fla VolumeSource: corev1.VolumeSource{ Secret: &corev1.SecretVolumeSource{ SecretName: certificates.CertificateName(sink.Name), - Optional: &t, + Optional: ptr.To(true), }, }, }, @@ -98,7 +112,53 @@ func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink, featureFlags feature.Fla }, } - return deploy + if featureFlags.IsOIDCAuthentication() { + // add auth-proxy + + proxyVars, err := makeAuthProxyEnv(sink, featureFlags, eventPolicyLister) + if err != nil { + return nil, fmt.Errorf("failed to make auth proxy env vars: %w", err) + } + + deploy.Spec.Template.Spec.Containers = append(deploy.Spec.Template.Spec.Containers, corev1.Container{ + Name: "auth-proxy", + Image: authProxyImage, + ImagePullPolicy: corev1.PullAlways, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 3128, + Protocol: corev1.ProtocolTCP, + Name: "http", + }, + { + ContainerPort: 3129, + Protocol: corev1.ProtocolTCP, + Name: "https", + }, + }, + Env: proxyVars, + VolumeMounts: []corev1.VolumeMount{ + { + // server certs, as the auth-proxy uses the same certs as the underlying sink + Name: certificates.CertificateName(sink.Name), + MountPath: "/etc/" + certificates.CertificateName(sink.Name), + ReadOnly: true, + }, + }, + }) + + // add trustbundles directly, so auth-proxies tokenverifier does not need the trustbundleconfigmap lister for oidc discovery + podspec, err := eventingtls.AddTrustBundleVolumes(trustBundleConfigmapLister, deploy, &deploy.Spec.Template.Spec) + if err != nil { + return nil, fmt.Errorf("failed to add trust bundle volumes: %w", err) + } + deploy.Spec.Template.Spec = *podspec + + // don't expose ports on sink container, as traffic should reach only auth-proxy + deploy.Spec.Template.Spec.Containers[0].Ports = nil + } + + return deploy, nil } func MakeService(sink *v1alpha1.IntegrationSink) *corev1.Service { @@ -119,22 +179,147 @@ func MakeService(sink *v1alpha1.IntegrationSink) *corev1.Service { Selector: integration.Labels(sink.Name), Ports: []corev1.ServicePort{ { - Name: "http", - Protocol: corev1.ProtocolTCP, - Port: 80, - TargetPort: intstr.IntOrString{IntVal: 8080}, + Name: "http", + Protocol: corev1.ProtocolTCP, + Port: 80, + TargetPort: intstr.IntOrString{ + Type: intstr.String, + StrVal: "http", + }, }, { - Name: "https", - Protocol: corev1.ProtocolTCP, - Port: 443, - TargetPort: intstr.IntOrString{IntVal: 8443}, + Name: "https", + Protocol: corev1.ProtocolTCP, + Port: 443, + TargetPort: intstr.IntOrString{ + Type: intstr.String, + StrVal: "https", + }, }, }, }, } } +func MakeAuthProxyRoleBindings(sink *v1alpha1.IntegrationSink, sinkLister v1alpha1listers.IntegrationSinkLister, features feature.Flags) (*rbacv1.RoleBinding, error) { + rb := rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: AuthProxyRolebindingName, + Namespace: system.Namespace(), + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: rbacv1.GroupName, + Kind: "Role", + Name: "knative-eventing-auth-proxy", + }, + } + + // now we need to get the SA names for all the deployed IntegrationSink pods + sinks, err := sinkLister.List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("error listing sinks: %w", err) + } + sinks = append(sinks, sink) //add the current sink too, as this could not exist yet in the cluster + + serviceAccounts := map[types.NamespacedName]struct{}{} + for _, s := range sinks { + serviceAccounts[types.NamespacedName{ + Namespace: s.Namespace, + Name: "default", //TODO: get the real SA of the pod, as it could be that the integrationsink pod does not run under the default SA + }] = struct{}{} + } + + for sa := range serviceAccounts { + rb.Subjects = append(rb.Subjects, rbacv1.Subject{ + Kind: "ServiceAccount", + Namespace: sa.Namespace, + Name: sa.Name, + }) + } + + return &rb, nil +} + +func makeAuthProxyEnv(sink *v1alpha1.IntegrationSink, featureFlags feature.Flags, eventPolicyLister alpha1.EventPolicyLister) ([]corev1.EnvVar, error) { + sinkAddress := eventingtls.GetHttpsAddress(sink.Status.Addresses) + if sinkAddress == nil { + sinkAddress = sink.Status.Address + } + + policies, err := auth.SubjectWithFiltersFromPolicyRef(eventPolicyLister, sink.Namespace, sink.Status.Policies) + if err != nil { + return nil, fmt.Errorf("failed to build auth proxy policies env vars: %w", err) + } + + policiesJson, err := json.Marshal(policies) + if err != nil { + return nil, fmt.Errorf("failed to parse policies for auth proxy env vars: %w", err) + } + + envVars := []corev1.EnvVar{ + { + Name: "TARGET_HTTP_PORT", + Value: "8080", + }, + { + Name: "TARGET_HTTPS_PORT", + Value: "8443", + }, + { + Name: "PROXY_HTTP_PORT", + Value: "3128", + }, + { + Name: "PROXY_HTTPS_PORT", + Value: "3129", + }, + { + Name: "SYSTEM_NAMESPACE", + Value: system.Namespace(), + }, + { + Name: "AUTH_POLICIES", + Value: string(policiesJson), + }, + { + Name: "SINK_NAMESPACE", + Value: sink.Namespace, + }, + } + + if sinkAddress != nil && sinkAddress.URL != nil { + envVars = append(envVars, corev1.EnvVar{ + Name: "SINK_URI", + Value: sinkAddress.URL.String(), + }) + } + + if !featureFlags.IsDisabledTransportEncryption() { + envVars = append(envVars, []corev1.EnvVar{ + { + Name: "SINK_TLS_CERT_FILE", + Value: "/etc/" + certificates.CertificateName(sink.Name) + "/tls.crt", + }, + { + Name: "SINK_TLS_KEY_FILE", + Value: "/etc/" + certificates.CertificateName(sink.Name) + "/tls.key", + }, { + Name: "SINK_TLS_CA_FILE", + Value: "/etc/" + certificates.CertificateName(sink.Name) + "/ca.crt", + }, + }...) + } + + if sinkAddress != nil && sinkAddress.Audience != nil { + envVars = append(envVars, corev1.EnvVar{ + Name: "SINK_AUDIENCE", + Value: *sinkAddress.Audience, + }) + } + + return envVars, nil +} + func makeEnv(sink *v1alpha1.IntegrationSink, featureFlags feature.Flags) []corev1.EnvVar { var envVars []corev1.EnvVar diff --git a/pkg/reconciler/integration/sink/resources/names.go b/pkg/reconciler/integration/sink/resources/names.go index bbb847a59c8..831211b109f 100644 --- a/pkg/reconciler/integration/sink/resources/names.go +++ b/pkg/reconciler/integration/sink/resources/names.go @@ -20,6 +20,10 @@ import ( "knative.dev/pkg/kmeta" ) +const ( + DeploymentSuffix = "-deployment" +) + func DeploymentName(sinkName string) string { - return kmeta.ChildName(sinkName, "-deployment") + return kmeta.ChildName(sinkName, DeploymentSuffix) } From 0e549d67e73c23430b439ac73f640152cece77dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 9 Sep 2025 13:35:35 +0200 Subject: [PATCH 3/9] Add OIDC and AuthZ test for IntegrationSink --- .../authz/addressable_authz_conformance.go | 4 +- .../oidc/addressable_oidc_conformance.go | 2 +- test/rekt/integration_sink_test.go | 42 +++++++++++++++++++ .../integrationsink/integrationsink.go | 10 +++++ 4 files changed, 55 insertions(+), 3 deletions(-) diff --git a/test/rekt/features/authz/addressable_authz_conformance.go b/test/rekt/features/authz/addressable_authz_conformance.go index 643cad05f46..d08d8743383 100644 --- a/test/rekt/features/authz/addressable_authz_conformance.go +++ b/test/rekt/features/authz/addressable_authz_conformance.go @@ -105,7 +105,7 @@ func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, n f.Alpha(kind). Must("event sent", eventassert.OnStore(source).MatchSentEvent(test.HasId(event.ID())).Exact(1)). - Must("get 202 on response", eventassert.OnStore(source).Match(eventassert.MatchStatusCode(202)).AtLeast(1)) + Must("get 202 or 204 on response", eventassert.OnStore(source).Match(eventassert.OneOf(eventassert.MatchStatusCode(202), eventassert.MatchStatusCode(204))).AtLeast(1)) return f } @@ -205,7 +205,7 @@ func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind f.Alpha(kind). Must("valid event sent", eventassert.OnStore(source1).MatchSentEvent(test.HasId(event1.ID())).Exact(1)). - Must("get 202 on response", eventassert.OnStore(source1).Match(eventassert.MatchStatusCode(202)).AtLeast(1)) + Must("get 202 or 204 on response", eventassert.OnStore(source1).Match(eventassert.OneOf(eventassert.MatchStatusCode(202), eventassert.MatchStatusCode(204))).AtLeast(1)) f.Alpha(kind). Must("invalid event sent", eventassert.OnStore(source2).MatchSentEvent(test.HasId(event2.ID())).Exact(1)). diff --git a/test/rekt/features/oidc/addressable_oidc_conformance.go b/test/rekt/features/oidc/addressable_oidc_conformance.go index bee8f85adb0..ce8849634c4 100644 --- a/test/rekt/features/oidc/addressable_oidc_conformance.go +++ b/test/rekt/features/oidc/addressable_oidc_conformance.go @@ -183,7 +183,7 @@ func addressableAllowsValidRequest(gvr schema.GroupVersionResource, kind, name s f.Alpha(kind). Must("event sent", eventassert.OnStore(source).MatchSentEvent(test.HasId(event.ID())).Exact(1)). - Must("get 202 on response", eventassert.OnStore(source).Match(eventassert.MatchStatusCode(202)).Exact(1)) + Must("get 202 or 204 on response", eventassert.OnStore(source).Match(eventassert.OneOf(eventassert.MatchStatusCode(202), eventassert.MatchStatusCode(204))).Exact(1)) return f } diff --git a/test/rekt/integration_sink_test.go b/test/rekt/integration_sink_test.go index 843ceb624aa..62ce9b56b3e 100644 --- a/test/rekt/integration_sink_test.go +++ b/test/rekt/integration_sink_test.go @@ -21,11 +21,16 @@ package rekt import ( "testing" + "time" + "knative.dev/eventing/test/rekt/features/authz" "knative.dev/eventing/test/rekt/features/integrationsink" + "knative.dev/eventing/test/rekt/features/oidc" + integrationsinkresource "knative.dev/eventing/test/rekt/resources/integrationsink" "knative.dev/pkg/system" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" ) @@ -58,3 +63,40 @@ func TestIntegrationSinkSuccessTLS(t *testing.T) { env.Test(ctx, t, integrationsink.SuccessTLS()) } + +func TestIntegrationSinkSupportsOIDC(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithObservabilityConfig, + k8s.WithEventListener, + environment.Managed(t), + environment.WithPollTimings(4*time.Second, 12*time.Minute), + eventshub.WithTLS(t), + ) + + name := feature.MakeRandomK8sName("integrationsink") + env.Prerequisite(ctx, t, integrationsinkresource.GoesReadySimple(name)) + + env.TestSet(ctx, t, oidc.AddressableOIDCConformance(integrationsinkresource.GVR(), "IntegrationSink", name, env.Namespace())) +} + +func TestIntegrationSinkSupportsAuthZ(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithObservabilityConfig, + k8s.WithEventListener, + eventshub.WithTLS(t), + environment.Managed(t), + ) + + name := feature.MakeRandomK8sName("integrationsink") + env.Prerequisite(ctx, t, integrationsinkresource.GoesReadySimple(name)) + + env.TestSet(ctx, t, authz.AddressableAuthZConformance(integrationsinkresource.GVR(), "IntegrationSink", name)) +} diff --git a/test/rekt/resources/integrationsink/integrationsink.go b/test/rekt/resources/integrationsink/integrationsink.go index 803a79a2765..604ee976130 100644 --- a/test/rekt/resources/integrationsink/integrationsink.go +++ b/test/rekt/resources/integrationsink/integrationsink.go @@ -66,3 +66,13 @@ func IsNotReady(name string, timing ...time.Duration) feature.StepFn { func IsAddressable(name string, timings ...time.Duration) feature.StepFn { return k8s.IsAddressable(GVR(), name, timings...) } + +func GoesReadySimple(name string) *feature.Feature { + f := feature.NewFeature() + + f.Setup("install integration sink", Install(name)) + f.Setup("integrationsink is ready", IsReady(name)) + f.Setup("integrationsink is addressable", IsAddressable(name)) + + return f +} From d0b5fad0c4b3296aba54189e21057a0f9aa4f1c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 9 Sep 2025 13:37:44 +0200 Subject: [PATCH 4/9] Fix linting issue --- config/core/roles/auth-proxy-role.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/core/roles/auth-proxy-role.yaml b/config/core/roles/auth-proxy-role.yaml index 8263c4075eb..a9fcdbe5a8a 100644 --- a/config/core/roles/auth-proxy-role.yaml +++ b/config/core/roles/auth-proxy-role.yaml @@ -36,4 +36,4 @@ rules: - "configmaps" verbs: - "list" - - "watch" \ No newline at end of file + - "watch" From ebfa54225c4e800ea1ac28f1c88a4be29974ccad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 9 Sep 2025 14:35:27 +0200 Subject: [PATCH 5/9] Fix unit tests --- .../v1alpha1/integration_sink_lifecycle.go | 13 +++++---- .../integration_sink_lifecycle_test.go | 9 ++++++ .../integration/sink/integrationsink_test.go | 28 +++++++++++++------ .../testing/v1alpha1/integrationsink.go | 10 +++++-- 4 files changed, 44 insertions(+), 16 deletions(-) diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go index 076a5d5bdcf..d95a44680ee 100644 --- a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go +++ b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go @@ -156,14 +156,17 @@ func (s *IntegrationSinkStatus) PropagateCertificateStatus(cs cmv1.CertificateSt return true } -func (s *IntegrationSinkStatus) SetAddress(address *duckv1.Addressable) { - s.Address = address - if address == nil || address.URL.IsEmpty() { +func (s *IntegrationSinkStatus) SetAddresses(addresses ...duckv1.Addressable) { + if len(addresses) == 0 || addresses[0].URL.IsEmpty() { IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkConditionAddressable, "EmptyHostname", "hostname is the empty string") - } else { - IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionAddressable) + return + } + s.AddressStatus = duckv1.AddressStatus{ + Address: &addresses[0], + Addresses: addresses, } + IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionAddressable) } // MarkFailedTrustBundlePropagation marks the IntegrationSink's SinkBindingTrustBundlePropagated condition to False with diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go index 627b3b13c6c..641901be8fd 100644 --- a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go +++ b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go @@ -55,6 +55,9 @@ func TestIntegrationSinkInitializeConditions(t *testing.T) { }, { Type: IntegrationSinkConditionReady, Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkTrustBundlePropagated, + Status: corev1.ConditionUnknown, }}, }, }, @@ -82,6 +85,9 @@ func TestIntegrationSinkInitializeConditions(t *testing.T) { }, { Type: IntegrationSinkConditionReady, Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkTrustBundlePropagated, + Status: corev1.ConditionUnknown, }}, }, }, @@ -109,6 +115,9 @@ func TestIntegrationSinkInitializeConditions(t *testing.T) { }, { Type: IntegrationSinkConditionReady, Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkTrustBundlePropagated, + Status: corev1.ConditionUnknown, }}, }, }, diff --git a/pkg/reconciler/integration/sink/integrationsink_test.go b/pkg/reconciler/integration/sink/integrationsink_test.go index 3b6f0ed48ac..ffbd3cd14f7 100644 --- a/pkg/reconciler/integration/sink/integrationsink_test.go +++ b/pkg/reconciler/integration/sink/integrationsink_test.go @@ -113,6 +113,7 @@ func TestReconcile(t *testing.T) { WithIntegrationSinkUID(sinkUID), WithIntegrationSinkSpec(makeIntegrationSinkSpec()), WithInitIntegrationSinkConditions, + WithIntegrationSinkTrustBundlePropagatedReady(), ), }}, WantCreates: []runtime.Object{ @@ -142,9 +143,10 @@ func TestReconcile(t *testing.T) { Object: NewIntegrationSink(sinkName, testNS, WithIntegrationSinkUID(sinkUID), WithIntegrationSinkAddressableReady(), - WithIntegrationSinkAddress(&sinkAddressable), + WithIntegrationSinkAddress(sinkAddressable), WithIntegrationSinkSpec(makeIntegrationSinkSpec()), WithIntegrationSinkEventPoliciesReadyBecauseOIDCDisabled(), + WithIntegrationSinkTrustBundlePropagatedReady(), WithInitIntegrationSinkConditions, WithIntegrationSinkPropagateDeploymenteStatus(makeDeploymentStatus(&conditionTrue)), ), @@ -167,6 +169,8 @@ func TestReconcile(t *testing.T) { cmCertificateLister: cmCertificatesListerAtomic, eventPolicyLister: listers.GetEventPolicyLister(), trustBundleConfigMapLister: listers.GetConfigMapLister(), + rolebindingLister: listers.GetRoleBindingLister(), + integrationSinkLister: listers.GetIntegrationSinkLister(), } return integrationsink.NewReconciler(ctx, logging.FromContext(ctx), fakeeventingclient.Get(ctx), listers.GetIntegrationSinkLister(), controller.GetEventRecorder(ctx), r) @@ -338,16 +342,22 @@ func makeService(name, namespace string) *corev1.Service { Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { - Name: "http", - Protocol: corev1.ProtocolTCP, - Port: 80, - TargetPort: intstr.IntOrString{IntVal: 8080}, + Name: "http", + Protocol: corev1.ProtocolTCP, + Port: 80, + TargetPort: intstr.IntOrString{ + Type: intstr.String, + StrVal: "http", + }, }, { - Name: "https", - Protocol: corev1.ProtocolTCP, - Port: 443, - TargetPort: intstr.IntOrString{IntVal: 8443}, + Name: "https", + Protocol: corev1.ProtocolTCP, + Port: 443, + TargetPort: intstr.IntOrString{ + Type: intstr.String, + StrVal: "https", + }, }, }, Selector: integration.Labels(sinkName), diff --git a/pkg/reconciler/testing/v1alpha1/integrationsink.go b/pkg/reconciler/testing/v1alpha1/integrationsink.go index eab21bd5132..c9936fb3779 100644 --- a/pkg/reconciler/testing/v1alpha1/integrationsink.go +++ b/pkg/reconciler/testing/v1alpha1/integrationsink.go @@ -69,9 +69,9 @@ func WithIntegrationSinkAddressableReady() IntegrationSinkOption { } } -func WithIntegrationSinkAddress(addr *duckv1.Addressable) IntegrationSinkOption { +func WithIntegrationSinkAddress(addr duckv1.Addressable) IntegrationSinkOption { return func(s *v1alpha1.IntegrationSink) { - s.Status.SetAddress(addr) + s.Status.SetAddresses(addr) } } @@ -86,3 +86,9 @@ func WithIntegrationSinkEventPoliciesReadyBecauseOIDCDisabled() IntegrationSinkO s.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) } } + +func WithIntegrationSinkTrustBundlePropagatedReady() IntegrationSinkOption { + return func(s *v1alpha1.IntegrationSink) { + s.Status.MarkTrustBundlePropagated() + } +} From 65c9cec2b84e549f991ab2b90efff4b46b4a6f6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 9 Sep 2025 14:36:15 +0200 Subject: [PATCH 6/9] Mark required fields in envConfig --- cmd/auth_proxy/main.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/auth_proxy/main.go b/cmd/auth_proxy/main.go index 2149066a2f9..9560f038269 100644 --- a/cmd/auth_proxy/main.go +++ b/cmd/auth_proxy/main.go @@ -59,19 +59,19 @@ const component = "auth-proxy" // envConfig holds all environment configuration for the auth proxy type envConfig struct { - TargetHost string `envconfig:"TARGET_HOST" default:"localhost"` + TargetHost string `envconfig:"TARGET_HOST" default:"localhost" required:"true"` TargetHTTPPort int `envconfig:"TARGET_HTTP_PORT" default:"8080"` TargetHTTPSPort int `envconfig:"TARGET_HTTPS_PORT" default:"8443"` ProxyHTTPPort int `envconfig:"PROXY_HTTP_PORT" default:"3128"` ProxyHTTPSPort int `envconfig:"PROXY_HTTPS_PORT" default:"3129"` - AuthPolicies string `envconfig:"AUTH_POLICIES" default:""` - SinkNamespace string `envconfig:"SINK_NAMESPACE"` + AuthPolicies string `envconfig:"AUTH_POLICIES"` + SinkNamespace string `envconfig:"SINK_NAMESPACE" required:"true"` SinkTLSCertPath *string `envconfig:"SINK_TLS_CERT_FILE"` SinkTLSKeyPath *string `envconfig:"SINK_TLS_KEY_FILE"` SinkCACertsPath *string `envconfig:"SINK_TLS_CA_FILE"` - SinkURI string `envconfig:"SINK_URI"` + SinkURI string `envconfig:"SINK_URI" required:"true"` SinkAudience *string `envconfig:"SINK_AUDIENCE"` } From 236d9c7d0046eced8471e2f6879aff76215c30f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 9 Sep 2025 15:19:25 +0200 Subject: [PATCH 7/9] Reorder envConfig --- cmd/auth_proxy/main.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/auth_proxy/main.go b/cmd/auth_proxy/main.go index 9560f038269..348f2bc23ae 100644 --- a/cmd/auth_proxy/main.go +++ b/cmd/auth_proxy/main.go @@ -65,14 +65,15 @@ type envConfig struct { ProxyHTTPPort int `envconfig:"PROXY_HTTP_PORT" default:"3128"` ProxyHTTPSPort int `envconfig:"PROXY_HTTPS_PORT" default:"3129"` - AuthPolicies string `envconfig:"AUTH_POLICIES"` - SinkNamespace string `envconfig:"SINK_NAMESPACE" required:"true"` + SinkURI string `envconfig:"SINK_URI" required:"true"` + SinkNamespace string `envconfig:"SINK_NAMESPACE" required:"true"` + SinkAudience *string `envconfig:"SINK_AUDIENCE"` + + AuthPolicies string `envconfig:"AUTH_POLICIES"` + SinkTLSCertPath *string `envconfig:"SINK_TLS_CERT_FILE"` SinkTLSKeyPath *string `envconfig:"SINK_TLS_KEY_FILE"` SinkCACertsPath *string `envconfig:"SINK_TLS_CA_FILE"` - - SinkURI string `envconfig:"SINK_URI" required:"true"` - SinkAudience *string `envconfig:"SINK_AUDIENCE"` } // ProxyHandler handles HTTP requests and performs authentication/authorization From 0d63a21c8772acc18036fc8a382e6b08d762d44c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 9 Sep 2025 16:21:07 +0200 Subject: [PATCH 8/9] Update cmd/auth_proxy/main.go Co-authored-by: Calum Murray --- cmd/auth_proxy/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/auth_proxy/main.go b/cmd/auth_proxy/main.go index 348f2bc23ae..7eed772fcea 100644 --- a/cmd/auth_proxy/main.go +++ b/cmd/auth_proxy/main.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Knative Authors +Copyright 2025 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From 4409cbc1ebfb1ec81a66d027cef29be911592e84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 9 Sep 2025 16:55:24 +0200 Subject: [PATCH 9/9] Address linter issues --- cmd/auth_proxy/main.go | 2 +- pkg/auth/verifier.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/auth_proxy/main.go b/cmd/auth_proxy/main.go index 7eed772fcea..7ca98bdafd8 100644 --- a/cmd/auth_proxy/main.go +++ b/cmd/auth_proxy/main.go @@ -100,6 +100,7 @@ func main() { configMapWatcher := configmap.NewInformedWatcher(kubeclient.Get(ctx), system.Namespace()) logger := setupLogging(ctx, configMapWatcher) defer logger.Sync() + ctx = logging.WithLogger(ctx, logger) featureStore := setupFeatureStore(ctx, logger, configMapWatcher) @@ -152,7 +153,6 @@ func setupLogging(ctx context.Context, cmw *configmap.InformedWatcher) *zap.Suga } logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, component) - ctx = logging.WithLogger(ctx, logger) cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component)) diff --git a/pkg/auth/verifier.go b/pkg/auth/verifier.go index 672d7380086..17231fbe8f1 100644 --- a/pkg/auth/verifier.go +++ b/pkg/auth/verifier.go @@ -368,7 +368,7 @@ type openIDMetadata struct { } func SubjectWithFiltersFromPolicyRef(eventPolicyLister listerseventingv1alpha1.EventPolicyLister, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef) ([]SubjectsWithFilters, error) { - var subjectsWithFiltersFromApplyingPolicies []SubjectsWithFilters + subjectsWithFiltersFromApplyingPolicies := make([]SubjectsWithFilters, 0, len(policyRefs)) for _, p := range policyRefs { policy, err := eventPolicyLister.EventPolicies(resourceNamespace).Get(p.Name)