From 0a23e98fc71366a29bba4034f2d99709304aaa1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 9 Sep 2025 16:11:40 +0200 Subject: [PATCH 1/3] Fix mt-broker-ingress auth to work with structured event too --- pkg/auth/verifier.go | 33 ++----------------- pkg/broker/ingress/ingress_handler.go | 10 +++++- pkg/utils/utils.go | 33 +++++++++++++++++++ .../authz/addressable_authz_conformance.go | 30 +++++++++-------- 4 files changed, 61 insertions(+), 45 deletions(-) diff --git a/pkg/auth/verifier.go b/pkg/auth/verifier.go index 1fc3ac1ae6f..1f7800bd5c9 100644 --- a/pkg/auth/verifier.go +++ b/pkg/auth/verifier.go @@ -17,7 +17,6 @@ limitations under the License. package auth import ( - "bytes" "context" "encoding/json" "fmt" @@ -32,6 +31,7 @@ import ( "go.opentelemetry.io/otel" corev1listers "k8s.io/client-go/listers/core/v1" "knative.dev/eventing/pkg/eventingtls" + "knative.dev/eventing/pkg/utils" "knative.dev/pkg/configmap" "knative.dev/pkg/network" "knative.dev/pkg/observability/tracing" @@ -161,7 +161,7 @@ func (v *Verifier) verifyAuthN(ctx context.Context, audience *string, req *http. // verifyAuthZ verifies if the given idToken is allowed by the resources eventPolicyStatus func (v *Verifier) verifyAuthZ(ctx context.Context, features feature.Flags, idToken *IDToken, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef, req *http.Request, resp http.ResponseWriter) error { if len(policyRefs) > 0 { - req, err := copyRequest(req) + req, err := utils.CopyRequest(req) if err != nil { resp.WriteHeader(http.StatusInternalServerError) return fmt.Errorf("failed to copy request body: %w", err) @@ -335,35 +335,6 @@ func (v *Verifier) getKubernetesOIDCDiscovery(features feature.Flags, client *ht return openIdConfig, nil } -// copyRequest makes a copy of the http request which can be consumed as needed, leaving the original request -// able to be consumed as well. -func copyRequest(req *http.Request) (*http.Request, error) { - // check if we actually need to copy the body, otherwise we can return the original request - if req.Body == nil || req.Body == http.NoBody { - return req, nil - } - - var buf bytes.Buffer - if _, err := buf.ReadFrom(req.Body); err != nil { - return nil, fmt.Errorf("failed to read request body while copying it: %w", err) - } - - if err := req.Body.Close(); err != nil { - return nil, fmt.Errorf("failed to close original request body ready while copying request: %w", err) - } - - // set the original request body to be readable again - req.Body = io.NopCloser(&buf) - - // return a new request with a readable body and same headers as the original - // we don't need to set any other fields as cloudevents only uses the headers - // and body to construct the Message/Event. - return &http.Request{ - Header: req.Header, - Body: io.NopCloser(bytes.NewReader(buf.Bytes())), - }, nil -} - type openIDMetadata struct { Issuer string `json:"issuer"` JWKSURI string `json:"jwks_uri"` diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index e3fe42271fc..978d2752804 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -232,6 +232,14 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { return } + // copy the request, as we need access to the body (in case of a structured event) for the auth checks too + reqCp, err := utils.CopyRequest(request) + if err != nil { + h.Logger.Error("Failed to copy request", zap.Error(err)) + writer.WriteHeader(http.StatusInternalServerError) + return + } + ctx := h.withContext(request.Context()) ctx = observability.WithRequestLabels(ctx, request) @@ -277,7 +285,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { if broker.Status.Address != nil { audience = broker.Status.Address.Audience } - err = h.tokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer) + err = h.tokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, reqCp, writer) if err != nil { h.Logger.Warn("Failed to verify AuthN and AuthZ.", zap.Error(err)) return diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index b1a73c278c9..cfb8366227c 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -17,6 +17,10 @@ limitations under the License. package utils import ( + "bytes" + "fmt" + "io" + "net/http" "regexp" "strings" @@ -91,3 +95,32 @@ func GenerateFixedName(owner metav1.Object, prefix string) string { // A dot must be followed by [a-z0-9] to be DNS1123 compliant. Make sure we are not joining a dot and a dash. return strings.TrimSuffix(prefix, ".") + uid } + +// CopyRequest makes a copy of the http request which can be consumed as needed, leaving the original request +// able to be consumed as well. +func CopyRequest(req *http.Request) (*http.Request, error) { + // check if we actually need to copy the body, otherwise we can return the original request + if req.Body == nil || req.Body == http.NoBody { + return req, nil + } + + var buf bytes.Buffer + if _, err := buf.ReadFrom(req.Body); err != nil { + return nil, fmt.Errorf("failed to read request body while copying it: %w", err) + } + + if err := req.Body.Close(); err != nil { + return nil, fmt.Errorf("failed to close original request body ready while copying request: %w", err) + } + + // set the original request body to be readable again + req.Body = io.NopCloser(&buf) + + // return a new request with a readable body and same headers as the original + // we don't need to set any other fields as cloudevents only uses the headers + // and body to construct the Message/Event. + return &http.Request{ + Header: req.Header, + Body: io.NopCloser(bytes.NewReader(buf.Bytes())), + }, nil +} diff --git a/test/rekt/features/authz/addressable_authz_conformance.go b/test/rekt/features/authz/addressable_authz_conformance.go index 7043e86b082..643cad05f46 100644 --- a/test/rekt/features/authz/addressable_authz_conformance.go +++ b/test/rekt/features/authz/addressable_authz_conformance.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + cloudevents "github.com/cloudevents/sdk-go/v2" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/test/rekt/resources/eventpolicy" "knative.dev/eventing/test/rekt/resources/pingsource" @@ -41,7 +42,8 @@ func AddressableAuthZConformance(gvr schema.GroupVersionResource, kind, name str fs := feature.FeatureSet{ Name: fmt.Sprintf("%s handles authorization features correctly", kind), Features: []*feature.Feature{ - addressableRespectsEventPolicyFilters(gvr, kind, name), + addressableRespectsEventPolicyFilters(gvr, kind, name, cloudevents.EncodingBinary), + addressableRespectsEventPolicyFilters(gvr, kind, name, cloudevents.EncodingStructured), }, } @@ -57,16 +59,18 @@ func AddressableAuthZConformanceRequestHandling(gvr schema.GroupVersionResource, fs := feature.FeatureSet{ Name: fmt.Sprintf("%s handles authorization in requests correctly", kind), Features: []*feature.Feature{ - addressableAllowsAuthorizedRequest(gvr, kind, name), - addressableRejectsUnauthorizedRequest(gvr, kind, name), + addressableAllowsAuthorizedRequest(gvr, kind, name, cloudevents.EncodingBinary), + addressableAllowsAuthorizedRequest(gvr, kind, name, cloudevents.EncodingStructured), + addressableRejectsUnauthorizedRequest(gvr, kind, name, cloudevents.EncodingBinary), + addressableRejectsUnauthorizedRequest(gvr, kind, name, cloudevents.EncodingStructured), addressableBecomesUnreadyOnUnreadyEventPolicy(gvr, kind, name), }, } return &fs } -func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { - f := feature.NewFeatureNamed(fmt.Sprintf("%s accepts authorized request", kind)) +func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, name string, inputEventEncoding cloudevents.Encoding) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s accepts authorized request with %s encoding for input event", kind, inputEventEncoding)) f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) @@ -95,7 +99,7 @@ func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, n f.Requirement("install source", eventshub.Install( source, eventshub.StartSenderToResourceTLS(gvr, name, nil), - eventshub.InputEvent(event), + eventshub.InputEventWithEncoding(event, inputEventEncoding), eventshub.OIDCSubject(sourceSubject), )) @@ -106,8 +110,8 @@ func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, n return f } -func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { - f := feature.NewFeatureNamed(fmt.Sprintf("%s rejects unauthorized request", kind)) +func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind, name string, inputEventEncoding cloudevents.Encoding) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s rejects unauthorized request with %s encoding for input event", kind, inputEventEncoding)) f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) @@ -132,7 +136,7 @@ func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind f.Requirement("install source", eventshub.Install( source, eventshub.StartSenderToResourceTLS(gvr, name, nil), - eventshub.InputEvent(event), + eventshub.InputEventWithEncoding(event, inputEventEncoding), eventshub.InitialSenderDelay(10*time.Second), )) @@ -143,8 +147,8 @@ func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind return f } -func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { - f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter", kind)) +func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind, name string, inputEventEncoding cloudevents.Encoding) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter with %s encoding for input event", kind, inputEventEncoding)) f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) @@ -188,14 +192,14 @@ func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind f.Requirement("install source 1", eventshub.Install( source1, eventshub.StartSenderToResourceTLS(gvr, name, nil), - eventshub.InputEvent(event1), + eventshub.InputEventWithEncoding(event1, inputEventEncoding), eventshub.OIDCSubject(sourceSubject1), )) f.Requirement("install source 2", eventshub.Install( source2, eventshub.StartSenderToResourceTLS(gvr, name, nil), - eventshub.InputEvent(event2), + eventshub.InputEventWithEncoding(event2, inputEventEncoding), eventshub.OIDCSubject(sourceSubject2), )) From 51338edeb1db945c77e235829d32c303f721cc1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Wed, 10 Sep 2025 08:59:15 +0200 Subject: [PATCH 2/3] Fix channel receiver auth to work with structured event too --- pkg/channel/event_receiver.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index 2e4864b28ac..8c409716f6d 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -282,6 +282,14 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth ctx = observability.WithChannelLabels(ctx, types.NamespacedName{Name: channel.Name, Namespace: channel.Namespace}) ctx = observability.WithRequestLabels(ctx, request) + // copy the request, as we need access to the body (in case of a structured event) for the auth checks too + reqCopy, err := utils.CopyRequest(request) + if err != nil { + r.logger.Error("Failed to copy request", zap.Error(err)) + response.WriteHeader(nethttp.StatusInternalServerError) + return + } + event, err := http.NewEventFromHTTPRequest(request) if err != nil { r.logger.Warn("failed to extract event from request", zap.Error(err)) @@ -316,7 +324,7 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth return } - err = r.tokenVerifier.VerifyRequest(ctx, features, &r.audience, channel.Namespace, applyingEventPolicies, request, response) + err = r.tokenVerifier.VerifyRequest(ctx, features, &r.audience, channel.Namespace, applyingEventPolicies, reqCopy, response) if err != nil { r.logger.Warn("could not verify authn and authz of request", zap.Error(err)) return From f8211ac607e837baccfa5bd25bb52afbcd286637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Wed, 10 Sep 2025 09:08:25 +0200 Subject: [PATCH 3/3] Add unit test --- pkg/utils/utils_test.go | 45 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go index ecbeb8c3f82..8be28e0cc2e 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/utils/utils_test.go @@ -17,7 +17,10 @@ limitations under the License. package utils import ( + "bytes" "fmt" + "io" + "net/http" "strings" "testing" @@ -169,3 +172,45 @@ func TestToDNS1123Subdomain(t *testing.T) { }) } } + +func TestCopyRequest(t *testing.T) { + const ( + contentType = "application/json" + authorization = "Bearer token" + ) + + originalRequest := &http.Request{ + Header: map[string][]string{ + "Content-Type": {contentType}, + "Authorization": {authorization}, + }, + Body: io.NopCloser(strings.NewReader("test content")), + } + + copiedReq, err := CopyRequest(originalRequest) + + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if copiedReq.Header.Get("Content-Type") != contentType { + t.Error("Header not copied correctly") + } + if copiedReq.Header.Get("Authorization") != authorization { + t.Error("Authorization header not copied correctly") + } + + originalBody, err := io.ReadAll(originalRequest.Body) + if err != nil { + t.Fatalf("Failed to read original body: %v", err) + } + + copiedBody, err := io.ReadAll(copiedReq.Body) + if err != nil { + t.Fatalf("Failed to read copied body: %v", err) + } + + if !bytes.Equal(originalBody, copiedBody) { + t.Errorf("Body not copied correctly. Original: %s, Copied: %s", originalBody, copiedBody) + } +}