diff --git a/config/channels/in-memory-channel/resources/in-memory-channel.yaml b/config/channels/in-memory-channel/resources/in-memory-channel.yaml index 8c6854eebe9..452c1a90d75 100644 --- a/config/channels/in-memory-channel/resources/in-memory-channel.yaml +++ b/config/channels/in-memory-channel/resources/in-memory-channel.yaml @@ -74,6 +74,7 @@ spec: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer format: int32 + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature delivery-timeout subscribers: description: This is the list of subscriptions for this subscribable. type: array diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml index 6856d9b3351..8a86755bfd0 100644 --- a/config/core/configmaps/features.yaml +++ b/config/core/configmaps/features.yaml @@ -25,3 +25,7 @@ data: # ALPHA feature: The kreference-group allows you to use the Group field in KReferences. # For more details: https://github.com/knative/eventing/issues/5086 kreference-group: "disabled" + + # ALPHA feature: The delivery-timeout allows you to use the Timeout field in DeliverySpec. + # For more details: https://github.com/knative/eventing/issues/5148 + delivery-timeout: "disabled" diff --git a/config/core/resources/broker.yaml b/config/core/resources/broker.yaml index a4014de5700..ccbd8095e56 100644 --- a/config/core/resources/broker.yaml +++ b/config/core/resources/broker.yaml @@ -90,6 +90,7 @@ spec: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer format: int32 + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature delivery-timeout status: description: Status represents the current state of the Broker. This data may be out of date. type: object diff --git a/config/core/resources/channel.yaml b/config/core/resources/channel.yaml index 33f5b8b1ff7..25b2e18dd78 100644 --- a/config/core/resources/channel.yaml +++ b/config/core/resources/channel.yaml @@ -102,6 +102,7 @@ spec: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer format: int32 + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature delivery-timeout subscribers: description: This is the list of subscriptions for this subscribable. type: array diff --git a/config/core/resources/parallel.yaml b/config/core/resources/parallel.yaml index 93f64c864e8..74cc15aee5f 100644 --- a/config/core/resources/parallel.yaml +++ b/config/core/resources/parallel.yaml @@ -104,6 +104,7 @@ spec: sink. type: integer format: int32 + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature delivery-timeout filter: description: Filter is the expression guarding the branch type: object diff --git a/config/core/resources/sequence.yaml b/config/core/resources/sequence.yaml index b8953d6bfcd..bf3fe3ba7a6 100644 --- a/config/core/resources/sequence.yaml +++ b/config/core/resources/sequence.yaml @@ -115,6 +115,7 @@ spec: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer format: int32 + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature delivery-timeout ref: description: Ref points to an Addressable. type: object diff --git a/config/core/resources/subscription.yaml b/config/core/resources/subscription.yaml index ce90004a2b1..53ac8931962 100644 --- a/config/core/resources/subscription.yaml +++ b/config/core/resources/subscription.yaml @@ -84,6 +84,7 @@ spec: description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. type: integer format: int32 + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature delivery-timeout reply: description: Reply specifies (optionally) how to handle events returned from the Subscriber target. type: object diff --git a/docs/eventing-api.md b/docs/eventing-api.md index b92a8afc7ee..3bbb760fad5 100644 --- a/docs/eventing-api.md +++ b/docs/eventing-api.md @@ -325,6 +325,22 @@ sending an event before moving it to the dead letter sink.

+timeout
+ +string + + + +(Optional) +

Timeout is the timeout of each single request. The value must be greater than 0. +More information on Duration format: +- https://www.iso.org/iso-8601-date-and-time-format.html +- https://en.wikipedia.org/wiki/ISO_8601

+

Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5148

+ + + + backoffPolicy
@@ -977,6 +993,20 @@ sending an event before moving it to the dead letter sink.

+timeout
+ +string + + + +

Timeout is the timeout of each single request. +More information on Duration format: +- https://www.iso.org/iso-8601-date-and-time-format.html +- https://en.wikipedia.org/wiki/ISO_8601

+ + + + backoffPolicy
diff --git a/pkg/apis/duck/v1/delivery_types.go b/pkg/apis/duck/v1/delivery_types.go index 0577bd584fe..b82bbc4f9b7 100644 --- a/pkg/apis/duck/v1/delivery_types.go +++ b/pkg/apis/duck/v1/delivery_types.go @@ -20,6 +20,7 @@ import ( "context" "github.com/rickb777/date/period" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" ) @@ -37,6 +38,15 @@ type DeliverySpec struct { // +optional Retry *int32 `json:"retry,omitempty"` + // Timeout is the timeout of each single request. The value must be greater than 0. + // More information on Duration format: + // - https://www.iso.org/iso-8601-date-and-time-format.html + // - https://en.wikipedia.org/wiki/ISO_8601 + // + // Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5148 + // +optional + Timeout *string `json:"timeout,omitempty"` + // BackoffPolicy is the retry backoff policy (linear, exponential). // +optional BackoffPolicy *BackoffPolicyType `json:"backoffPolicy,omitempty"` @@ -65,6 +75,17 @@ func (ds *DeliverySpec) Validate(ctx context.Context) *apis.FieldError { errs = errs.Also(apis.ErrInvalidValue(*ds.Retry, "retry")) } + if ds.Timeout != nil { + if feature.FromContext(ctx).IsEnabled(feature.DeliveryTimeout) { + t, te := period.Parse(*ds.Timeout) + if te != nil || t.IsZero() { + errs = errs.Also(apis.ErrInvalidValue(*ds.Timeout, "timeout")) + } + } else { + errs = errs.Also(apis.ErrDisallowedFields("timeout")) + } + } + if ds.BackoffPolicy != nil { switch *ds.BackoffPolicy { case BackoffPolicyExponential, BackoffPolicyLinear: diff --git a/pkg/apis/duck/v1/delivery_types_test.go b/pkg/apis/duck/v1/delivery_types_test.go index 5238c1033fd..ca82ed2b15b 100644 --- a/pkg/apis/duck/v1/delivery_types_test.go +++ b/pkg/apis/duck/v1/delivery_types_test.go @@ -21,18 +21,24 @@ import ( "github.com/google/go-cmp/cmp" "k8s.io/utils/pointer" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" ) func TestDeliverySpecValidation(t *testing.T) { + deliveryTimeoutEnabledCtx := feature.ToContext(context.TODO(), feature.Flags{ + feature.DeliveryTimeout: feature.Enabled, + }) + invalidString := "invalid time" bop := BackoffPolicyExponential - validBackoffDelay := "PT2S" - invalidBackoffDelay := "1985-04-12T23:20:50.52Z" + validDuration := "PT2S" + invalidDuration := "1985-04-12T23:20:50.52Z" tests := []struct { name string spec *DeliverySpec + ctx context.Context want *apis.FieldError }{{ name: "nil is valid", @@ -50,25 +56,48 @@ func TestDeliverySpecValidation(t *testing.T) { want: func() *apis.FieldError { return apis.ErrGeneric("expected at least one, got none", "ref", "uri").ViaField("deadLetterSink") }(), + }, { + name: "valid timeout", + spec: &DeliverySpec{Timeout: &validDuration}, + ctx: deliveryTimeoutEnabledCtx, + want: nil, + }, { + name: "invalid timeout", + spec: &DeliverySpec{Timeout: &invalidDuration}, + ctx: deliveryTimeoutEnabledCtx, + want: func() *apis.FieldError { + return apis.ErrInvalidValue(invalidDuration, "timeout") + }(), + }, { + name: "zero timeout", + spec: &DeliverySpec{Timeout: pointer.StringPtr("PT0S")}, + ctx: deliveryTimeoutEnabledCtx, + want: func() *apis.FieldError { + return apis.ErrInvalidValue("PT0S", "timeout") + }(), + }, { + name: "disabled timeout", + spec: &DeliverySpec{Timeout: &validDuration}, + want: apis.ErrDisallowedFields("timeout"), }, { name: "valid backoffPolicy", spec: &DeliverySpec{BackoffPolicy: &bop}, want: nil, }, { name: "valid backoffDelay", - spec: &DeliverySpec{BackoffDelay: &validBackoffDelay}, + spec: &DeliverySpec{BackoffDelay: &validDuration}, want: nil, }, { name: "invalid backoffDelay", - spec: &DeliverySpec{BackoffDelay: &invalidBackoffDelay}, + spec: &DeliverySpec{BackoffDelay: &invalidDuration}, want: func() *apis.FieldError { - return apis.ErrGeneric("invalid value: "+invalidBackoffDelay, "backoffDelay") + return apis.ErrInvalidValue(invalidDuration, "backoffDelay") }(), }, { name: "negative retry", spec: &DeliverySpec{Retry: pointer.Int32Ptr(-1)}, want: func() *apis.FieldError { - return apis.ErrGeneric("invalid value: -1", "retry") + return apis.ErrInvalidValue("-1", "retry") }(), }, { name: "valid retry 0", @@ -80,7 +109,11 @@ func TestDeliverySpecValidation(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got := test.spec.Validate(context.TODO()) + ctx := test.ctx + if ctx == nil { + ctx = context.TODO() + } + got := test.spec.Validate(ctx) if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { t.Error("DeliverySpec.Validate (-want, +got) =", diff) } diff --git a/pkg/apis/duck/v1/zz_generated.deepcopy.go b/pkg/apis/duck/v1/zz_generated.deepcopy.go index 902fb4b1f23..0501d0e6045 100644 --- a/pkg/apis/duck/v1/zz_generated.deepcopy.go +++ b/pkg/apis/duck/v1/zz_generated.deepcopy.go @@ -146,6 +146,11 @@ func (in *DeliverySpec) DeepCopyInto(out *DeliverySpec) { *out = new(int32) **out = **in } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(string) + **out = **in + } if in.BackoffPolicy != nil { in, out := &in.BackoffPolicy, &out.BackoffPolicy *out = new(BackoffPolicyType) diff --git a/pkg/apis/duck/v1beta1/delivery_conversion.go b/pkg/apis/duck/v1beta1/delivery_conversion.go index d8a3bc880a2..d48ccde0c18 100644 --- a/pkg/apis/duck/v1beta1/delivery_conversion.go +++ b/pkg/apis/duck/v1beta1/delivery_conversion.go @@ -31,6 +31,7 @@ func (source *DeliverySpec) ConvertTo(ctx context.Context, to apis.Convertible) case *eventingduckv1.DeliverySpec: sink.Retry = source.Retry sink.BackoffDelay = source.BackoffDelay + sink.Timeout = source.Timeout if source.BackoffPolicy != nil { if *source.BackoffPolicy == BackoffPolicyLinear { linear := eventingduckv1.BackoffPolicyLinear @@ -55,6 +56,7 @@ func (sink *DeliverySpec) ConvertFrom(ctx context.Context, from apis.Convertible case *eventingduckv1.DeliverySpec: sink.Retry = source.Retry sink.BackoffDelay = source.BackoffDelay + sink.Timeout = source.Timeout if source.BackoffPolicy != nil { if *source.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { linear := BackoffPolicyLinear diff --git a/pkg/apis/duck/v1beta1/delivery_types.go b/pkg/apis/duck/v1beta1/delivery_types.go index ac2001704ef..33ebd3ae2dc 100644 --- a/pkg/apis/duck/v1beta1/delivery_types.go +++ b/pkg/apis/duck/v1beta1/delivery_types.go @@ -38,6 +38,13 @@ type DeliverySpec struct { // +optional Retry *int32 `json:"retry,omitempty"` + // Timeout is the timeout of each single request. + // More information on Duration format: + // - https://www.iso.org/iso-8601-date-and-time-format.html + // - https://en.wikipedia.org/wiki/ISO_8601 + // + Timeout *string `json:"timeout,omitempty"` + // BackoffPolicy is the retry backoff policy (linear, exponential). // +optional BackoffPolicy *BackoffPolicyType `json:"backoffPolicy,omitempty"` @@ -66,6 +73,13 @@ func (ds *DeliverySpec) Validate(ctx context.Context) *apis.FieldError { errs = errs.Also(apis.ErrInvalidValue(*ds.Retry, "retry")) } + if ds.Timeout != nil { + _, te := period.Parse(*ds.Timeout) + if te != nil { + errs = errs.Also(apis.ErrInvalidValue(*ds.Timeout, "timeout")) + } + } + if ds.BackoffPolicy != nil { switch *ds.BackoffPolicy { case BackoffPolicyExponential, BackoffPolicyLinear: diff --git a/pkg/apis/duck/v1beta1/delivery_types_test.go b/pkg/apis/duck/v1beta1/delivery_types_test.go index ff5de5a494a..f1936554949 100644 --- a/pkg/apis/duck/v1beta1/delivery_types_test.go +++ b/pkg/apis/duck/v1beta1/delivery_types_test.go @@ -28,8 +28,8 @@ import ( func TestDeliverySpecValidation(t *testing.T) { invalidString := "invalid time" bop := BackoffPolicyExponential - validBackoffDelay := "PT2S" - invalidBackoffDelay := "1985-04-12T23:20:50.52Z" + validDuration := "PT2S" + invalidDuration := "1985-04-12T23:20:50.52Z" tests := []struct { name string spec *DeliverySpec @@ -50,25 +50,35 @@ func TestDeliverySpecValidation(t *testing.T) { want: func() *apis.FieldError { return apis.ErrGeneric("expected at least one, got none", "ref", "uri").ViaField("deadLetterSink") }(), + }, { + name: "valid timeout", + spec: &DeliverySpec{Timeout: &validDuration}, + want: nil, + }, { + name: "invalid timeout", + spec: &DeliverySpec{Timeout: &invalidDuration}, + want: func() *apis.FieldError { + return apis.ErrInvalidValue(invalidDuration, "timeout") + }(), }, { name: "valid backoffPolicy", spec: &DeliverySpec{BackoffPolicy: &bop}, want: nil, }, { name: "valid backoffDelay", - spec: &DeliverySpec{BackoffDelay: &validBackoffDelay}, + spec: &DeliverySpec{BackoffDelay: &validDuration}, want: nil, }, { name: "invalid backoffDelay", - spec: &DeliverySpec{BackoffDelay: &invalidBackoffDelay}, + spec: &DeliverySpec{BackoffDelay: &invalidDuration}, want: func() *apis.FieldError { - return apis.ErrGeneric("invalid value: "+invalidBackoffDelay, "backoffDelay") + return apis.ErrInvalidValue(invalidDuration, "backoffDelay") }(), }, { name: "negative retry", spec: &DeliverySpec{Retry: pointer.Int32Ptr(-1)}, want: func() *apis.FieldError { - return apis.ErrGeneric("invalid value: -1", "retry") + return apis.ErrInvalidValue("-1", "retry") }(), }, { name: "valid retry 0", diff --git a/pkg/apis/duck/v1beta1/zz_generated.deepcopy.go b/pkg/apis/duck/v1beta1/zz_generated.deepcopy.go index c68ae747165..175fbad9895 100644 --- a/pkg/apis/duck/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/duck/v1beta1/zz_generated.deepcopy.go @@ -146,6 +146,11 @@ func (in *DeliverySpec) DeepCopyInto(out *DeliverySpec) { *out = new(int32) **out = **in } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(string) + **out = **in + } if in.BackoffPolicy != nil { in, out := &in.BackoffPolicy, &out.BackoffPolicy *out = new(BackoffPolicyType) diff --git a/pkg/apis/feature/flag_names.go b/pkg/apis/feature/flag_names.go index fdc49d7e144..05031685863 100644 --- a/pkg/apis/feature/flag_names.go +++ b/pkg/apis/feature/flag_names.go @@ -18,4 +18,5 @@ package feature const ( KReferenceGroup = "kreference-group" + DeliveryTimeout = "delivery-timeout" )