-
Notifications
You must be signed in to change notification settings - Fork 24
Logs refactoring using Go iterators #1845
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
6a489d6
feat(aws): query ALB logs
lionello 212c710
wip
lionello d4bfdcb
new logging
lionello 32f1785
Update src/pkg/cli/client/byoc/do/stream.go
lionello 25846bf
code rabbit comments
lionello 2b08e1b
Merge remote-tracking branch 'origin/main' into lio/query-alb-logs
lionello File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,236 @@ | ||
| package aws | ||
|
|
||
| import ( | ||
| "bufio" | ||
| "compress/gzip" | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "iter" | ||
| "slices" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/DefangLabs/defang/src/pkg/clouds/aws/cw" | ||
| "github.com/DefangLabs/defang/src/pkg/term" | ||
| "github.com/aws/aws-sdk-go-v2/service/s3" | ||
| s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" | ||
| ) | ||
|
|
||
| func (b *ByocAws) fetchAndStreamAlbLogs(ctx context.Context, projectName string, since, end time.Time, pattern string) (iter.Seq2[cw.LogEvent, error], error) { | ||
| cfg, err := b.driver.LoadConfig(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| s3Client := s3.NewFromConfig(cfg) | ||
| bucketsOutput, err := s3Client.ListBuckets(ctx, &s3.ListBucketsInput{}) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| bucketPrefix := fmt.Sprintf("%s-%s-alb-logs", projectName, b.PulumiStack) | ||
| if b.Prefix != "" { | ||
| bucketPrefix = b.Prefix + "-" + bucketPrefix | ||
| } | ||
| term.Debug("Query ALB logs", bucketPrefix) | ||
| if len(bucketPrefix) > 31 { | ||
| // HACK: AWS CD truncates the ALB name to 31 characters (because of the long Terraform suffix) | ||
| bucketPrefix = bucketPrefix[:31] | ||
| } | ||
| bucketPrefix = strings.ToLower(bucketPrefix) | ||
|
|
||
| // First, find bucket with the given prefix for the project/stack | ||
| var bucketName string | ||
| for _, bucket := range bucketsOutput.Buckets { | ||
| if strings.HasPrefix(*bucket.Name, bucketPrefix) { | ||
| // TODO: inspect the bucket tags to ensure it belongs to the right org/project/stack | ||
| bucketName = *bucket.Name | ||
| break | ||
| } | ||
| } | ||
|
|
||
| if bucketName == "" { | ||
| return nil, fmt.Errorf("no bucket found with prefix %q", bucketPrefix) | ||
| } | ||
|
|
||
| return func(yield func(cw.LogEvent, error) bool) { | ||
| for logs, err := range b.fetchAndStreamAlbLogsFromBucket(ctx, bucketName, since, end, s3Client, pattern) { | ||
| if err != nil { | ||
| yield(cw.LogEvent{}, err) | ||
| return | ||
| } | ||
| for _, log := range logs { | ||
| timestamp := log.Timestamp.UnixMilli() // FIXME: this destroys the original timestamp precision | ||
| if !yield(cw.LogEvent{ | ||
| Message: &log.Message, | ||
| Timestamp: ×tamp, | ||
| }, nil) { | ||
| return | ||
| } | ||
| } | ||
| } | ||
| }, nil | ||
| } | ||
|
|
||
| type s3Lister interface { | ||
| s3.ListObjectsV2APIClient | ||
| GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) | ||
| } | ||
|
|
||
// getAlbLogObjectGroupKey returns the part of objName that precedes the first
// "Z_" marker, i.e. everything up to (but excluding) the trailing "Z" of the
// timestamp in an ALB log object name such as
//
//	123456789012_elasticloadbalancing_us-test-2_app.defang-project-stack-alb.d850f5ca299e222a_20260207T0120Z_11.22.33.44_2khrazuh.log.gz
//
// Objects that share this key cover the same timespan. When objName contains
// no "Z_" marker it is returned unchanged.
func getAlbLogObjectGroupKey(objName string) string {
	if marker := strings.Index(objName, "Z_"); marker >= 0 {
		return objName[:marker]
	}
	return objName
}
|
|
||
| func (b *ByocAws) fetchAndStreamAlbLogsFromBucket(ctx context.Context, bucketName string, since, end time.Time, s3Client s3Lister, pattern string) iter.Seq2[[]ALBLogEntry, error] { | ||
| return func(yield func([]ALBLogEntry, error) bool) { | ||
| if end.IsZero() { | ||
| end = time.Now() | ||
| } | ||
| if since.IsZero() { | ||
| since = end.Add(-60 * time.Minute) | ||
| } | ||
| // If the end time is 00:00:01Z, we should still consider log files modified at 00:05:03Z | ||
| // because each file has ~5 minutes of logs and writing the file will have take a few seconds. | ||
| lastModifiedEnd := end.Add(5*time.Minute + 5*time.Second) | ||
|
|
||
| // Use a single listing with the region-level prefix instead of iterating day-by-day. | ||
| // StartAfter skips to the start date, so empty buckets complete in a single API call. | ||
| objectPrefix := fmt.Sprintf("AWSLogs/%s/elasticloadbalancing/%s/", b.driver.AccountID, b.driver.Region) | ||
| year, month, day := since.UTC().Date() | ||
| startAfter := fmt.Sprintf("AWSLogs/%s/elasticloadbalancing/%s/%04d/%02d/%02d/", b.driver.AccountID, b.driver.Region, year, month, day) | ||
|
|
||
| listInput := s3.ListObjectsV2Input{ | ||
| Bucket: &bucketName, | ||
| Prefix: &objectPrefix, | ||
| StartAfter: &startAfter, | ||
| } | ||
| var groupKey string | ||
| var group []s3types.Object | ||
| done: | ||
| for { | ||
| list, err := s3Client.ListObjectsV2(ctx, &listInput) | ||
| if err != nil { | ||
| yield(nil, err) | ||
| return | ||
| } | ||
| for _, obj := range list.Contents { | ||
| // LastModified is time of latest record. Skip objects with events older than the since-time | ||
| if obj.LastModified.Before(since) { | ||
| continue | ||
| } | ||
| // Check end-time, but consider that each object has ~5 minutes of logs | ||
| if obj.LastModified.After(lastModifiedEnd) { | ||
| break done | ||
| } | ||
| if key := getAlbLogObjectGroupKey(*obj.Key); key == groupKey { | ||
| // Same timespan as the previous object, so add to group for merging. | ||
| group = append(group, obj) | ||
| } else { | ||
| // New timespan, so stream logs from the previous group(s) before starting a new group. | ||
| logs, err := readAlbLogsGroup(ctx, bucketName, group, since, end, s3Client, pattern) | ||
| if len(logs) > 0 || err != nil { | ||
| if !yield(logs, err) { | ||
| return | ||
| } | ||
| } | ||
| group = []s3types.Object{obj} | ||
| groupKey = key | ||
| } | ||
| } | ||
| if list.NextContinuationToken == nil { | ||
| break | ||
| } | ||
| listInput.ContinuationToken = list.NextContinuationToken | ||
| } | ||
| // Flush remaining group | ||
| logs, err := readAlbLogsGroup(ctx, bucketName, group, since, end, s3Client, pattern) | ||
| if len(logs) > 0 || err != nil { | ||
| yield(logs, err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
// ALBLogEntry is a single line from an ALB access log together with the
// timestamp parsed from it.
type ALBLogEntry struct {
	// Message is the complete, unmodified log line.
	Message string
	// Timestamp is the time parsed from the second field of the log line.
	Timestamp time.Time
}
|
|
||
| func readAlbLogsGroup(ctx context.Context, bucketName string, group []s3types.Object, since, end time.Time, s3Client s3Lister, pattern string) ([]ALBLogEntry, error) { | ||
| var allEntries []ALBLogEntry | ||
| for _, obj := range group { | ||
| content, err := s3Client.GetObject(ctx, &s3.GetObjectInput{ | ||
| Bucket: &bucketName, | ||
| Key: obj.Key, | ||
| }) | ||
| if err != nil { | ||
| return nil, err // or continue with other objects? | ||
| } | ||
| entries, err := readAlbLogs(content.Body, since, end, pattern) | ||
| if err != nil { | ||
| return nil, err // or continue with other objects? | ||
| } | ||
| if allEntries == nil { | ||
| allEntries = entries | ||
| } else { | ||
| allEntries = append(allEntries, entries...) | ||
| } | ||
| } | ||
| // Always need to sort, because log entries within each object are not in order. | ||
| slices.SortFunc(allEntries, func(a, b ALBLogEntry) int { | ||
| return a.Timestamp.Compare(b.Timestamp) | ||
| }) | ||
| return allEntries, nil | ||
| } | ||
|
|
||
// errMalformedALBLogLine is returned when a log line does not have a
// non-empty, space-delimited second field to parse a timestamp from.
var errMalformedALBLogLine = errors.New("malformed ALB log line")

// parseAlbLogTime extracts the timestamp from an ALB access log line. The
// timestamp is the second space-separated field and must be followed by
// another space, e.g.
//
//	https 2026-02-05T23:58:32.578204Z app/defang-project-stack7d0286/c9b3756e8ef89456 11.22.33.44:34025 ...
//
// It returns errMalformedALBLogLine when that field is missing, empty or not
// terminated by a space, and the time.Parse error when the field is not a
// valid RFC 3339 timestamp.
func parseAlbLogTime(logLine string) (time.Time, error) {
	// Drop the first field (the connection type).
	_, rest, ok := strings.Cut(logLine, " ")
	if !ok {
		return time.Time{}, errMalformedALBLogLine
	}
	// The timestamp runs up to the next space, which must exist.
	stamp, _, ok := strings.Cut(rest, " ")
	if !ok || stamp == "" {
		return time.Time{}, errMalformedALBLogLine
	}
	return time.Parse(time.RFC3339Nano, stamp)
}
|
|
||
| func readAlbLogs(body io.ReadCloser, since, end time.Time, pattern string) ([]ALBLogEntry, error) { | ||
| defer body.Close() | ||
| gzipReader, err := gzip.NewReader(body) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| var entries []ALBLogEntry | ||
| lineScanner := bufio.NewScanner(gzipReader) | ||
| for lineScanner.Scan() { | ||
| logLine := lineScanner.Text() | ||
| if !strings.Contains(logLine, pattern) { | ||
| continue | ||
| } | ||
| timestamp, err := parseAlbLogTime(logLine) | ||
| if err != nil { | ||
| continue // malformed timestamp: ignore | ||
| } | ||
| if timestamp.Before(since) { | ||
| continue | ||
| } | ||
| if timestamp.After(end) { | ||
| continue // can't break, because there can be out-of-order timestamps | ||
| } | ||
| entries = append(entries, ALBLogEntry{ | ||
| Message: logLine, | ||
| Timestamp: timestamp, | ||
| }) | ||
| } | ||
| if err := lineScanner.Err(); err != nil { | ||
| return nil, err | ||
| } | ||
| if err := gzipReader.Close(); err != nil { | ||
| return nil, err // only returns err on failed checksum after io.EOF | ||
| } | ||
| return entries, nil | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| package aws | ||
|
|
||
| import ( | ||
| "context" | ||
| "os" | ||
| "path/filepath" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/aws/aws-sdk-go-v2/service/s3" | ||
| s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" | ||
| "github.com/aws/smithy-go/ptr" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func Test_readAlbLogs(t *testing.T) { | ||
| gz, err := os.Open("testdata/123456789012_elasticloadbalancing_us-west-2_app.defang-agentic-strands-aws7d0286.c9b3756e8ef89456_20260206T0000Z_44.233.47.227_7tj887d8.log.gz") | ||
| require.NoError(t, err) | ||
| entries, err := readAlbLogs(gz, time.Time{}, time.Now(), "") | ||
| require.NoError(t, err) | ||
| for _, entry := range entries { | ||
| t.Logf("%s: %s", entry.Timestamp, entry.Message) | ||
| } | ||
| } | ||
|
|
||
// mockS3Lister is a stateless test double for the S3 client that serves
// buckets and objects from the local filesystem.
type mockS3Lister struct{}
|
|
||
| func (m mockS3Lister) ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { | ||
| entries, err := os.ReadDir(filepath.Join(".", *params.Bucket)) | ||
| contents := make([]s3types.Object, len(entries)) | ||
| for i, entry := range entries { | ||
| contents[i].Key = ptr.String(entry.Name()) | ||
| contents[i].LastModified = ptr.Time(time.Now()) | ||
| } | ||
| return &s3.ListObjectsV2Output{ | ||
| Contents: contents, | ||
| }, err | ||
| } | ||
|
|
||
| func (m mockS3Lister) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { | ||
| body, err := os.Open(*params.Key) | ||
| return &s3.GetObjectOutput{ | ||
| Body: body, | ||
| }, err | ||
| } | ||
|
|
||
| func Test_streamAlbLogGroup(t *testing.T) { | ||
| s3Client := mockS3Lister{} | ||
|
|
||
| t.Run("empty group", func(t *testing.T) { | ||
| entries, err := readAlbLogsGroup(t.Context(), "testdata", nil, time.Time{}, time.Now(), s3Client, "") | ||
| require.NoError(t, err) | ||
| require.Empty(t, entries) | ||
| }) | ||
|
|
||
| t.Run("with test files", func(t *testing.T) { | ||
| files, err := os.ReadDir("testdata") | ||
| require.NoError(t, err) | ||
| var objects []s3types.Object | ||
| for _, f := range files { | ||
| if filepath.Ext(f.Name()) == ".gz" { | ||
| objects = append(objects, s3types.Object{ | ||
| Key: ptr.String(filepath.Join("testdata", f.Name())), | ||
| LastModified: ptr.Time(time.Now()), | ||
| }) | ||
| } | ||
| } | ||
| entries, err := readAlbLogsGroup(t.Context(), "testdata", objects, time.Time{}, time.Now(), s3Client, "") | ||
| require.NoError(t, err) | ||
| for _, entry := range entries { | ||
| t.Logf("%s: %s", entry.Timestamp, entry.Message) | ||
| } | ||
| require.NotEmpty(t, entries) | ||
| // Verify entries are sorted by timestamp | ||
| for i := 1; i < len(entries); i++ { | ||
| require.False(t, entries[i].Timestamp.Before(entries[i-1].Timestamp), "entries not sorted at index %d", i) | ||
| } | ||
| }) | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.