From d20cc6c0b662e954c7731c57d8e81b365844f38e Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 14 Jun 2021 15:12:24 +0200 Subject: [PATCH 1/5] Glue code + e2e test with channel Signed-off-by: Francesco Guardiani --- pkg/kncloudevents/retries.go | 8 ++ pkg/kncloudevents/retries_test.go | 13 +- test/experimental/config/features.yaml | 1 + test/experimental/delivery_timeout_test.go | 43 +++++++ .../features/delivery_timeout/channel.go | 116 ++++++++++++++++++ 5 files changed, 178 insertions(+), 3 deletions(-) create mode 100644 test/experimental/delivery_timeout_test.go create mode 100644 test/experimental/features/delivery_timeout/channel.go diff --git a/pkg/kncloudevents/retries.go b/pkg/kncloudevents/retries.go index 416049b46d7..ee816efa16a 100644 --- a/pkg/kncloudevents/retries.go +++ b/pkg/kncloudevents/retries.go @@ -104,6 +104,14 @@ func RetryConfigFromDeliverySpec(spec v1.DeliverySpec) (RetryConfig, error) { } } + if spec.Timeout != nil { + timeout, err := period.Parse(*spec.Timeout) + if err != nil { + return retryConfig, fmt.Errorf("failed to parse Spec.Timeout: %w", err) + } + retryConfig.RequestTimeout, _ = timeout.Duration() + } + return retryConfig, nil } diff --git a/pkg/kncloudevents/retries_test.go b/pkg/kncloudevents/retries_test.go index 20009d957df..6c2e493829c 100644 --- a/pkg/kncloudevents/retries_test.go +++ b/pkg/kncloudevents/retries_test.go @@ -339,6 +339,7 @@ func TestRetryConfigFromDeliverySpecCheckRetry(t *testing.T) { Retry: pointer.Int32Ptr(10), BackoffPolicy: &linear, BackoffDelay: pointer.StringPtr("PT1S"), + Timeout: pointer.StringPtr("PT10S"), }, }, { name: "only retry", @@ -347,15 +348,21 @@ func TestRetryConfigFromDeliverySpecCheckRetry(t *testing.T) { BackoffPolicy: &linear, }, }, { - name: "not ISO8601", + name: "delay not ISO8601", spec: v1.DeliverySpec{ Retry: pointer.Int32Ptr(10), BackoffDelay: pointer.StringPtr("PP1"), BackoffPolicy: &linear, }, wantErr: true, - }, - } + }, { + name: "timeout not ISO8601", + spec: v1.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + Timeout: pointer.StringPtr("PP1"), + }, + wantErr: true, + }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/test/experimental/config/features.yaml b/test/experimental/config/features.yaml index 1138d90b544..d8b990c8cd7 100644 --- a/test/experimental/config/features.yaml +++ b/test/experimental/config/features.yaml @@ -23,3 +23,4 @@ metadata: knative.dev/config-category: eventing data: kreference-group: "enabled" + delivery-timeout: "enabled" diff --git a/test/experimental/delivery_timeout_test.go b/test/experimental/delivery_timeout_test.go new file mode 100644 index 00000000000..09075ebd4eb --- /dev/null +++ b/test/experimental/delivery_timeout_test.go @@ -0,0 +1,43 @@ +// +build e2e + +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package experimental + +import ( + "testing" + + "knative.dev/eventing/test/experimental/features/delivery_timeout" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" +) + +func TestChannelToChannel(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.Test(ctx, t, delivery_timeout.ChannelToSink()) +} diff --git a/test/experimental/features/delivery_timeout/channel.go b/test/experimental/features/delivery_timeout/channel.go new file mode 100644 index 00000000000..4d50190f462 --- /dev/null +++ b/test/experimental/features/delivery_timeout/channel.go @@ -0,0 +1,116 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery_timeout + +import ( + "context" + "time" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + "knative.dev/eventing/test/rekt/resources/channel" + "knative.dev/eventing/test/rekt/resources/subscription" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" +) + +// ChannelToSink tests a scenario where the flow is source -> channel -> sink (timeout) -- fallback to -> dead letter sink +func ChannelToSink() *feature.Feature { + f := feature.NewFeature() + + timeout := 5 * time.Second + timeoutString := "PT5S" + + channelAPIVersion, imcKind := channel.GVK().ToAPIVersionAndKind() + + channelName := feature.MakeRandomK8sName("channel") + subName := feature.MakeRandomK8sName("sub-sink") + sinkName := feature.MakeRandomK8sName("sink") + deadLetterSinkName := feature.MakeRandomK8sName("dead-letter") + sourceName := feature.MakeRandomK8sName("source") + + ev := cetest.FullEvent() + + f.Setup("install sink", eventshub.Install( + sinkName, + eventshub.StartReceiver, + eventshub.ResponseWaitTime(timeout), + )) + f.Setup("install dead letter sink", eventshub.Install( + deadLetterSinkName, + eventshub.StartReceiver, + )) + + f.Setup("Install channel", channel.Install(channelName)) + f.Setup("Channel is ready", channel.IsReady(channelName)) + + f.Setup("Install channel -> sink subscription", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, + &messagingv1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: subName, + Namespace: namespace, + }, + Spec: messagingv1.SubscriptionSpec{ + Channel: duckv1.KReference{ + APIVersion: channelAPIVersion, + Kind: imcKind, + Name: channelName, + }, + Subscriber: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: channelAPIVersion, + Kind: imcKind, + Name: sinkName, + }, + }, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: channelAPIVersion, + Kind: imcKind, + Name: channelName, + }, + }, + Timeout: &timeoutString, + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + }) + + f.Setup("subscription channel -> Sink is ready", subscription.IsReady(subName)) + + f.Setup("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(channel.GVR(), channelName), + eventshub.InputEvent(ev), + )) + + f.Assert("receive event on sink", assert.OnStore(sinkName).MatchEvent(cetest.HasId(ev.ID())).Exact(1)) + f.Assert("receive event on dead letter sink", assert.OnStore(deadLetterSinkName).MatchEvent(cetest.HasId(ev.ID())).Exact(1)) + + return f +} From 673cc4775d0f16b9ee6423bc92df7412c65a73be Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 22 Jun 2021 14:19:35 +0200 Subject: [PATCH 2/5] Test delivery timeout Signed-off-by: Francesco Guardiani --- test/experimental/delivery_timeout_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/experimental/delivery_timeout_test.go b/test/experimental/delivery_timeout_test.go index 09075ebd4eb..8a2d1c57187 100644 --- a/test/experimental/delivery_timeout_test.go +++ b/test/experimental/delivery_timeout_test.go @@ -28,7 +28,7 @@ import ( "knative.dev/reconciler-test/pkg/knative" ) -func TestChannelToChannel(t *testing.T) { +func TestDeliveryTimeout(t *testing.T) { t.Parallel() ctx, env := global.Environment( From 7c7aff7dea2716160ba2a0495c4b497830d56a71 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 23 Jun 2021 09:11:55 +0200 Subject: [PATCH 3/5] Some more glue code which was missing Signed-off-by: Francesco Guardiani --- .../resources/in-memory-channel.yaml | 1 + config/core/resources/channel.yaml | 1 + pkg/channel/fanout/fanout_message_handler.go | 1 - pkg/reconciler/subscription/subscription.go | 6 ++++-- .../features/delivery_timeout/channel.go | 17 ++++++++++------- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/config/channels/in-memory-channel/resources/in-memory-channel.yaml b/config/channels/in-memory-channel/resources/in-memory-channel.yaml index 452c1a90d75..1e31bd0b198 100644 --- a/config/channels/in-memory-channel/resources/in-memory-channel.yaml +++ b/config/channels/in-memory-channel/resources/in-memory-channel.yaml @@ -118,6 +118,7 @@ spec: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer format: int32 + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature generation: description: Generation of the origin of the subscriber with uid:UID. type: integer diff --git a/config/core/resources/channel.yaml b/config/core/resources/channel.yaml index 25b2e18dd78..af69a0c0bc3 100644 --- a/config/core/resources/channel.yaml +++ b/config/core/resources/channel.yaml @@ -146,6 +146,7 @@ spec: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer format: int32 + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature generation: description: Generation of the origin of the subscriber with uid:UID. type: integer diff --git a/pkg/channel/fanout/fanout_message_handler.go b/pkg/channel/fanout/fanout_message_handler.go index fcb1409837f..e58e0d43570 100644 --- a/pkg/channel/fanout/fanout_message_handler.go +++ b/pkg/channel/fanout/fanout_message_handler.go @@ -33,7 +33,6 @@ import ( "github.com/cloudevents/sdk-go/v2/binding/buffering" "go.opencensus.io/trace" "go.uber.org/zap" - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/kncloudevents" diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index ea7ad69dc0b..08ffd5a2d69 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -510,13 +510,14 @@ func deliverySpec(sub *v1.Subscription, channel *eventingduckv1.Channelable) (de }, } } - if channel.Spec.Delivery.BackoffDelay != nil || channel.Spec.Delivery.Retry != nil || channel.Spec.Delivery.BackoffPolicy != nil { + if channel.Spec.Delivery.BackoffDelay != nil || channel.Spec.Delivery.Retry != nil || channel.Spec.Delivery.BackoffPolicy != nil || channel.Spec.Delivery.Timeout != nil { if delivery == nil { delivery = &eventingduckv1.DeliverySpec{} } delivery.BackoffPolicy = channel.Spec.Delivery.BackoffPolicy delivery.Retry = channel.Spec.Delivery.Retry delivery.BackoffDelay = channel.Spec.Delivery.BackoffDelay + delivery.Timeout = channel.Spec.Delivery.Timeout } return } @@ -530,13 +531,14 @@ func deliverySpec(sub *v1.Subscription, channel *eventingduckv1.Channelable) (de }, } } - if sub.Spec.Delivery != nil && (sub.Spec.Delivery.BackoffDelay != nil || sub.Spec.Delivery.Retry != nil || sub.Spec.Delivery.BackoffPolicy != nil) { + if sub.Spec.Delivery != nil && (sub.Spec.Delivery.BackoffDelay != nil || sub.Spec.Delivery.Retry != nil || sub.Spec.Delivery.BackoffPolicy != nil || sub.Spec.Delivery.Timeout != nil) { if delivery == nil { delivery = &eventingduckv1.DeliverySpec{} } delivery.BackoffPolicy = sub.Spec.Delivery.BackoffPolicy delivery.Retry = sub.Spec.Delivery.Retry delivery.BackoffDelay = sub.Spec.Delivery.BackoffDelay + delivery.Timeout = sub.Spec.Delivery.Timeout } return } diff --git a/test/experimental/features/delivery_timeout/channel.go b/test/experimental/features/delivery_timeout/channel.go index 4d50190f462..ed8903ff049 100644 --- a/test/experimental/features/delivery_timeout/channel.go +++ b/test/experimental/features/delivery_timeout/channel.go @@ -21,6 +21,7 @@ import ( "time" cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/rickb777/date/period" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" @@ -39,8 +40,10 @@ import ( func ChannelToSink() *feature.Feature { f := feature.NewFeature() - timeout := 5 * time.Second - timeoutString := "PT5S" + timeout := 6 * time.Second + // Clocks are funny, let's add 1 second to take in account eventual clock skews + timeoutPeriod, _ := period.NewOf(timeout - time.Second) + timeoutString := timeoutPeriod.String() channelAPIVersion, imcKind := channel.GVK().ToAPIVersionAndKind() @@ -81,17 +84,17 @@ func ChannelToSink() *feature.Feature { }, Subscriber: &duckv1.Destination{ Ref: &duckv1.KReference{ - APIVersion: channelAPIVersion, - Kind: imcKind, + APIVersion: "v1", + Kind: "Service", Name: sinkName, }, }, Delivery: &eventingduckv1.DeliverySpec{ DeadLetterSink: &duckv1.Destination{ Ref: &duckv1.KReference{ - APIVersion: channelAPIVersion, - Kind: imcKind, - Name: channelName, + APIVersion: "v1", + Kind: "Service", + Name: deadLetterSinkName, }, }, Timeout: &timeoutString, From 665502053570a0aef8f39700411b46550e8d7dc5 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 23 Jun 2021 09:45:00 +0200 Subject: [PATCH 4/5] Fix flaky test Signed-off-by: Francesco Guardiani --- pkg/kncloudevents/message_sender_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go index d7275933ed7..38e3ee4f04b 100644 --- a/pkg/kncloudevents/message_sender_test.go +++ b/pkg/kncloudevents/message_sender_test.go @@ -166,7 +166,7 @@ func TestHTTPMessageSenderSendWithRetriesWithSingleRequestTimeout(t *testing.T) writer.WriteHeader(http.StatusOK) } else { // Let's add one more second - time.Sleep(timeout) + time.Sleep(timeout + 1) writer.WriteHeader(http.StatusAccepted) } })) From 36a5b0178e9aa5d112bb5941673e7a186da7dd73 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 23 Jun 2021 17:26:24 +0200 Subject: [PATCH 5/5] fix fix fix Signed-off-by: Francesco Guardiani --- pkg/kncloudevents/message_sender_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go index 38e3ee4f04b..60d572c28cf 100644 --- a/pkg/kncloudevents/message_sender_test.go +++ b/pkg/kncloudevents/message_sender_test.go @@ -165,8 +165,8 @@ func TestHTTPMessageSenderSendWithRetriesWithSingleRequestTimeout(t *testing.T) if newVal >= 5 { writer.WriteHeader(http.StatusOK) } else { - // Let's add one more second - time.Sleep(timeout + 1) + // Let's add a bit more time + time.Sleep(timeout + (200 * time.Millisecond)) writer.WriteHeader(http.StatusAccepted) } }))