-
Notifications
You must be signed in to change notification settings - Fork 824
Open
Labels
Description
Describe the bug
Panic when writing a batch message. Context: relaying messages from RabbitMQ to Kafka.
A clear and concise description of what the bug is.
panic: runtime error: slice bounds out of range [:675] with capacity 640
goroutine 2411 [running]:
github.com/segmentio/kafka-go/protocol.(*encoder).Write(0xc0003a8280, {0xc000ec2f00, 0x0?, 0x280})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/encode.go:60 +0xfc
bytes.(*Reader).WriteTo(0xc00011c240, {0x7c3740?, 0xc0003a8280?})
/usr/local/go/src/bytes/reader.go:143 +0x7d
io.copyBuffer({0x7c3740, 0xc0003a8280}, {0x7f2ebddafc60, 0xc00011c240}, {0x0, 0x0, 0x0})
/usr/local/go/src/io/io.go:411 +0x9d
io.Copy(...)
/usr/local/go/src/io/io.go:388
github.com/segmentio/kafka-go/protocol.(*encoder).writeVarNullBytesFrom(0xc0003a8280, {0x7c4a88, 0xc00011c240})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/encode.go:335 +0xc5
github.com/segmentio/kafka-go/protocol.(*RecordSet).writeToVersion2.func1(0x2, 0xc00011c1c0)
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/record_v2.go:259 +0x393
github.com/segmentio/kafka-go/protocol.handleRecord(0xc00011c1a0?, 0xc0003a8280?, 0xc0000c7648?)
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/record_batch.go:74 +0xc4
github.com/segmentio/kafka-go/protocol.forEachRecord({0x7c2960, 0xc00011c1a0}, 0xc000349748)
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/record_batch.go:61 +0x5c
github.com/segmentio/kafka-go/protocol.(*RecordSet).writeToVersion2(0xc00012be88, 0xc00022e5d0, 0x34)
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/record_v2.go:223 +0x406
github.com/segmentio/kafka-go/protocol.(*RecordSet).WriteTo(0xc00012be88, {0x7c3640, 0xc00022e5d0?})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/record.go:269 +0x125
github.com/segmentio/kafka-go/protocol.writerEncodeFuncOf.func1(0xc0003a8230, {{0x70d180?, 0xc00012be88?, 0xc00022c2b8?}})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/encode.go:522 +0xc6
github.com/segmentio/kafka-go/protocol.structEncodeFuncOf.func2(0xc0003a8230, {{0x703380?, 0xc00012be80?, 0x20?}})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/encode.go:474 +0xd6
github.com/segmentio/kafka-go/protocol.(*encoder).encodeArray(0xc0003a8230, {{0x6cf240?, 0xc00022e4f0?, 0x65d71b?}}, {0xc0003a8230?, 0xc000574020?}, 0xc000230020)
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/encode.go:175 +0xc4
github.com/segmentio/kafka-go/protocol.arrayEncodeFuncOf.func4(0x703420?, {{0x6cf240?, 0xc00022e4f0?, 0xc00022c2a8?}})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/encode.go:509 +0x29
github.com/segmentio/kafka-go/protocol.structEncodeFuncOf.func2(0xc0003a8230, {{0x703420?, 0xc00022e4e0?, 0x20?}})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/encode.go:474 +0xd6
github.com/segmentio/kafka-go/protocol.(*encoder).encodeArray(0xc0003a8230, {{0x6cf100?, 0xc00022e4c8?, 0xc0003a825c?}}, {0x20?, 0x20?}, 0xc000230060)
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/encode.go:175 +0xc4
github.com/segmentio/kafka-go/protocol.arrayEncodeFuncOf.func4(0x71a200?, {{0x6cf100?, 0xc00022e4c8?, 0xc00022c298?}})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/encode.go:509 +0x29
github.com/segmentio/kafka-go/protocol.structEncodeFuncOf.func2(0xc0003a8230, {{0x71a200?, 0xc00022e4b0?, 0x20?}})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/encode.go:474 +0xd6
github.com/segmentio/kafka-go/protocol.WriteRequest({0x7f2ebdc22490, 0xc000d69a40}, 0x8, 0x249c8, {0x0, 0x0}, {0x7c2be0, 0xc00022e4b0})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/request.go:124 +0x587
github.com/segmentio/kafka-go/protocol.RoundTrip({0x7c40e0, 0xc000d69a40}, 0x8, 0x249c8, {0x0?, 0xc00027c1c0?}, {0x7c2be0, 0xc00022e4b0})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/roundtrip.go:9 +0x65
github.com/segmentio/kafka-go/protocol.(*Conn).RoundTrip(0xc000d69a40, {0x7c2be0, 0xc00022e4b0})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/protocol/conn.go:94 +0x17d
github.com/segmentio/kafka-go.(*conn).roundTrip(0xc00027c000?, {0x7c4f30, 0xc0003a81e0}, 0xc000d69a40, {0x7c2be0, 0xc00022e4b0})
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/transport.go:1282 +0x145
github.com/segmentio/kafka-go.(*conn).run(0xc0003a8000, 0xc000d69a40, 0xc0007ac060)
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/transport.go:1258 +0x10c
created by github.com/segmentio/kafka-go.(*connGroup).connect in goroutine 2583
/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.47/transport.go:1235 +0xd2b
Kafka Version
- What version(s) of Kafka are you testing against?
https://downloads.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
- What version of kafka-go are you using?
github.com/segmentio/kafka-go v0.4.47
To Reproduce
Resources to reproduce the behavior:
go version: 1.22.5
---
# docker-compose.yaml
#
# Adding a docker-compose file will help the maintainers setup the environment
# to reproduce the issue.
#
# If one of the docker-compose files available in the repository can be used,
# mentioning it is also a useful alternative.
...
Config kafka writer
// Kafka writer used by the relay; configured for high-throughput async batching.
// NOTE(review): MAX_BATCH_SIZE is not idiomatic Go naming (prefer maxBatchSize);
// left unchanged in case it is referenced elsewhere in the program.
var MAX_BATCH_SIZE = 4096
// NOTE(review): kafka.NewWriter/kafka.WriterConfig is the legacy constructor in
// kafka-go; newer code typically builds a kafka.Writer struct directly — confirm
// before migrating.
producer := kafka.NewWriter(kafka.WriterConfig{
Brokers: kafkaBrokers,
// Async: WriteMessages returns without waiting for delivery, so produce errors
// are not reported through its return value — presumably they surface via the
// writer's completion callback, if one is configured (TODO confirm).
Async: true,
// A batch is flushed when it reaches MAX_BATCH_SIZE messages...
BatchSize: MAX_BATCH_SIZE,
// ...or 32 MiB of accumulated payload...
BatchBytes: 32 * 1024 * 1024,
// ...or after 50ms, whichever comes first.
BatchTimeout: 50 * time.Millisecond,
// RequiredAcks 0: fire-and-forget — the broker sends no acknowledgment, so
// delivery failures are invisible to the producer.
RequiredAcks: 0,
})
Code send messages:
// Relay loop: receives per-topic message batches from resultCh, produces each
// topic's batch to Kafka, then acks the originating RabbitMQ deliveries when
// every batch succeeded, or nacks (with requeue) when any batch failed.
//
// BUG FIX: the original select carried an empty `default:` branch, which made
// the receive non-blocking and turned the for-loop into a busy spin burning a
// full CPU core whenever resultCh was empty. The select now blocks, and an
// explicit ctx.Done() case provides clean shutdown on cancellation.
go func() {
	for {
		select {
		case <-ctx.Done():
			return
		case msgPerTopic, ok := <-resultCh:
			if !ok {
				log.Println("Channel closed, stopping consumer")
				return
			}
			// send produces one topic's batch; an empty batch is a no-op.
			// Each produce gets its own bounded-timeout child context.
			send := func(topic string, messages [][]byte) error {
				if len(messages) == 0 {
					return nil
				}
				topicPerNetwork := fmt.Sprintf("%s_%s", network, topic)
				log.Printf("⚡ Sending %d messages to Kafka topic: %s", len(messages), topicPerNetwork)
				produceCtx, cancel := context.WithTimeout(ctx, 500*time.Second)
				err := produceBatchToKafka(produceCtx, producer, topicPerNetwork, messages)
				cancel() // Cleanup context
				if err != nil {
					log.Printf("❌ Error producing batch to Kafka topic %s: %v", topic, err)
					return err
				}
				log.Printf("✅ Successfully sent batch to Kafka topic: %s", topicPerNetwork)
				return nil
			}
			allSuccess := true
			// NOTE(review): naming is inconsistent below — the guard checks
			// msgPerTopic.A_TOPIC but sends constant A and payload *msgPerTopic.A,
			// while B/C guard on X_TOPIC and send X_TOPIC / *msgPerTopic.X.
			// Preserved as-is; verify against the struct definition.
			if msgPerTopic.A_TOPIC != nil {
				if err := send(A, *msgPerTopic.A); err != nil {
					allSuccess = false
				}
			}
			if msgPerTopic.B_TOPIC != nil {
				if err := send(B_TOPIC, *msgPerTopic.B); err != nil {
					allSuccess = false
				}
			}
			if msgPerTopic.C_TOPIC != nil {
				if err := send(C_TOPIC, *msgPerTopic.C); err != nil {
					allSuccess = false
				}
			}
			if allSuccess {
				// Send ack info to ack handler; fall back to logging if the
				// ack channel stays full for 10ms.
				for _, ackInfo := range msgPerTopic.AckInfos {
					select {
					case ackCh <- ackInfo:
					case <-time.After(10 * time.Millisecond):
						log.Printf("⚠️ Ack channel full, manual ack for tag %d", ackInfo.DeliveryTag)
						// err := safeAck(ackInfo.DeliveryTag, ackInfo.Multiple)
						// if err != nil {
						// log.Printf("❌ Error manual acking: %v", err)
						// }
					}
				}
			} else {
				// Nack immediately (requeue=true) if any topic's batch failed.
				for _, ackInfo := range msgPerTopic.AckInfos {
					if err := safeNack(ackInfo.DeliveryTag, ackInfo.Multiple, true); err != nil {
						log.Printf("❌ Error nacking message: %v", err)
					}
				}
			}
		}
	}
}()
// produceBatchToKafka wraps each raw payload in a kafka.Message addressed to
// topic and writes the whole batch in a single WriteMessages call, returning
// whatever error the writer reports.
func produceBatchToKafka(ctx context.Context, producer *kafka.Writer, topic string, messages [][]byte) error {
	batch := make([]kafka.Message, 0, len(messages))
	for _, payload := range messages {
		batch = append(batch, kafka.Message{Topic: topic, Value: payload})
	}
	return producer.WriteMessages(ctx, batch...)
}
Expected Behavior
A clear and concise description of what you expected to happen.
No panic when writing batches in async mode.
A clear and concise description of the behavior you observed.
The panic occurs sporadically, only after the process has been running for a while (at least 6 hours).
Additional Context
Add any other context about the problem here.