From 8946f876d7b56c3f9940ddebe411590d8fef5413 Mon Sep 17 00:00:00 2001 From: Kirill Murzin Date: Thu, 28 Aug 2025 11:16:10 +0300 Subject: [PATCH 1/3] added support for configuring proto types in config --- cmd/kaf/consume.go | 15 ++++++++++++++- cmd/kaf/produce.go | 15 ++++++++++++++- examples/proto-types.yaml | 17 +++++++++++++++++ pkg/config/config.go | 7 +++++++ 4 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 examples/proto-types.yaml diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 132dd385..674490cb 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -98,7 +98,7 @@ var consumeCmd = &cobra.Command{ Short: "Consume messages", Args: cobra.ExactArgs(1), ValidArgsFunction: validTopicArgs, - PreRun: setupProtoDescriptorRegistry, + PreRun: consumePreRun, Run: func(cmd *cobra.Command, args []string) { var offset int64 cfg := getConfig() @@ -134,6 +134,19 @@ var consumeCmd = &cobra.Command{ }, } +func consumePreRun(cmd *cobra.Command, args []string) { + if protoType == "" { + for _, topic := range cfg.Topics { + if topic.Name == args[0] { + protoType = topic.ProtoType + protoFiles = topic.ProtoPaths + break + } + } + } + setupProtoDescriptorRegistry(cmd, args) +} + type g struct{} func (g *g) Setup(s sarama.ConsumerGroupSession) error { diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index ca138285..07f230e4 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -91,7 +91,7 @@ var produceCmd = &cobra.Command{ Short: "Produce record. Reads data from stdin.", Args: cobra.ExactArgs(1), ValidArgsFunction: validTopicArgs, - PreRun: setupProtoDescriptorRegistry, + PreRun: producePreRun, Run: func(cmd *cobra.Command, args []string) { cfg := getConfig() switch partitionerFlag { @@ -257,3 +257,16 @@ var produceCmd = &cobra.Command{ } }, } + +func producePreRun(cmd *cobra.Command, args []string) { + if protoType == "" { + for _, topic := range cfg.Topics { + if topic.Name == args[0] { + protoType = topic.ProtoType + protoFiles = topic.ProtoPaths + break + } + } + } + setupProtoDescriptorRegistry(cmd, args) +} diff --git a/examples/proto-types.yaml b/examples/proto-types.yaml new file mode 100644 index 00000000..f4f9b4e7 --- /dev/null +++ b/examples/proto-types.yaml @@ -0,0 +1,17 @@ +clusters: + - name: local + brokers: + - localhost:9092 + SASL: null + TLS: null + security-protocol: "" + version: "1.0.0" +topics: + - name: topic1 + proto-type: local.ProtoType1 + proto-paths: + - /path_to_proto_files/proto_storage + - name: topic2 + proto-type: local.ProtoType2 + proto-paths: + - /path_to_proto_files/proto_storage diff --git a/pkg/config/config.go b/pkg/config/config.go index 698ee07b..2c1298b6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -44,10 +44,17 @@ type Cluster struct { SchemaRegistryCredentials *SchemaRegistryCredentials `yaml:"schema-registry-credentials"` } +type Topic struct { + Name string `yaml:"name"` + ProtoType string `yaml:"proto-type"` + ProtoPaths []string `yaml:"proto-paths"` +} + type Config struct { CurrentCluster string `yaml:"current-cluster"` ClusterOverride string Clusters []*Cluster `yaml:"clusters"` + Topics []Topic `yaml:"topics"` } func (c *Config) SetCurrentCluster(name string) error { From be1cb42dcf06f65f7f78aaf5c3df4b3198ec4472 Mon Sep 17 00:00:00 2001 From: Kirill Murzin Date: Mon, 1 Sep 2025 11:51:49 +0300 Subject: [PATCH 2/3] refactoring --- cmd/kaf/consume.go | 10 +--------- cmd/kaf/kaf.go | 13 +++++++++++++ cmd/kaf/produce.go | 10 +--------- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 674490cb..016f0203 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -135,15 +135,7 @@ var consumeCmd = &cobra.Command{ } func consumePreRun(cmd *cobra.Command, args []string) { - if protoType == "" { - for _, topic := range cfg.Topics { - if topic.Name == args[0] { - protoType = topic.ProtoType - protoFiles = topic.ProtoPaths - break - } - } - } + loadProtoPathsFromConfig(cmd, args) setupProtoDescriptorRegistry(cmd, args) } diff --git a/cmd/kaf/kaf.go b/cmd/kaf/kaf.go index e435d538..39d8a3e7 100644 --- a/cmd/kaf/kaf.go +++ b/cmd/kaf/kaf.go @@ -173,6 +173,19 @@ func init() { cobra.OnInitialize(onInit) } +var loadProtoPathsFromConfig = func(cmd *cobra.Command, args []string) { + if protoType != "" { + return + } + for _, topic := range cfg.Topics { + if topic.Name == args[0] { + protoType = topic.ProtoType + protoFiles = topic.ProtoPaths + break + } + } +} + var setupProtoDescriptorRegistry = func(cmd *cobra.Command, args []string) { if protoType != "" { r, err := proto.NewDescriptorRegistry(protoFiles, protoExclude) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index 07f230e4..0b9ab119 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -259,14 +259,6 @@ var produceCmd = &cobra.Command{ } func producePreRun(cmd *cobra.Command, args []string) { - if protoType == "" { - for _, topic := range cfg.Topics { - if topic.Name == args[0] { - protoType = topic.ProtoType - protoFiles = topic.ProtoPaths - break - } - } - } + loadProtoPathsFromConfig(cmd, args) setupProtoDescriptorRegistry(cmd, args) } From f5954ce43b53aebcb7198586b4b5c514b8d1a106 Mon Sep 17 00:00:00 2001 From: Kirill Murzin Date: Sat, 4 Oct 2025 13:38:16 +0300 Subject: [PATCH 3/3] added key-proto-type and global-proto-paths --- cmd/kaf/kaf.go | 13 ++++++++----- examples/proto-types.yaml | 15 ++++++--------- pkg/config/config.go | 16 +++++++++------- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/cmd/kaf/kaf.go b/cmd/kaf/kaf.go index 39d8a3e7..e6f11a1e 100644 --- a/cmd/kaf/kaf.go +++ b/cmd/kaf/kaf.go @@ -174,13 +174,16 @@ func init() { } var loadProtoPathsFromConfig = func(cmd *cobra.Command, args []string) { - if protoType != "" { - return - } + protoFiles = cfg.GlobalProtoPaths for _, topic := range cfg.Topics { if topic.Name == args[0] { - protoType = topic.ProtoType - protoFiles = topic.ProtoPaths + if protoType == "" { + protoType = topic.ValueProtoType + } + if keyProtoType == "" { + keyProtoType = topic.KeyProtoType + } + protoFiles = append(protoFiles, topic.ProtoPaths...) break } } diff --git a/examples/proto-types.yaml b/examples/proto-types.yaml index f4f9b4e7..c6343fc5 100644 --- a/examples/proto-types.yaml +++ b/examples/proto-types.yaml @@ -2,16 +2,13 @@ clusters: - name: local brokers: - localhost:9092 - SASL: null - TLS: null - security-protocol: "" - version: "1.0.0" +global-proto-paths: + - /path_to_proto_files/global_proto_storage topics: - name: topic1 - proto-type: local.ProtoType1 + value-proto-type: local.ProtoType1 + key-proto-type: local.KeyType1 proto-paths: - - /path_to_proto_files/proto_storage + - /path_to_proto_files/local_proto_storage # appends to global-proto-paths - name: topic2 - proto-type: local.ProtoType2 - proto-paths: - - /path_to_proto_files/proto_storage + value-proto-type: local.ProtoType2 diff --git a/pkg/config/config.go b/pkg/config/config.go index 2c1298b6..0f0d36e7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -45,16 +45,18 @@ type Cluster struct { } type Topic struct { - Name string `yaml:"name"` - ProtoType string `yaml:"proto-type"` - ProtoPaths []string `yaml:"proto-paths"` + Name string `yaml:"name"` + ValueProtoType string `yaml:"value-proto-type"` + KeyProtoType string `yaml:"key-proto-type"` + ProtoPaths []string `yaml:"proto-paths"` } type Config struct { - CurrentCluster string `yaml:"current-cluster"` - ClusterOverride string - Clusters []*Cluster `yaml:"clusters"` - Topics []Topic `yaml:"topics"` + CurrentCluster string `yaml:"current-cluster"` + ClusterOverride string + Clusters []*Cluster `yaml:"clusters"` + GlobalProtoPaths []string `yaml:"global-proto-paths"` + Topics []Topic `yaml:"topics"` } func (c *Config) SetCurrentCluster(name string) error {