Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ var consumeCmd = &cobra.Command{
Short: "Consume messages",
Args: cobra.ExactArgs(1),
ValidArgsFunction: validTopicArgs,
PreRun: setupProtoDescriptorRegistry,
PreRun: consumePreRun,
Run: func(cmd *cobra.Command, args []string) {
var offset int64
cfg := getConfig()
Expand Down Expand Up @@ -147,6 +147,11 @@ var consumeCmd = &cobra.Command{
},
}

func consumePreRun(cmd *cobra.Command, args []string) {
loadProtoPathsFromConfig(cmd, args)
setupProtoDescriptorRegistry(cmd, args)
}

type g struct{}

func (g *g) Setup(s sarama.ConsumerGroupSession) error {
Expand Down
16 changes: 16 additions & 0 deletions cmd/kaf/kaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,22 @@ func init() {
cobra.OnInitialize(onInit)
}

var loadProtoPathsFromConfig = func(cmd *cobra.Command, args []string) {
protoFiles = cfg.GlobalProtoPaths
for _, topic := range cfg.Topics {
if topic.Name == args[0] {
if protoType == "" {
protoType = topic.ValueProtoType
}
if keyProtoType == "" {
keyProtoType = topic.KeyProtoType
}
protoFiles = append(protoFiles, topic.ProtoPaths...)
break
}
}
}

var setupProtoDescriptorRegistry = func(cmd *cobra.Command, args []string) {
if protoType != "" {
r, err := proto.NewDescriptorRegistry(protoFiles, protoExclude)
Expand Down
7 changes: 6 additions & 1 deletion cmd/kaf/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ var produceCmd = &cobra.Command{
Short: "Produce record. Reads data from stdin.",
Args: cobra.ExactArgs(1),
ValidArgsFunction: validTopicArgs,
PreRun: setupProtoDescriptorRegistry,
PreRun: producePreRun,
Run: func(cmd *cobra.Command, args []string) {
cfg := getConfig()
switch partitionerFlag {
Expand Down Expand Up @@ -319,3 +319,8 @@ func (e *InputFormat) Type() string {
func completeInputFormat(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return []string{"default", "json-each-row"}, cobra.ShellCompDirectiveNoFileComp
}

func producePreRun(cmd *cobra.Command, args []string) {
loadProtoPathsFromConfig(cmd, args)
setupProtoDescriptorRegistry(cmd, args)
}
14 changes: 14 additions & 0 deletions examples/proto-types.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
clusters:
- name: local
brokers:
- localhost:9092
global-proto-paths:
- /path_to_proto_files/global_proto_storage
topics:
- name: topic1
value-proto-type: local.ProtoType1
key-proto-type: local.KeyType1
proto-paths:
- /path_to_proto_files/local_proto_storage # appends to global-proto-paths
- name: topic2
value-proto-type: local.ProtoType2
15 changes: 12 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,19 @@ type Cluster struct {
SchemaRegistryCredentials *SchemaRegistryCredentials `yaml:"schema-registry-credentials"`
}

type Topic struct {
Name string `yaml:"name"`
ValueProtoType string `yaml:"value-proto-type"`
KeyProtoType string `yaml:"key-proto-type"`
ProtoPaths []string `yaml:"proto-paths"`
}

type Config struct {
CurrentCluster string `yaml:"current-cluster"`
ClusterOverride string
Clusters []*Cluster `yaml:"clusters"`
CurrentCluster string `yaml:"current-cluster"`
ClusterOverride string
Clusters []*Cluster `yaml:"clusters"`
GlobalProtoPaths []string `yaml:"global-proto-paths"`
Topics []Topic `yaml:"topics"`
}

func (c *Config) SetCurrentCluster(name string) error {
Expand Down