diff --git a/go.mod b/go.mod index fbd25ac0..73b930b3 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,15 @@ module github.com/elastic/apm-queue/v2 -go 1.23.6 +go 1.23.8 require ( github.com/aws/aws-sdk-go-v2/config v1.29.17 github.com/google/go-cmp v0.7.0 github.com/stretchr/testify v1.10.0 - github.com/twmb/franz-go v1.17.0 - github.com/twmb/franz-go/pkg/kadm v1.11.0 - github.com/twmb/franz-go/pkg/kfake v0.0.0-20240122000745-a2d69ce07790 - github.com/twmb/franz-go/pkg/kmsg v1.8.0 + github.com/twmb/franz-go v1.19.5 + github.com/twmb/franz-go/pkg/kadm v1.15.0 + github.com/twmb/franz-go/pkg/kfake v0.0.0-20250625174842-669b18eeee83 + github.com/twmb/franz-go/pkg/kmsg v1.11.2 github.com/twmb/franz-go/plugin/kzap v1.1.2 go.opentelemetry.io/otel v1.37.0 go.opentelemetry.io/otel/metric v1.37.0 @@ -38,11 +38,11 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/klauspost/compress v1.17.8 // indirect - github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/crypto v0.26.0 // indirect + golang.org/x/crypto v0.39.0 // indirect golang.org/x/sys v0.33.0 // indirect ) diff --git a/go.sum b/go.sum index 646a4700..436843bd 100644 --- a/go.sum +++ b/go.sum @@ -35,28 +35,28 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk= -github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= -github.com/twmb/franz-go/pkg/kadm v1.11.0 h1:FfeWJ0qadntFpAcQt8JzNXW4dijjytZNLrzJuzzzuxA= -github.com/twmb/franz-go/pkg/kadm v1.11.0/go.mod h1:qrhkdH+SWS3ivmbqOgHbpgVHamhaKcjH0UM+uOp0M1A= -github.com/twmb/franz-go/pkg/kfake v0.0.0-20240122000745-a2d69ce07790 h1:i5WT+XWoEUOEIZvtv4C99pr+TMGM7TcI95K/ivY303Y= -github.com/twmb/franz-go/pkg/kfake v0.0.0-20240122000745-a2d69ce07790/go.mod h1:DCMFat7WCZfk946rqd9aVAcAmB6/rIcdMTslJSjJZgk= -github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA= -github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU= +github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y= +github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= +github.com/twmb/franz-go/pkg/kadm v1.15.0 h1:Yo3NAPfcsx3Gg9/hdhq4vmwO77TqRRkvpUcGWzjworc= +github.com/twmb/franz-go/pkg/kadm v1.15.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= +github.com/twmb/franz-go/pkg/kfake v0.0.0-20250625174842-669b18eeee83 h1:JJHzPprdI2KC4Fz1D/HpIvn3mlzU6v0KaHLCn+V5ILo= +github.com/twmb/franz-go/pkg/kfake v0.0.0-20250625174842-669b18eeee83/go.mod h1:udxwmMC3r4xqjwrSrMi8p9jpqMDNpC2YwexpDSUmQtw= +github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg= +github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= github.com/twmb/franz-go/plugin/kzap v1.1.2 h1:0arX5xJ0soUPX1LlDay6ZZoxuWkWk1lggQ5M/IgRXAE= github.com/twmb/franz-go/plugin/kzap v1.1.2/go.mod h1:53Cl9Uz1pbdOPDvUISIxLrZIWSa2jCuY1bTMauRMBmo= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -77,8 +77,8 @@ go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= +golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 1dc613cf..f1659190 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -329,13 +329,13 @@ func TestConsumerDelivery(t *testing.T) { Processor: newProcessor(nil, failRecord, cancel), } - record := &kgo.Record{ + r1 := &kgo.Record{ Topic: "name_space-topic", Value: []byte("content"), } for i := 0; i < int(tc.initialRecords); i++ { - produceRecord(ctx, t, client, record) + produceRecord(ctx, t, client, r1) } // expect up to tc.maxPollRecords @@ -383,9 +383,15 @@ func TestConsumerDelivery(t *testing.T) { // Start a new consumer in the background and then produce ctx, cancel = context.WithCancel(context.Background()) defer cancel() + + r2 := &kgo.Record{ + Topic: "name_space-topic", + Value: []byte("content"), + } + // Produce tc.lastRecords. for i := 0; i < tc.lastRecords; i++ { - produceRecord(ctx, t, client, record) + produceRecord(ctx, t, client, r2) } cfg.MaxPollRecords = tc.lastRecords cfg.Logger = baseLogger.Named("2") diff --git a/kafka/logger_test.go b/kafka/logger_test.go index 89151a1e..6f70bb6b 100644 --- a/kafka/logger_test.go +++ b/kafka/logger_test.go @@ -20,6 +20,7 @@ package kafka import ( "context" "errors" + "fmt" "net" "testing" "time" @@ -44,6 +45,7 @@ func TestHookLogsFailedDial(t *testing.T) { "expected one or two log lines, got %#v", observedLogs, ) // The error message should contain the error message from the dialer. + expectedErr = fmt.Sprintf("unable to dial: %s", expectedErr) assert.EqualValues(t, observedLogs[0].ContextMap()["error"], expectedErr) assert.Contains(t, observedLogs[0].ContextMap(), "event.duration") assert.Equal(t, observedLogs[0].Level, expectedLevel) diff --git a/kafka/manager_test.go b/kafka/manager_test.go index 48f33ada..6249cbe2 100644 --- a/kafka/manager_test.go +++ b/kafka/manager_test.go @@ -387,6 +387,7 @@ func TestManagerMetrics(t *testing.T) { var listOffsetsRequest *kmsg.ListOffsetsRequest cluster.ControlKey(kmsg.ListOffsets.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { + cluster.KeepControl() listOffsetsRequest = req.(*kmsg.ListOffsetsRequest) return &kmsg.ListOffsetsResponse{ Version: listOffsetsRequest.Version, @@ -490,7 +491,7 @@ func TestManagerMetrics(t *testing.T) { attribute.Int("partition", 4), attribute.Bool("foo", true), ), - Value: 4, // end offset = 4, nothing committed + Value: 0, // end offset = 4, nothing committed }, { Attributes: attribute.NewSet( attribute.String("group", "consumer3"), @@ -546,13 +547,14 @@ func TestManagerMetrics(t *testing.T) { }}, }, assignmentMetric.Data, metricdatatest.IgnoreTimestamp()) - assert.Equal(t, int16(5), describeGroupsRequest.Version) + assert.Equal(t, int16(6), describeGroupsRequest.Version) assert.ElementsMatch(t, []string{"connect", "consumer1", "consumer2", "consumer3"}, describeGroupsRequest.Groups) assert.ElementsMatch(t, []kmsg.OffsetFetchRequestGroup{ {Group: "connect", MemberEpoch: -1}, {Group: "consumer1", MemberEpoch: -1}, {Group: "consumer2", MemberEpoch: -1}, {Group: "consumer3", MemberEpoch: -1}, + {Group: "consumer4", MemberEpoch: -1}, }, offsetFetchRequest.Groups) assert.ElementsMatch(t, []kmsg.MetadataRequestTopic{ {Topic: kmsg.StringPtr("name_space-topic1")}, diff --git a/kafka/metrics_test.go b/kafka/metrics_test.go index f878fa84..cbb8e2ab 100644 --- a/kafka/metrics_test.go +++ b/kafka/metrics_test.go @@ -56,7 +56,7 @@ func TestProducerMetrics(t *testing.T) { ) // Fixes https://github.com/elastic/apm-queue/issues/464 - <-time.After(time.Millisecond) + <-time.After(10 * time.Millisecond) // Close the producer so records are flushed. require.NoError(t, producer.Close())