Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 2 additions & 31 deletions pkg/auth/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package auth

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -32,6 +31,7 @@ import (
"go.opentelemetry.io/otel"
corev1listers "k8s.io/client-go/listers/core/v1"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/configmap"
"knative.dev/pkg/network"
"knative.dev/pkg/observability/tracing"
Expand Down Expand Up @@ -161,7 +161,7 @@ func (v *Verifier) verifyAuthN(ctx context.Context, audience *string, req *http.
// verifyAuthZ verifies if the given idToken is allowed by the resources eventPolicyStatus
func (v *Verifier) verifyAuthZ(ctx context.Context, features feature.Flags, idToken *IDToken, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef, req *http.Request, resp http.ResponseWriter) error {
if len(policyRefs) > 0 {
req, err := copyRequest(req)
req, err := utils.CopyRequest(req)
if err != nil {
resp.WriteHeader(http.StatusInternalServerError)
return fmt.Errorf("failed to copy request body: %w", err)
Expand Down Expand Up @@ -335,35 +335,6 @@ func (v *Verifier) getKubernetesOIDCDiscovery(features feature.Flags, client *ht
return openIdConfig, nil
}

// copyRequest makes a copy of the http request which can be consumed as needed, leaving the original request
// able to be consumed as well.
func copyRequest(req *http.Request) (*http.Request, error) {
// check if we actually need to copy the body, otherwise we can return the original request
if req.Body == nil || req.Body == http.NoBody {
return req, nil
}

var buf bytes.Buffer
if _, err := buf.ReadFrom(req.Body); err != nil {
return nil, fmt.Errorf("failed to read request body while copying it: %w", err)
}

if err := req.Body.Close(); err != nil {
return nil, fmt.Errorf("failed to close original request body ready while copying request: %w", err)
}

// set the original request body to be readable again
req.Body = io.NopCloser(&buf)

// return a new request with a readable body and same headers as the original
// we don't need to set any other fields as cloudevents only uses the headers
// and body to construct the Message/Event.
return &http.Request{
Header: req.Header,
Body: io.NopCloser(bytes.NewReader(buf.Bytes())),
}, nil
}

type openIDMetadata struct {
Issuer string `json:"issuer"`
JWKSURI string `json:"jwks_uri"`
Expand Down
10 changes: 9 additions & 1 deletion pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
return
}

// copy the request, as we need access to the body (in case of a structured event) for the auth checks too
reqCp, err := utils.CopyRequest(request)
if err != nil {
h.Logger.Error("Failed to copy request", zap.Error(err))
writer.WriteHeader(http.StatusInternalServerError)
return
}

ctx := h.withContext(request.Context())
ctx = observability.WithRequestLabels(ctx, request)

Expand Down Expand Up @@ -277,7 +285,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
if broker.Status.Address != nil {
audience = broker.Status.Address.Audience
}
err = h.tokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer)
err = h.tokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, reqCp, writer)
if err != nil {
h.Logger.Warn("Failed to verify AuthN and AuthZ.", zap.Error(err))
return
Expand Down
10 changes: 9 additions & 1 deletion pkg/channel/event_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
ctx = observability.WithChannelLabels(ctx, types.NamespacedName{Name: channel.Name, Namespace: channel.Namespace})
ctx = observability.WithRequestLabels(ctx, request)

// copy the request, as we need access to the body (in case of a structured event) for the auth checks too
reqCopy, err := utils.CopyRequest(request)
if err != nil {
r.logger.Error("Failed to copy request", zap.Error(err))
response.WriteHeader(nethttp.StatusInternalServerError)
return
}

event, err := http.NewEventFromHTTPRequest(request)
if err != nil {
r.logger.Warn("failed to extract event from request", zap.Error(err))
Expand Down Expand Up @@ -316,7 +324,7 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
return
}

err = r.tokenVerifier.VerifyRequest(ctx, features, &r.audience, channel.Namespace, applyingEventPolicies, request, response)
err = r.tokenVerifier.VerifyRequest(ctx, features, &r.audience, channel.Namespace, applyingEventPolicies, reqCopy, response)
if err != nil {
r.logger.Warn("could not verify authn and authz of request", zap.Error(err))
return
Expand Down
33 changes: 33 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ limitations under the License.
package utils

import (
"bytes"
"fmt"
"io"
"net/http"
"regexp"
"strings"

Expand Down Expand Up @@ -91,3 +95,32 @@ func GenerateFixedName(owner metav1.Object, prefix string) string {
// A dot must be followed by [a-z0-9] to be DNS1123 compliant. Make sure we are not joining a dot and a dash.
return strings.TrimSuffix(prefix, ".") + uid
}

// CopyRequest makes a copy of the http request which can be consumed as needed, leaving the original request
// able to be consumed as well.
func CopyRequest(req *http.Request) (*http.Request, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wanna add a test to utils_test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 f8211ac

// check if we actually need to copy the body, otherwise we can return the original request
if req.Body == nil || req.Body == http.NoBody {
return req, nil
}

var buf bytes.Buffer
if _, err := buf.ReadFrom(req.Body); err != nil {
return nil, fmt.Errorf("failed to read request body while copying it: %w", err)
}

if err := req.Body.Close(); err != nil {
return nil, fmt.Errorf("failed to close original request body ready while copying request: %w", err)
}

// set the original request body to be readable again
req.Body = io.NopCloser(&buf)

// return a new request with a readable body and same headers as the original
// we don't need to set any other fields as cloudevents only uses the headers
// and body to construct the Message/Event.
return &http.Request{
Header: req.Header,
Body: io.NopCloser(bytes.NewReader(buf.Bytes())),
}, nil
}
45 changes: 45 additions & 0 deletions pkg/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package utils

import (
"bytes"
"fmt"
"io"
"net/http"
"strings"
"testing"

Expand Down Expand Up @@ -169,3 +172,45 @@ func TestToDNS1123Subdomain(t *testing.T) {
})
}
}

func TestCopyRequest(t *testing.T) {
const (
contentType = "application/json"
authorization = "Bearer token"
)

originalRequest := &http.Request{
Header: map[string][]string{
"Content-Type": {contentType},
"Authorization": {authorization},
},
Body: io.NopCloser(strings.NewReader("test content")),
}

copiedReq, err := CopyRequest(originalRequest)

if err != nil {
t.Errorf("Unexpected error: %v", err)
}

if copiedReq.Header.Get("Content-Type") != contentType {
t.Error("Header not copied correctly")
}
if copiedReq.Header.Get("Authorization") != authorization {
t.Error("Authorization header not copied correctly")
}

originalBody, err := io.ReadAll(originalRequest.Body)
if err != nil {
t.Fatalf("Failed to read original body: %v", err)
}

copiedBody, err := io.ReadAll(copiedReq.Body)
if err != nil {
t.Fatalf("Failed to read copied body: %v", err)
}

if !bytes.Equal(originalBody, copiedBody) {
t.Errorf("Body not copied correctly. Original: %s, Copied: %s", originalBody, copiedBody)
}
}
30 changes: 17 additions & 13 deletions test/rekt/features/authz/addressable_authz_conformance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/test/rekt/resources/eventpolicy"
"knative.dev/eventing/test/rekt/resources/pingsource"
Expand All @@ -41,7 +42,8 @@ func AddressableAuthZConformance(gvr schema.GroupVersionResource, kind, name str
fs := feature.FeatureSet{
Name: fmt.Sprintf("%s handles authorization features correctly", kind),
Features: []*feature.Feature{
addressableRespectsEventPolicyFilters(gvr, kind, name),
addressableRespectsEventPolicyFilters(gvr, kind, name, cloudevents.EncodingBinary),
addressableRespectsEventPolicyFilters(gvr, kind, name, cloudevents.EncodingStructured),
},
}

Expand All @@ -57,16 +59,18 @@ func AddressableAuthZConformanceRequestHandling(gvr schema.GroupVersionResource,
fs := feature.FeatureSet{
Name: fmt.Sprintf("%s handles authorization in requests correctly", kind),
Features: []*feature.Feature{
addressableAllowsAuthorizedRequest(gvr, kind, name),
addressableRejectsUnauthorizedRequest(gvr, kind, name),
addressableAllowsAuthorizedRequest(gvr, kind, name, cloudevents.EncodingBinary),
addressableAllowsAuthorizedRequest(gvr, kind, name, cloudevents.EncodingStructured),
addressableRejectsUnauthorizedRequest(gvr, kind, name, cloudevents.EncodingBinary),
addressableRejectsUnauthorizedRequest(gvr, kind, name, cloudevents.EncodingStructured),
addressableBecomesUnreadyOnUnreadyEventPolicy(gvr, kind, name),
},
}
return &fs
}

func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, name string) *feature.Feature {
f := feature.NewFeatureNamed(fmt.Sprintf("%s accepts authorized request", kind))
func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, name string, inputEventEncoding cloudevents.Encoding) *feature.Feature {
f := feature.NewFeatureNamed(fmt.Sprintf("%s accepts authorized request with %s encoding for input event", kind, inputEventEncoding))

f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled())
f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
Expand Down Expand Up @@ -95,7 +99,7 @@ func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, n
f.Requirement("install source", eventshub.Install(
source,
eventshub.StartSenderToResourceTLS(gvr, name, nil),
eventshub.InputEvent(event),
eventshub.InputEventWithEncoding(event, inputEventEncoding),
eventshub.OIDCSubject(sourceSubject),
))

Expand All @@ -106,8 +110,8 @@ func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, n
return f
}

func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind, name string) *feature.Feature {
f := feature.NewFeatureNamed(fmt.Sprintf("%s rejects unauthorized request", kind))
func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind, name string, inputEventEncoding cloudevents.Encoding) *feature.Feature {
f := feature.NewFeatureNamed(fmt.Sprintf("%s rejects unauthorized request with %s encoding for input event", kind, inputEventEncoding))

f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled())
f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
Expand All @@ -132,7 +136,7 @@ func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind
f.Requirement("install source", eventshub.Install(
source,
eventshub.StartSenderToResourceTLS(gvr, name, nil),
eventshub.InputEvent(event),
eventshub.InputEventWithEncoding(event, inputEventEncoding),
eventshub.InitialSenderDelay(10*time.Second),
))

Expand All @@ -143,8 +147,8 @@ func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind
return f
}

func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind, name string) *feature.Feature {
f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter", kind))
func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind, name string, inputEventEncoding cloudevents.Encoding) *feature.Feature {
f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter with %s encoding for input event", kind, inputEventEncoding))

f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled())
f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
Expand Down Expand Up @@ -188,14 +192,14 @@ func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind
f.Requirement("install source 1", eventshub.Install(
source1,
eventshub.StartSenderToResourceTLS(gvr, name, nil),
eventshub.InputEvent(event1),
eventshub.InputEventWithEncoding(event1, inputEventEncoding),
eventshub.OIDCSubject(sourceSubject1),
))

f.Requirement("install source 2", eventshub.Install(
source2,
eventshub.StartSenderToResourceTLS(gvr, name, nil),
eventshub.InputEvent(event2),
eventshub.InputEventWithEncoding(event2, inputEventEncoding),
eventshub.OIDCSubject(sourceSubject2),
))

Expand Down
Loading