From d10fc24a9110f221e35fab77970d74cb37dcdb2d Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Sat, 23 Apr 2022 14:04:17 -0400 Subject: [PATCH 01/13] Codec interface --- cmd/kaf/produce.go | 14 +++++++++++++- pkg/avro/avro.go | 18 ++++++++++++++++++ pkg/proto/proto.go | 30 ++++++++++++++++++++++++++++++ pkg/util/codec.go | 9 +++++++++ 4 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 pkg/avro/avro.go create mode 100644 pkg/util/codec.go diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index 3bec3587..5dec3dd4 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -15,7 +15,9 @@ import ( "github.com/Masterminds/sprig" "github.com/Shopify/sarama" + "github.com/birdayz/kaf/pkg/avro" "github.com/birdayz/kaf/pkg/partitioner" + "github.com/birdayz/kaf/pkg/util" pb "github.com/golang/protobuf/proto" "github.com/spf13/cobra" ) @@ -163,6 +165,15 @@ var produceCmd = &cobra.Command{ key = sarama.ByteEncoder(avroKey) } + var encoder util.Encoder + if avroSchemaID != -1 { + schemaCache = getSchemaCache() + if schemaCache == nil { + errorExit("Error getting a instance of schemaCache") + } + encoder = avro.NewAvroCodec(avroSchemaID, schemaCache) + } + var headers []sarama.RecordHeader for _, h := range headerFlag { v := strings.SplitN(h, ":", 2) @@ -192,7 +203,8 @@ var produceCmd = &cobra.Command{ errorExit("Failed to load payload proto type") } } else if avroSchemaID != -1 { - avro, err := schemaCache.EncodeMessage(avroSchemaID, data) + // avro, err := schemaCache.EncodeMessage(avroSchemaID, data) + avro, err := encoder.Encode(data) if err != nil { errorExit("Failed to encode avro value", err) } diff --git a/pkg/avro/avro.go b/pkg/avro/avro.go new file mode 100644 index 00000000..2adadcff --- /dev/null +++ b/pkg/avro/avro.go @@ -0,0 +1,18 @@ +package avro + +type AvroCodec struct { + schemaID int + schemaCache *SchemaCache +} + +func NewAvroCodec(schemaID int, cache *SchemaCache) *AvroCodec { + return &AvroCodec {schemaID, cache} +} + +func (a *AvroCodec) Encode(in []byte) ([]byte, error) { + return a.schemaCache.EncodeMessage(a.schemaID, in) +} + +func (a *AvroCodec) Decode(in []byte) ([]byte, error) { + return a.schemaCache.DecodeMessage(in) +} diff --git a/pkg/proto/proto.go b/pkg/proto/proto.go index ca157401..00583f91 100644 --- a/pkg/proto/proto.go +++ b/pkg/proto/proto.go @@ -1,11 +1,13 @@ package proto import ( + "errors" "os" "path/filepath" "strings" + "github.com/golang/protobuf/proto" "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/desc/protoparse" "github.com/jhump/protoreflect/dynamic" @@ -73,3 +75,31 @@ func (d *DescriptorRegistry) MessageForType(_type string) *dynamic.Message { } return nil } + +type ProtoCodec struct { + registry *DescriptorRegistry + protoType string +} + +func NewProtoCodec(protoType string, registry *DescriptorRegistry) *ProtoCodec { + return &ProtoCodec{registry, protoType} +} + +func (p *ProtoCodec) Encode(in []byte) ([]byte, error) { + + if dynamicMessage := p.registry.MessageForType(p.protoType); dynamicMessage != nil { + err := dynamicMessage.UnmarshalJSON(in) + if err != nil { + return nil, err + } + + pb, err := proto.Marshal(dynamicMessage) + if err != nil { + return nil, err + } + + return pb, nil + } else { + return nil, errors.New("Error") + } +} diff --git a/pkg/util/codec.go b/pkg/util/codec.go new file mode 100644 index 00000000..2e1a1854 --- /dev/null +++ b/pkg/util/codec.go @@ -0,0 +1,9 @@ +package util + +type Encoder interface { + Encode(in []byte) ([]byte, error) +} + +type Decoder interface { + Decode(in []byte) ([]byte, error) +} From eb82e6c91328e3c0b4214267215c0d2139b306ac Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Sat, 23 Apr 2022 18:03:38 -0400 Subject: [PATCH 02/13] Refactoring producer to use the new encoders --- cmd/kaf/produce.go | 86 ++++++++++++++++------------------------------ pkg/proto/proto.go | 8 ++--- pkg/util/codec.go | 10 ++++++ 3 files changed, 44 insertions(+), 60 deletions(-) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index 5dec3dd4..e56a8a5f 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -17,8 +17,8 @@ import ( "github.com/Shopify/sarama" "github.com/birdayz/kaf/pkg/avro" "github.com/birdayz/kaf/pkg/partitioner" + "github.com/birdayz/kaf/pkg/proto" "github.com/birdayz/kaf/pkg/util" - pb "github.com/golang/protobuf/proto" "github.com/spf13/cobra" ) @@ -89,6 +89,26 @@ func readFull(reader io.Reader, out chan []byte) { close(out) } +func valueEncoder() util.Encoder { + if protoType != "" { + return proto.NewProtoCodec(protoType, reg) + } else if avroSchemaID != -1 { + return avro.NewAvroCodec(avroSchemaID, schemaCache) + } else { + return &util.BypassCodec{} + } +} + +func keyEncoder() util.Encoder { + if keyProtoType != "" { + return proto.NewProtoCodec(keyProtoType, reg) + } else if avroKeySchemaID != -1 { + return avro.NewAvroCodec(avroKeySchemaID, schemaCache) + } else { + return &util.BypassCodec{} + } +} + var produceCmd = &cobra.Command{ Use: "produce TOPIC", Short: "Produce record. Reads data from stdin.", @@ -130,6 +150,9 @@ var produceCmd = &cobra.Command{ go readLines(inReader, out) } + valueEncoder := valueEncoder() + keyEncoder := keyEncoder() + var key sarama.Encoder if rawKeyFlag { keyBytes, err := base64.RawStdEncoding.DecodeString(keyFlag) @@ -138,40 +161,11 @@ var produceCmd = &cobra.Command{ } key = sarama.ByteEncoder(keyBytes) } else { - key = sarama.StringEncoder(keyFlag) - } - if keyProtoType != "" { - if dynamicMessage := reg.MessageForType(keyProtoType); dynamicMessage != nil { - err = dynamicMessage.UnmarshalJSON([]byte(keyFlag)) - if err != nil { - errorExit("Failed to parse input JSON as proto type %v: %v", protoType, err) - } - - pb, err := pb.Marshal(dynamicMessage) - if err != nil { - errorExit("Failed to marshal proto: %v", err) - } - - key = sarama.ByteEncoder(pb) - } else { - errorExit("Failed to load key proto type") - } - - } else if avroKeySchemaID != -1 { - avroKey, err := schemaCache.EncodeMessage(avroKeySchemaID, []byte(keyFlag)) + encodedKey, err := keyEncoder.Encode([]byte(keyFlag)) if err != nil { - errorExit("Failed to encode avro key", err) - } - key = sarama.ByteEncoder(avroKey) - } - - var encoder util.Encoder - if avroSchemaID != -1 { - schemaCache = getSchemaCache() - if schemaCache == nil { - errorExit("Error getting a instance of schemaCache") + errorExit("%v", err) } - encoder = avro.NewAvroCodec(avroSchemaID, schemaCache) + key = sarama.ByteEncoder(encodedKey) } var headers []sarama.RecordHeader @@ -186,29 +180,9 @@ var produceCmd = &cobra.Command{ } for data := range out { - if protoType != "" { - if dynamicMessage := reg.MessageForType(protoType); dynamicMessage != nil { - err = dynamicMessage.UnmarshalJSON(data) - if err != nil { - errorExit("Failed to parse input JSON as proto type %v: %v", protoType, err) - } - - pb, err := pb.Marshal(dynamicMessage) - if err != nil { - errorExit("Failed to marshal proto: %v", err) - } - - data = pb - } else { - errorExit("Failed to load payload proto type") - } - } else if avroSchemaID != -1 { - // avro, err := schemaCache.EncodeMessage(avroSchemaID, data) - avro, err := encoder.Encode(data) - if err != nil { - errorExit("Failed to encode avro value", err) - } - data = avro + data, err = valueEncoder.Encode(data) + if err != nil { + errorExit("%v", err) } var ts time.Time diff --git a/pkg/proto/proto.go b/pkg/proto/proto.go index 00583f91..4f00aa71 100644 --- a/pkg/proto/proto.go +++ b/pkg/proto/proto.go @@ -2,6 +2,7 @@ package proto import ( "errors" + "fmt" "os" "path/filepath" @@ -86,20 +87,19 @@ func NewProtoCodec(protoType string, registry *DescriptorRegistry) *ProtoCodec { } func (p *ProtoCodec) Encode(in []byte) ([]byte, error) { - if dynamicMessage := p.registry.MessageForType(p.protoType); dynamicMessage != nil { err := dynamicMessage.UnmarshalJSON(in) if err != nil { - return nil, err + return nil, fmt.Errorf("Failed to parse input JSON as proto type %v: %v", p.protoType, err) } pb, err := proto.Marshal(dynamicMessage) if err != nil { - return nil, err + return nil, fmt.Errorf("Failed to marshal proto: %v", err) } return pb, nil } else { - return nil, errors.New("Error") + return nil, errors.New("Failed to load payload proto type") } } diff --git a/pkg/util/codec.go b/pkg/util/codec.go index 2e1a1854..ed6fa302 100644 --- a/pkg/util/codec.go +++ b/pkg/util/codec.go @@ -7,3 +7,13 @@ type Encoder interface { type Decoder interface { Decode(in []byte) ([]byte, error) } + +type BypassCodec struct{} + +func (BypassCodec) Encode(in []byte) ([]byte, error) { + return in, nil +} + +func (BypassCodec) Decode(in []byte) ([]byte, error) { + return in, nil +} From d7a29ea25b50d9a324cc02a5c5daa3a9bdf8bf22 Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Sun, 24 Apr 2022 12:32:13 -0400 Subject: [PATCH 03/13] Renaming to codec package --- cmd/kaf/produce.go | 10 +++++----- pkg/{util => codec}/codec.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) rename pkg/{util => codec}/codec.go (95%) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index e56a8a5f..d8aa9460 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -18,7 +18,7 @@ import ( "github.com/birdayz/kaf/pkg/avro" "github.com/birdayz/kaf/pkg/partitioner" "github.com/birdayz/kaf/pkg/proto" - "github.com/birdayz/kaf/pkg/util" + "github.com/birdayz/kaf/pkg/codec" "github.com/spf13/cobra" ) @@ -89,23 +89,23 @@ func readFull(reader io.Reader, out chan []byte) { close(out) } -func valueEncoder() util.Encoder { +func valueEncoder() codec.Encoder { if protoType != "" { return proto.NewProtoCodec(protoType, reg) } else if avroSchemaID != -1 { return avro.NewAvroCodec(avroSchemaID, schemaCache) } else { - return &util.BypassCodec{} + return &codec.BypassCodec{} } } -func keyEncoder() util.Encoder { +func keyEncoder() codec.Encoder { if keyProtoType != "" { return proto.NewProtoCodec(keyProtoType, reg) } else if avroKeySchemaID != -1 { return avro.NewAvroCodec(avroKeySchemaID, schemaCache) } else { - return &util.BypassCodec{} + return &codec.BypassCodec{} } } diff --git a/pkg/util/codec.go b/pkg/codec/codec.go similarity index 95% rename from pkg/util/codec.go rename to pkg/codec/codec.go index ed6fa302..e34510d2 100644 --- a/pkg/util/codec.go +++ b/pkg/codec/codec.go @@ -1,4 +1,4 @@ -package util +package codec type Encoder interface { Encode(in []byte) ([]byte, error) From 780046f779ad02ff73ce9c93c96ed2d99876fa6b Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Sun, 24 Apr 2022 13:24:34 -0400 Subject: [PATCH 04/13] Improving error reporting --- cmd/kaf/produce.go | 6 +++--- pkg/avro/avro.go | 4 ++-- pkg/proto/proto.go | 8 +++----- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index d8aa9460..ed9cf767 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -16,9 +16,9 @@ import ( "github.com/Masterminds/sprig" "github.com/Shopify/sarama" "github.com/birdayz/kaf/pkg/avro" + "github.com/birdayz/kaf/pkg/codec" "github.com/birdayz/kaf/pkg/partitioner" "github.com/birdayz/kaf/pkg/proto" - "github.com/birdayz/kaf/pkg/codec" "github.com/spf13/cobra" ) @@ -163,7 +163,7 @@ var produceCmd = &cobra.Command{ } else { encodedKey, err := keyEncoder.Encode([]byte(keyFlag)) if err != nil { - errorExit("%v", err) + errorExit("Error encoding key: %v", err) } key = sarama.ByteEncoder(encodedKey) } @@ -182,7 +182,7 @@ var produceCmd = &cobra.Command{ for data := range out { data, err = valueEncoder.Encode(data) if err != nil { - errorExit("%v", err) + errorExit("Error encoding value: %v", err) } var ts time.Time diff --git a/pkg/avro/avro.go b/pkg/avro/avro.go index 2adadcff..4fefd033 100644 --- a/pkg/avro/avro.go +++ b/pkg/avro/avro.go @@ -6,7 +6,7 @@ type AvroCodec struct { } func NewAvroCodec(schemaID int, cache *SchemaCache) *AvroCodec { - return &AvroCodec {schemaID, cache} + return &AvroCodec{schemaID, cache} } func (a *AvroCodec) Encode(in []byte) ([]byte, error) { @@ -14,5 +14,5 @@ func (a *AvroCodec) Encode(in []byte) ([]byte, error) { } func (a *AvroCodec) Decode(in []byte) ([]byte, error) { - return a.schemaCache.DecodeMessage(in) + return a.schemaCache.DecodeMessage(in) } diff --git a/pkg/proto/proto.go b/pkg/proto/proto.go index 4f00aa71..3728a703 100644 --- a/pkg/proto/proto.go +++ b/pkg/proto/proto.go @@ -1,11 +1,9 @@ package proto import ( - "errors" "fmt" "os" "path/filepath" - "strings" "github.com/golang/protobuf/proto" @@ -90,16 +88,16 @@ func (p *ProtoCodec) Encode(in []byte) ([]byte, error) { if dynamicMessage := p.registry.MessageForType(p.protoType); dynamicMessage != nil { err := dynamicMessage.UnmarshalJSON(in) if err != nil { - return nil, fmt.Errorf("Failed to parse input JSON as proto type %v: %v", p.protoType, err) + return nil, fmt.Errorf("failed to parse input JSON as proto type %v: %v", p.protoType, err) } pb, err := proto.Marshal(dynamicMessage) if err != nil { - return nil, fmt.Errorf("Failed to marshal proto: %v", err) + return nil, fmt.Errorf("failed to marshal proto: %v", err) } return pb, nil } else { - return nil, errors.New("Failed to load payload proto type") + return nil, fmt.Errorf("failed to load payload proto type: %v", p.protoType) } } From d8f7bf9cac1ae4c46cfc18574662af7dc818a28b Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Sun, 24 Apr 2022 13:33:30 -0400 Subject: [PATCH 05/13] Adding text fixtures for protobuf --- test/user.proto | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 test/user.proto diff --git a/test/user.proto b/test/user.proto new file mode 100644 index 00000000..28e6d491 --- /dev/null +++ b/test/user.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +message Id { + int32 id = 1; +} + +message User { + int32 id = 1; + string name = 2; +} From d99ea058e1c7dff297095aad1ee1469c238baae1 Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Sun, 24 Apr 2022 22:26:37 -0400 Subject: [PATCH 06/13] Adding docs --- pkg/avro/avro.go | 32 +++++++++++++++++++++++++++++--- pkg/codec/codec.go | 7 +++++++ pkg/proto/proto.go | 2 ++ 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/pkg/avro/avro.go b/pkg/avro/avro.go index 4fefd033..996ea7b4 100644 --- a/pkg/avro/avro.go +++ b/pkg/avro/avro.go @@ -1,8 +1,12 @@ package avro +import "encoding/binary" + +// AvroCodec implements the Encoder/Decoder interfaces for +// avro formats type AvroCodec struct { - schemaID int - schemaCache *SchemaCache + encodeSchemaID int + schemaCache *SchemaCache } func NewAvroCodec(schemaID int, cache *SchemaCache) *AvroCodec { @@ -10,7 +14,29 @@ func NewAvroCodec(schemaID int, cache *SchemaCache) *AvroCodec { } func (a *AvroCodec) Encode(in []byte) ([]byte, error) { - return a.schemaCache.EncodeMessage(a.schemaID, in) + codec, err := a.schemaCache.getCodecForSchemaID(a.encodeSchemaID) + if err != nil { + return nil, err + } + + // Creates a header with an initial zero byte and + // the schema id encoded as a big endian uint32 + buf := make([]byte, 5) + binary.BigEndian.PutUint32(buf[1:5], uint32(a.encodeSchemaID)) + + // Convert textual json data to native Go form + native, _, err := codec.NativeFromTextual(in) + if err != nil { + return nil, err + } + + // Convert native Go form to binary Avro data + message, err := codec.BinaryFromNative(buf, native) + if err != nil { + return nil, err + } + + return message, nil } func (a *AvroCodec) Decode(in []byte) ([]byte, error) { diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index e34510d2..ba229bf3 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -1,13 +1,20 @@ package codec +// Encoder converts from textual representation to +// bytes in the specified format type Encoder interface { + // Encode textual bytes to binary format Encode(in []byte) ([]byte, error) } +// Decoder converts from binary representation to +// textual in the specified format type Decoder interface { + // Decode binary bytes to text form Decode(in []byte) ([]byte, error) } +// BypassCodec is a no-op implementation of Encoder and Decoder type BypassCodec struct{} func (BypassCodec) Encode(in []byte) ([]byte, error) { diff --git a/pkg/proto/proto.go b/pkg/proto/proto.go index 3728a703..59822174 100644 --- a/pkg/proto/proto.go +++ b/pkg/proto/proto.go @@ -75,6 +75,8 @@ func (d *DescriptorRegistry) MessageForType(_type string) *dynamic.Message { return nil } +// ProtoCodec implements the Encoder/Decoder interfaces +// for protobuf messages type ProtoCodec struct { registry *DescriptorRegistry protoType string From a85270b4137d19fc45ac3d2f59beb208dc7da0c5 Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Sun, 24 Apr 2022 23:48:06 -0400 Subject: [PATCH 07/13] Removing dead code --- pkg/avro/avro.go | 29 ++++++++++++++++++++++++++++- pkg/avro/schema.go | 27 --------------------------- test/user.avro | 8 ++++++++ 3 files changed, 36 insertions(+), 28 deletions(-) create mode 100644 test/user.avro diff --git a/pkg/avro/avro.go b/pkg/avro/avro.go index 996ea7b4..b8f4dfa5 100644 --- a/pkg/avro/avro.go +++ b/pkg/avro/avro.go @@ -13,6 +13,7 @@ func NewAvroCodec(schemaID int, cache *SchemaCache) *AvroCodec { return &AvroCodec{schemaID, cache} } +// Encode returns a binary representation of an Avro-encoded message. func (a *AvroCodec) Encode(in []byte) ([]byte, error) { codec, err := a.schemaCache.getCodecForSchemaID(a.encodeSchemaID) if err != nil { @@ -39,6 +40,32 @@ func (a *AvroCodec) Encode(in []byte) ([]byte, error) { return message, nil } +// Decode returns a text representation of an Avro-encoded message. func (a *AvroCodec) Decode(in []byte) ([]byte, error) { - return a.schemaCache.DecodeMessage(in) + // Ensure avro header is present with the magic start-byte. + if len(in) < 5 || in[0] != 0x00 { + // The message does not contain Avro-encoded data + return in, nil + } + + // Schema ID is stored in the 4 bytes following the magic byte. + schemaID := binary.BigEndian.Uint32(in[1:5]) + codec, err := a.schemaCache.getCodecForSchemaID(int(schemaID)) + if err != nil { + return in, err + } + + // Convert binary Avro data back to native Go form + native, _, err := codec.NativeFromBinary(in[5:]) + if err != nil { + return in, err + } + + // Convert native Go form to textual Avro data + message, err := codec.TextualFromNative(nil, native) + if err != nil { + return in, err + } + + return message, nil } diff --git a/pkg/avro/schema.go b/pkg/avro/schema.go index 7b02214e..bbe5624e 100644 --- a/pkg/avro/schema.go +++ b/pkg/avro/schema.go @@ -111,30 +111,3 @@ func (c *SchemaCache) DecodeMessage(b []byte) (message []byte, err error) { return message, nil } - -// EncodeMessage returns a binary representation of an Avro-encoded message. -func (c *SchemaCache) EncodeMessage(schemaID int, json []byte) (message []byte, err error) { - codec, err := c.getCodecForSchemaID(schemaID) - if err != nil { - return nil, err - } - - // Creates a header with an initial zero byte and - // the schema id encoded as a big endian uint32 - buf := make([]byte, 5) - binary.BigEndian.PutUint32(buf[1:5], uint32(schemaID)) - - // Convert textual json data to native Go form - native, _, err := codec.NativeFromTextual(json) - if err != nil { - return nil, err - } - - // Convert native Go form to binary Avro data - message, err = codec.BinaryFromNative(buf, native) - if err != nil { - return nil, err - } - - return message, nil -} diff --git a/test/user.avro b/test/user.avro new file mode 100644 index 00000000..67505e5c --- /dev/null +++ b/test/user.avro @@ -0,0 +1,8 @@ +{ + "name": "user", + "type": "record", + "fields": [ + { "name": "id", "type": "int" }, + { "name": "name", "type": "string" } + ] +} From 72aaef53b0b1c9d86f652fd24dbd6e2b236bf5e3 Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Fri, 29 Apr 2022 23:45:46 -0400 Subject: [PATCH 08/13] Chaging the encoding interface contract to use json --- pkg/codec/codec.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index ba229bf3..fca16115 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -1,17 +1,18 @@ package codec -// Encoder converts from textual representation to -// bytes in the specified format +import "encoding/json" + +// Encoder converts from json representation +// to bytes in the specified format type Encoder interface { - // Encode textual bytes to binary format - Encode(in []byte) ([]byte, error) + // Encode json to binary format + Encode(in json.RawMessage) ([]byte, error) } -// Decoder converts from binary representation to -// textual in the specified format +// Decoder converts from binary representation to json type Decoder interface { - // Decode binary bytes to text form - Decode(in []byte) ([]byte, error) + // Decode binary to json form + Decode(in []byte) (json.RawMessage, error) } // BypassCodec is a no-op implementation of Encoder and Decoder From 59139164300c744f2f3d6ded0faafa2c30c30168 Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Sat, 30 Apr 2022 14:54:29 -0400 Subject: [PATCH 09/13] Changing the interface to json.RawMessage --- pkg/avro/avro.go | 9 ++++++--- pkg/codec/codec.go | 4 ++-- pkg/proto/proto.go | 3 ++- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/avro/avro.go b/pkg/avro/avro.go index b8f4dfa5..4df04eda 100644 --- a/pkg/avro/avro.go +++ b/pkg/avro/avro.go @@ -1,6 +1,9 @@ package avro -import "encoding/binary" +import ( + "encoding/binary" + "encoding/json" +) // AvroCodec implements the Encoder/Decoder interfaces for // avro formats @@ -14,7 +17,7 @@ func NewAvroCodec(schemaID int, cache *SchemaCache) *AvroCodec { } // Encode returns a binary representation of an Avro-encoded message. -func (a *AvroCodec) Encode(in []byte) ([]byte, error) { +func (a *AvroCodec) Encode(in json.RawMessage) ([]byte, error) { codec, err := a.schemaCache.getCodecForSchemaID(a.encodeSchemaID) if err != nil { return nil, err @@ -41,7 +44,7 @@ func (a *AvroCodec) Encode(in []byte) ([]byte, error) { } // Decode returns a text representation of an Avro-encoded message. -func (a *AvroCodec) Decode(in []byte) ([]byte, error) { +func (a *AvroCodec) Decode(in []byte) (json.RawMessage, error) { // Ensure avro header is present with the magic start-byte. if len(in) < 5 || in[0] != 0x00 { // The message does not contain Avro-encoded data diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index fca16115..9e0c8c77 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -18,10 +18,10 @@ type Decoder interface { // BypassCodec is a no-op implementation of Encoder and Decoder type BypassCodec struct{} -func (BypassCodec) Encode(in []byte) ([]byte, error) { +func (BypassCodec) Encode(in json.RawMessage) ([]byte, error) { return in, nil } -func (BypassCodec) Decode(in []byte) ([]byte, error) { +func (BypassCodec) Decode(in json.RawMessage) ([]byte, error) { return in, nil } diff --git a/pkg/proto/proto.go b/pkg/proto/proto.go index 59822174..09ffa716 100644 --- a/pkg/proto/proto.go +++ b/pkg/proto/proto.go @@ -1,6 +1,7 @@ package proto import ( + "encoding/json" "fmt" "os" "path/filepath" @@ -86,7 +87,7 @@ func NewProtoCodec(protoType string, registry *DescriptorRegistry) *ProtoCodec { return &ProtoCodec{registry, protoType} } -func (p *ProtoCodec) Encode(in []byte) ([]byte, error) { +func (p *ProtoCodec) Encode(in json.RawMessage) ([]byte, error) { if dynamicMessage := p.registry.MessageForType(p.protoType); dynamicMessage != nil { err := dynamicMessage.UnmarshalJSON(in) if err != nil { From 42b69f33f6e0a378a69b5fe10ab978d8ed6b19bf Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Sat, 30 Apr 2022 14:55:16 -0400 Subject: [PATCH 10/13] Delaying the encoding so it can be used for templating as well --- cmd/kaf/produce.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index ed9cf767..d2b7bbd1 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -180,11 +180,6 @@ var produceCmd = &cobra.Command{ } for data := range out { - data, err = valueEncoder.Encode(data) - if err != nil { - errorExit("Error encoding value: %v", err) - } - var ts time.Time t, err := time.Parse(time.RFC3339, timestampFlag) if err != nil { @@ -216,6 +211,11 @@ var produceCmd = &cobra.Command{ input = buf.Bytes() } + input, err = valueEncoder.Encode(input) + if err != nil { + errorExit("Error encoding value: %v", err) + } + msg := &sarama.ProducerMessage{ Topic: args[0], Key: key, From edf7a96349ce69d123d73126cccda1963db560a5 Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Sat, 30 Apr 2022 16:49:40 -0400 Subject: [PATCH 11/13] Moving files around acording to code review feedback --- cmd/kaf/consume.go | 6 +++--- cmd/kaf/kaf.go | 4 ++-- cmd/kaf/produce.go | 10 ++++------ pkg/avro/schema.go | 4 ++-- pkg/{avro => codec}/avro.go | 12 +++++++----- pkg/{proto => codec}/proto.go | 2 +- {test => pkg/codec}/user.avro | 0 {test => pkg/codec}/user.proto | 0 8 files changed, 19 insertions(+), 19 deletions(-) rename pkg/{avro => codec}/avro.go (85%) rename pkg/{proto => codec}/proto.go (99%) rename {test => pkg/codec}/user.avro (100%) rename {test => pkg/codec}/user.proto (100%) diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 3a37b329..7d94e34b 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -13,7 +13,7 @@ import ( "github.com/Shopify/sarama" "github.com/birdayz/kaf/pkg/avro" - "github.com/birdayz/kaf/pkg/proto" + "github.com/birdayz/kaf/pkg/codec" "github.com/golang/protobuf/jsonpb" prettyjson "github.com/hokaccha/go-prettyjson" "github.com/spf13/cobra" @@ -37,7 +37,7 @@ var ( limitMessagesFlag int64 - reg *proto.DescriptorRegistry + reg *codec.DescriptorRegistry ) func init() { @@ -315,7 +315,7 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) { } // proto to JSON -func protoDecode(reg *proto.DescriptorRegistry, b []byte, _type string) ([]byte, error) { +func protoDecode(reg *codec.DescriptorRegistry, b []byte, _type string) ([]byte, error) { dynamicMessage := reg.MessageForType(_type) if dynamicMessage == nil { return b, nil diff --git a/cmd/kaf/kaf.go b/cmd/kaf/kaf.go index ec43e576..9e9a2a79 100644 --- a/cmd/kaf/kaf.go +++ b/cmd/kaf/kaf.go @@ -15,8 +15,8 @@ import ( "github.com/spf13/cobra" "github.com/birdayz/kaf/pkg/avro" + "github.com/birdayz/kaf/pkg/codec" "github.com/birdayz/kaf/pkg/config" - "github.com/birdayz/kaf/pkg/proto" ) var cfgFile string @@ -174,7 +174,7 @@ func init() { var setupProtoDescriptorRegistry = func(cmd *cobra.Command, args []string) { if protoType != "" { - r, err := proto.NewDescriptorRegistry(protoFiles, protoExclude) + r, err := codec.NewDescriptorRegistry(protoFiles, protoExclude) if err != nil { errorExit("Failed to load protobuf files: %v\n", err) } diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index d2b7bbd1..c22546ae 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -15,10 +15,8 @@ import ( "github.com/Masterminds/sprig" "github.com/Shopify/sarama" - "github.com/birdayz/kaf/pkg/avro" "github.com/birdayz/kaf/pkg/codec" "github.com/birdayz/kaf/pkg/partitioner" - "github.com/birdayz/kaf/pkg/proto" "github.com/spf13/cobra" ) @@ -91,9 +89,9 @@ func readFull(reader io.Reader, out chan []byte) { func valueEncoder() codec.Encoder { if protoType != "" { - return proto.NewProtoCodec(protoType, reg) + return codec.NewProtoCodec(protoType, reg) } else if avroSchemaID != -1 { - return avro.NewAvroCodec(avroSchemaID, schemaCache) + return codec.NewAvroCodec(avroSchemaID, schemaCache) } else { return &codec.BypassCodec{} } @@ -101,9 +99,9 @@ func valueEncoder() codec.Encoder { func keyEncoder() codec.Encoder { if keyProtoType != "" { - return proto.NewProtoCodec(keyProtoType, reg) + return codec.NewProtoCodec(keyProtoType, reg) } else if avroKeySchemaID != -1 { - return avro.NewAvroCodec(avroKeySchemaID, schemaCache) + return codec.NewAvroCodec(avroKeySchemaID, schemaCache) } else { return &codec.BypassCodec{} } diff --git a/pkg/avro/schema.go b/pkg/avro/schema.go index bbe5624e..ba396bde 100644 --- a/pkg/avro/schema.go +++ b/pkg/avro/schema.go @@ -38,7 +38,7 @@ func NewSchemaCache(url string) (*SchemaCache, error) { } // getCodecForSchemaID returns a goavro codec for transforming data. -func (c *SchemaCache) getCodecForSchemaID(schemaID int) (codec *goavro.Codec, err error) { +func (c *SchemaCache) GetCodecForSchemaID(schemaID int) (codec *goavro.Codec, err error) { c.mu.RLock() cc, ok := c.codecsBySchemaID[schemaID] c.mu.RUnlock() @@ -92,7 +92,7 @@ func (c *SchemaCache) DecodeMessage(b []byte) (message []byte, err error) { // Schema ID is stored in the 4 bytes following the magic byte. schemaID := binary.BigEndian.Uint32(b[1:5]) - codec, err := c.getCodecForSchemaID(int(schemaID)) + codec, err := c.GetCodecForSchemaID(int(schemaID)) if err != nil { return b, err } diff --git a/pkg/avro/avro.go b/pkg/codec/avro.go similarity index 85% rename from pkg/avro/avro.go rename to pkg/codec/avro.go index 4df04eda..8ecf4753 100644 --- a/pkg/avro/avro.go +++ b/pkg/codec/avro.go @@ -1,24 +1,26 @@ -package avro +package codec import ( "encoding/binary" "encoding/json" + + "github.com/birdayz/kaf/pkg/avro" ) // AvroCodec implements the Encoder/Decoder interfaces for // avro formats type AvroCodec struct { encodeSchemaID int - schemaCache *SchemaCache + schemaCache *avro.SchemaCache } -func NewAvroCodec(schemaID int, cache *SchemaCache) *AvroCodec { +func NewAvroCodec(schemaID int, cache *avro.SchemaCache) *AvroCodec { return &AvroCodec{schemaID, cache} } // Encode returns a binary representation of an Avro-encoded message. func (a *AvroCodec) Encode(in json.RawMessage) ([]byte, error) { - codec, err := a.schemaCache.getCodecForSchemaID(a.encodeSchemaID) + codec, err := a.schemaCache.GetCodecForSchemaID(a.encodeSchemaID) if err != nil { return nil, err } @@ -53,7 +55,7 @@ func (a *AvroCodec) Decode(in []byte) (json.RawMessage, error) { // Schema ID is stored in the 4 bytes following the magic byte. schemaID := binary.BigEndian.Uint32(in[1:5]) - codec, err := a.schemaCache.getCodecForSchemaID(int(schemaID)) + codec, err := a.schemaCache.GetCodecForSchemaID(int(schemaID)) if err != nil { return in, err } diff --git a/pkg/proto/proto.go b/pkg/codec/proto.go similarity index 99% rename from pkg/proto/proto.go rename to pkg/codec/proto.go index 09ffa716..35874e03 100644 --- a/pkg/proto/proto.go +++ b/pkg/codec/proto.go @@ -1,4 +1,4 @@ -package proto +package codec import ( "encoding/json" diff --git a/test/user.avro b/pkg/codec/user.avro similarity index 100% rename from test/user.avro rename to pkg/codec/user.avro diff --git a/test/user.proto b/pkg/codec/user.proto similarity index 100% rename from test/user.proto rename to pkg/codec/user.proto From 1add193e751137b4a71af072eab006073377d5f5 Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Mon, 2 May 2022 18:18:39 -0400 Subject: [PATCH 12/13] Using interface to decode messages --- cmd/kaf/consume.go | 54 +++++++++++++--------------------------------- cmd/kaf/query.go | 7 ++++-- pkg/codec/proto.go | 23 ++++++++++++++++++++ 3 files changed, 43 insertions(+), 41 deletions(-) diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 7d94e34b..589d404b 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -14,7 +14,6 @@ import ( "github.com/Shopify/sarama" "github.com/birdayz/kaf/pkg/avro" "github.com/birdayz/kaf/pkg/codec" - "github.com/golang/protobuf/jsonpb" prettyjson "github.com/hokaccha/go-prettyjson" "github.com/spf13/cobra" "github.com/vmihailenco/msgpack/v5" @@ -38,6 +37,10 @@ var ( limitMessagesFlag int64 reg *codec.DescriptorRegistry + + protoDecoder *codec.ProtoCodec + protoKeyDecoder *codec.ProtoCodec + avroDecoder *codec.AvroCodec ) func init() { @@ -95,6 +98,11 @@ var consumeCmd = &cobra.Command{ topic := args[0] client := getClientFromConfig(cfg) + schemaCache = getSchemaCache() + avroDecoder = codec.NewAvroCodec(-1, schemaCache) + protoDecoder = codec.NewProtoCodec(protoType, reg) + protoKeyDecoder = codec.NewProtoCodec(keyProtoType, reg) + switch offsetFlag { case "oldest": offset = sarama.OffsetOldest @@ -169,8 +177,6 @@ func withoutConsumerGroup(ctx context.Context, client sarama.Client, topic strin partitions = flagPartitions } - schemaCache = getSchemaCache() - wg := sync.WaitGroup{} mu := sync.Mutex{} // Synchronizes stderr and stdout. for _, partition := range partitions { @@ -230,24 +236,24 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) { var err error if protoType != "" { - dataToDisplay, err = protoDecode(reg, msg.Value, protoType) + dataToDisplay, err = protoDecoder.Decode(msg.Value) if err != nil { - fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary outputla. Error: %v\n", err) + fmt.Fprintf(&stderr, "failed to decode proto. falling back to binary output. Error: %v\n", err) } } else { - dataToDisplay, err = avroDecode(msg.Value) + dataToDisplay, err = avroDecoder.Decode(msg.Value) if err != nil { fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err) } } if keyProtoType != "" { - keyToDisplay, err = protoDecode(reg, msg.Key, keyProtoType) + keyToDisplay, err = protoKeyDecoder.Decode(msg.Key) if err != nil { - fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary outputla. Error: %v\n", err) + fmt.Fprintf(&stderr, "failed to decode proto key. falling back to binary output. Error: %v\n", err) } } else { - keyToDisplay, err = avroDecode(msg.Key) + keyToDisplay, err = avroDecoder.Decode(msg.Key) if err != nil { fmt.Fprintf(&stderr, "could not decode Avro data: %v\n", err) } @@ -314,36 +320,6 @@ func handleMessage(msg *sarama.ConsumerMessage, mu *sync.Mutex) { } -// proto to JSON -func protoDecode(reg *codec.DescriptorRegistry, b []byte, _type string) ([]byte, error) { - dynamicMessage := reg.MessageForType(_type) - if dynamicMessage == nil { - return b, nil - } - - err := dynamicMessage.Unmarshal(b) - if err != nil { - return nil, err - } - - var m jsonpb.Marshaler - var w bytes.Buffer - - err = m.Marshal(&w, dynamicMessage) - if err != nil { - return nil, err - } - return w.Bytes(), nil - -} - -func avroDecode(b []byte) ([]byte, error) { - if schemaCache != nil { - return schemaCache.DecodeMessage(b) - } - return b, nil -} - func formatKey(key []byte) []byte { if b, err := keyfmt.Format(key); err == nil { return b diff --git a/cmd/kaf/query.go b/cmd/kaf/query.go index d907095d..2ef170e3 100644 --- a/cmd/kaf/query.go +++ b/cmd/kaf/query.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/Shopify/sarama" + "github.com/birdayz/kaf/pkg/codec" "github.com/spf13/cobra" ) @@ -46,6 +47,8 @@ var queryCmd = &cobra.Command{ } schemaCache = getSchemaCache() + protoDecoder := codec.NewProtoCodec(protoType, reg) + protoKeyDecoder := codec.NewProtoCodec(keyProtoType, reg) wg := sync.WaitGroup{} @@ -72,7 +75,7 @@ var queryCmd = &cobra.Command{ var keyTextRaw string var valueTextRaw string if protoType != "" { - d, err := protoDecode(reg, msg.Value, protoType) + d, err := protoDecoder.Decode(msg.Value) if err != nil { fmt.Println("Failed proto decode") } @@ -82,7 +85,7 @@ var queryCmd = &cobra.Command{ } if keyProtoType != "" { - d, err := protoDecode(reg, msg.Key, keyProtoType) + d, err := protoKeyDecoder.Decode(msg.Key) if err != nil { fmt.Println("Failed proto decode") } diff --git a/pkg/codec/proto.go b/pkg/codec/proto.go index 35874e03..bab9b89f 100644 --- a/pkg/codec/proto.go +++ b/pkg/codec/proto.go @@ -1,12 +1,14 @@ package codec import ( + "bytes" "encoding/json" "fmt" "os" "path/filepath" "strings" + "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/desc/protoparse" @@ -104,3 +106,24 @@ func (p *ProtoCodec) Encode(in json.RawMessage) ([]byte, error) { return nil, fmt.Errorf("failed to load payload proto type: %v", p.protoType) } } + +func (p *ProtoCodec) Decode(in []byte) (json.RawMessage, error) { + dynamicMessage := p.registry.MessageForType(p.protoType) + if dynamicMessage == nil { + return in, nil + } + + err := dynamicMessage.Unmarshal(in) + if err != nil { + return nil, err + } + + var m jsonpb.Marshaler + var w bytes.Buffer + + err = m.Marshal(&w, dynamicMessage) + if err != nil { + return nil, err + } + return w.Bytes(), nil +} From 6b5727237357144fb7bb73c23fc9fc8c94cb7d3d Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Mon, 2 May 2022 18:20:45 -0400 Subject: [PATCH 13/13] Adding flag to use strict avro parsing --- cmd/kaf/consume.go | 2 +- cmd/kaf/produce.go | 6 ++++-- pkg/avro/schema.go | 39 ++++++--------------------------------- pkg/codec/avro.go | 9 +++++---- 4 files changed, 16 insertions(+), 40 deletions(-) diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 589d404b..5934e5d3 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -99,7 +99,7 @@ var consumeCmd = &cobra.Command{ client := getClientFromConfig(cfg) schemaCache = getSchemaCache() - avroDecoder = codec.NewAvroCodec(-1, schemaCache) + avroDecoder = codec.NewAvroCodec(-1, false, schemaCache) protoDecoder = codec.NewProtoCodec(protoType, reg) protoKeyDecoder = codec.NewProtoCodec(keyProtoType, reg) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index c22546ae..66ee2bbe 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -32,6 +32,7 @@ var ( inputModeFlag string avroSchemaID int avroKeySchemaID int + avroStrictFlag bool templateFlag bool ) @@ -54,6 +55,7 @@ func init() { produceCmd.Flags().IntVarP(&avroSchemaID, "avro-schema-id", "", -1, "Value schema id for avro messsage encoding") produceCmd.Flags().IntVarP(&avroKeySchemaID, "avro-key-schema-id", "", -1, "Key schema id for avro messsage encoding") + produceCmd.Flags().BoolVar(&avroStrictFlag, "avro-strict", false, "Uses strict version of the input json to parse unions") produceCmd.Flags().StringVarP(&inputModeFlag, "input-mode", "", "line", "Scanning input mode: [line|full]") produceCmd.Flags().IntVarP(&bufferSizeFlag, "line-length-limit", "", 0, "line length limit in line input mode") @@ -91,7 +93,7 @@ func valueEncoder() codec.Encoder { if protoType != "" { return codec.NewProtoCodec(protoType, reg) } else if avroSchemaID != -1 { - return codec.NewAvroCodec(avroSchemaID, schemaCache) + return codec.NewAvroCodec(avroSchemaID, avroStrictFlag, schemaCache) } else { return &codec.BypassCodec{} } @@ -101,7 +103,7 @@ func keyEncoder() codec.Encoder { if keyProtoType != "" { return codec.NewProtoCodec(keyProtoType, reg) } else if avroKeySchemaID != -1 { - return codec.NewAvroCodec(avroKeySchemaID, schemaCache) + return codec.NewAvroCodec(avroKeySchemaID, avroStrictFlag, schemaCache) } else { return &codec.BypassCodec{} } diff --git a/pkg/avro/schema.go b/pkg/avro/schema.go index ba396bde..00e93f32 100644 --- a/pkg/avro/schema.go +++ b/pkg/avro/schema.go @@ -1,7 +1,6 @@ package avro import ( - "encoding/binary" "sync" schemaregistry "github.com/Landoop/schema-registry" @@ -38,7 +37,7 @@ func NewSchemaCache(url string) (*SchemaCache, error) { } // getCodecForSchemaID returns a goavro codec for transforming data. -func (c *SchemaCache) GetCodecForSchemaID(schemaID int) (codec *goavro.Codec, err error) { +func (c *SchemaCache) GetCodecForSchemaID(schemaID int, strict bool) (codec *goavro.Codec, err error) { c.mu.RLock() cc, ok := c.codecsBySchemaID[schemaID] c.mu.RUnlock() @@ -74,40 +73,14 @@ func (c *SchemaCache) GetCodecForSchemaID(schemaID int) (codec *goavro.Codec, er return nil, err } - codec, err = goavro.NewCodec(schema) + if strict { + codec, err = goavro.NewCodec(schema) + } else { + codec, err = goavro.NewCodecForStandardJSON(schema) + } if err != nil { return nil, err } return codec, nil } - -// DecodeMessage returns a text representation of an Avro-encoded message. -func (c *SchemaCache) DecodeMessage(b []byte) (message []byte, err error) { - // Ensure avro header is present with the magic start-byte. - if len(b) < 5 || b[0] != 0x00 { - // The message does not contain Avro-encoded data - return b, nil - } - - // Schema ID is stored in the 4 bytes following the magic byte. - schemaID := binary.BigEndian.Uint32(b[1:5]) - codec, err := c.GetCodecForSchemaID(int(schemaID)) - if err != nil { - return b, err - } - - // Convert binary Avro data back to native Go form - native, _, err := codec.NativeFromBinary(b[5:]) - if err != nil { - return b, err - } - - // Convert native Go form to textual Avro data - message, err = codec.TextualFromNative(nil, native) - if err != nil { - return b, err - } - - return message, nil -} diff --git a/pkg/codec/avro.go b/pkg/codec/avro.go index 8ecf4753..f2ddfde6 100644 --- a/pkg/codec/avro.go +++ b/pkg/codec/avro.go @@ -11,16 +11,17 @@ import ( // avro formats type AvroCodec struct { encodeSchemaID int + strict bool schemaCache *avro.SchemaCache } -func NewAvroCodec(schemaID int, cache *avro.SchemaCache) *AvroCodec { - return &AvroCodec{schemaID, cache} +func NewAvroCodec(schemaID int, strict bool, cache *avro.SchemaCache) *AvroCodec { + return &AvroCodec{schemaID, strict, cache} } // Encode returns a binary representation of an Avro-encoded message. func (a *AvroCodec) Encode(in json.RawMessage) ([]byte, error) { - codec, err := a.schemaCache.GetCodecForSchemaID(a.encodeSchemaID) + codec, err := a.schemaCache.GetCodecForSchemaID(a.encodeSchemaID, a.strict) if err != nil { return nil, err } @@ -55,7 +56,7 @@ func (a *AvroCodec) Decode(in []byte) (json.RawMessage, error) { // Schema ID is stored in the 4 bytes following the magic byte. schemaID := binary.BigEndian.Uint32(in[1:5]) - codec, err := a.schemaCache.GetCodecForSchemaID(int(schemaID)) + codec, err := a.schemaCache.GetCodecForSchemaID(int(schemaID), a.strict) if err != nil { return in, err }