diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 12cf99bc..d307241c 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -103,7 +103,7 @@ var consumeCmd = &cobra.Command{ Short: "Consume messages", Args: cobra.ExactArgs(1), ValidArgsFunction: validTopicArgs, - PreRun: setupProtoDescriptorRegistry, + PreRun: consumePreRun, Run: func(cmd *cobra.Command, args []string) { var offset int64 cfg := getConfig() @@ -147,6 +147,11 @@ var consumeCmd = &cobra.Command{ }, } +func consumePreRun(cmd *cobra.Command, args []string) { + loadProtoPathsFromConfig(cmd, args) + setupProtoDescriptorRegistry(cmd, args) +} + type g struct{} func (g *g) Setup(s sarama.ConsumerGroupSession) error { diff --git a/cmd/kaf/kaf.go b/cmd/kaf/kaf.go index e435d538..e6f11a1e 100644 --- a/cmd/kaf/kaf.go +++ b/cmd/kaf/kaf.go @@ -173,6 +173,22 @@ func init() { cobra.OnInitialize(onInit) } +var loadProtoPathsFromConfig = func(cmd *cobra.Command, args []string) { + protoFiles = cfg.GlobalProtoPaths + for _, topic := range cfg.Topics { + if topic.Name == args[0] { + if protoType == "" { + protoType = topic.ValueProtoType + } + if keyProtoType == "" { + keyProtoType = topic.KeyProtoType + } + protoFiles = append(protoFiles, topic.ProtoPaths...) + break + } + } +} + var setupProtoDescriptorRegistry = func(cmd *cobra.Command, args []string) { if protoType != "" { r, err := proto.NewDescriptorRegistry(protoFiles, protoExclude) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index 255e6063..48b1a3a7 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -97,7 +97,7 @@ var produceCmd = &cobra.Command{ Short: "Produce record. Reads data from stdin.", Args: cobra.ExactArgs(1), ValidArgsFunction: validTopicArgs, - PreRun: setupProtoDescriptorRegistry, + PreRun: producePreRun, Run: func(cmd *cobra.Command, args []string) { cfg := getConfig() switch partitionerFlag { @@ -319,3 +319,8 @@ func (e *InputFormat) Type() string { func completeInputFormat(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { return []string{"default", "json-each-row"}, cobra.ShellCompDirectiveNoFileComp } + +func producePreRun(cmd *cobra.Command, args []string) { + loadProtoPathsFromConfig(cmd, args) + setupProtoDescriptorRegistry(cmd, args) +} diff --git a/examples/proto-types.yaml b/examples/proto-types.yaml new file mode 100644 index 00000000..c6343fc5 --- /dev/null +++ b/examples/proto-types.yaml @@ -0,0 +1,14 @@ +clusters: + - name: local + brokers: + - localhost:9092 +global-proto-paths: + - /path_to_proto_files/global_proto_storage +topics: + - name: topic1 + value-proto-type: local.ProtoType1 + key-proto-type: local.KeyType1 + proto-paths: + - /path_to_proto_files/local_proto_storage # appends to global-proto-paths + - name: topic2 + value-proto-type: local.ProtoType2 diff --git a/pkg/config/config.go b/pkg/config/config.go index 698ee07b..0f0d36e7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -44,10 +44,19 @@ type Cluster struct { SchemaRegistryCredentials *SchemaRegistryCredentials `yaml:"schema-registry-credentials"` } +type Topic struct { + Name string `yaml:"name"` + ValueProtoType string `yaml:"value-proto-type"` + KeyProtoType string `yaml:"key-proto-type"` + ProtoPaths []string `yaml:"proto-paths"` +} + type Config struct { - CurrentCluster string `yaml:"current-cluster"` - ClusterOverride string - Clusters []*Cluster `yaml:"clusters"` + CurrentCluster string `yaml:"current-cluster"` + ClusterOverride string + Clusters []*Cluster `yaml:"clusters"` + GlobalProtoPaths []string `yaml:"global-proto-paths"` + Topics []Topic `yaml:"topics"` } func (c *Config) SetCurrentCluster(name string) error {