@@ -2,15 +2,19 @@ package events
22
33import (
44 "context"
5+ "errors"
56 "fmt"
67 "sort"
8+ "strconv"
79 "strings"
10+ "time"
811
912 "github.com/acorn-io/mink/pkg/strategy"
1013 "github.com/acorn-io/mink/pkg/types"
1114 apiv1 "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1"
12- v1 "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1"
15+ internalv1 "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1"
1316 "github.com/acorn-io/runtime/pkg/channels"
17+ "github.com/acorn-io/z"
1418 "github.com/sirupsen/logrus"
1519 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1620 "k8s.io/apimachinery/pkg/watch"
@@ -83,7 +87,7 @@ func setDefaults(ctx context.Context, e *apiv1.Event) *apiv1.Event {
8387 }
8488
8589 if e .Observed .IsZero () {
86- e .Observed = v1 . MicroTime ( metav1 . NowMicro () )
90+ e .Observed = internalv1 . NowMicro ()
8791 }
8892
8993 return e
@@ -97,6 +101,12 @@ type query struct {
97101 // Only events with matching names or source strings are included in query results.
98102 // As a special case, the empty string "" matches all events.
99103 prefix prefix
104+
105+ // since excludes events observed before it when not nil.
106+ since * internalv1.MicroTime
107+
108+ // until excludes events observed after it when not nil.
109+ until * internalv1.MicroTime
100110}
101111
102112// filterChannel applies the query to every event received from unfiltered and forwards the result to filtered, if any.
@@ -131,7 +141,7 @@ func (q query) filterEvent(e watch.Event) *watch.Event {
131141 return & e
132142 }
133143
134- // Attempt to filter
144+ // Filter
135145 obj := e .Object .(* apiv1.Event )
136146 filtered := q .filter (* obj )
137147 if len (filtered ) < 1 {
@@ -144,6 +154,24 @@ func (q query) filterEvent(e watch.Event) *watch.Event {
144154 return & e
145155}
146156
157+ func (q query ) afterWindow (observation internalv1.MicroTime ) bool {
158+ if q .until == nil {
159+ // Window includes all future events
160+ return false
161+ }
162+
163+ return observation .After (q .until .Time )
164+ }
165+
166+ func (q query ) beforeWindow (observation internalv1.MicroTime ) bool {
167+ if q .since == nil {
168+ // Window includes all existing events
169+ return false
170+ }
171+
172+ return observation .Before (q .since .Time )
173+ }
174+
147175// filter returns the result of applying the query to a slice of events.
148176func (q query ) filter (events ... apiv1.Event ) []apiv1.Event {
149177 if len (events ) < 1 {
@@ -161,15 +189,19 @@ func (q query) filter(events ...apiv1.Event) []apiv1.Event {
161189 tail = int (q .tail )
162190 }
163191
164- if q .prefix .all () {
165- // Query selects all remaining events
166- return events [len (events )- tail :]
167- }
168-
169192 results := make ([]apiv1.Event , 0 , tail )
170193 for _ , event := range events {
171- if ! q .prefix .matches (event ) {
172- // Exclude from results
194+ observed := event .Observed
195+ if q .afterWindow (observed ) {
196+ // Exclude all events observed after the observation window ends.
197+ // Since the slice is sorted chronologically, we can stop filtering here.
198+ break
199+ }
200+
201+ if q .beforeWindow (observed ) || ! q .prefix .matches (event ) {
202+ // Exclude events:
203+ // - observed before the observation window starts
204+ // - that don't match the given prefix
173205 continue
174206 }
175207
@@ -187,13 +219,18 @@ func (q query) filter(events ...apiv1.Event) []apiv1.Event {
187219func stripQuery (opts storage.ListOptions ) (q query , stripped storage.ListOptions , err error ) {
188220 stripped = opts
189221
222+ now := internalv1 .NowMicro ()
190223 stripped .Predicate .Field , err = stripped .Predicate .Field .Transform (func (f , v string ) (string , string , error ) {
191224 var err error
192225 switch f {
193226 case "details" :
194227 // Detail elision is deprecated, so clients should always get details.
195228 // We still strip it from the selector here in order to maintain limited backwards compatibility with old
196229 // clients that still specify it.
230+ case "since" :
231+ q .since , err = parseTimeBound (v , now , true )
232+ case "until" :
233+ q .until , err = parseTimeBound (v , now , false )
197234 case "prefix" :
198235 q .prefix = prefix (v )
199236 default :
@@ -211,6 +248,76 @@ func stripQuery(opts storage.ListOptions) (q query, stripped storage.ListOptions
211248 return
212249}
213250
251+ // parseTimeBound parses a time bound from a string.
252+ //
253+ // It attempts to parse raw as one of the following formats, in order, returning the result of the first successful parse:
254+ // 1. Go duration; e.g. "5m"
255+ // - time is calculated relative to now
256+ // - if since is true, then the duration is subtracted from now, otherwise it is added
257+ //
258+ // 2. RFC3339; e.g. "2006-01-02T15:04:05Z07:00"
259+ // 3. RFC3339Micro; e.g. "2006-01-02T15:04:05.999999Z07:00"
260+ // 4. Unix timestamp; e.g. "1136239445"
261+ func parseTimeBound (raw string , now internalv1.MicroTime , since bool ) (* internalv1.MicroTime , error ) {
262+ // Try to parse raw as a duration string
263+ var errs []error
264+ duration , err := time .ParseDuration (raw )
265+ if err == nil {
266+ if since {
267+ duration *= - 1
268+ }
269+
270+ return z .P (internalv1 .NewMicroTime (now .Add (duration ))), nil
271+ }
272+ errs = append (errs , fmt .Errorf ("%s is not a valid duration: %w" , raw , err ))
273+
274+ // Try to parse raw as a time string
275+ t , err := parseTime (raw )
276+ if err == nil {
277+ return t , nil
278+ }
279+ errs = append (errs , fmt .Errorf ("%s is not a valid time: %w" , raw , err ))
280+
281+ // Try to parse raw as a unix timestamp
282+ unix , err := parseUnix (raw )
283+ if err == nil {
284+ return unix , nil
285+ }
286+ errs = append (errs , fmt .Errorf ("%s is not a valid unix timestamp: %w" , raw , err ))
287+
288+ return nil , errors .Join (errs ... )
289+ }
290+
291+ var (
292+ supportedLayouts = []string {
293+ time .RFC3339 ,
294+ metav1 .RFC3339Micro ,
295+ }
296+ )
297+
298+ func parseTime (raw string ) (* internalv1.MicroTime , error ) {
299+ var errs []error
300+ for _ , layout := range supportedLayouts {
301+ t , err := time .Parse (layout , raw )
302+ if err == nil {
303+ return z .P (internalv1 .NewMicroTime (t )), nil
304+ }
305+
306+ errs = append (errs , err )
307+ }
308+
309+ return nil , errors .Join (errs ... )
310+ }
311+
312+ func parseUnix (raw string ) (* internalv1.MicroTime , error ) {
313+ sec , err := strconv .ParseInt (raw , 10 , 64 )
314+ if err != nil {
315+ return nil , err
316+ }
317+
318+ return z .P (internalv1 .NewMicroTime (time .Unix (sec , 0 ))), nil
319+ }
320+
214321type prefix string
215322
216323func (p prefix ) matches (e apiv1.Event ) bool {
0 commit comments