diff --git a/.gitignore b/.gitignore index bbe7333e369..2f01e3a758b 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ cmd/query/query cmd/query/query-linux crossdock/crossdock-linux run-crossdock.log +.vscode/ diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 5079ca8fef2..2222e08402d 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -15,6 +15,9 @@ package producer import ( + "crypto/tls" + "fmt" + "github.com/Shopify/sarama" ) @@ -25,12 +28,108 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { - Brokers []string + Brokers []string + Authenticators []Authenticator + Metadata bool + Version string } // NewProducer creates a new asynchronous kafka producer func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() saramaConfig.Producer.Return.Successes = true + var err error + // Last write wins on conflict + for _, auth := range c.Authenticators { + saramaConfig, err = auth.Authenticate(saramaConfig) + if err != nil { + return nil, err + } + } + saramaConfig.Metadata.Full = c.Metadata + if c.Version == "" { + saramaConfig.Version = sarama.MinVersion + } else { + v, err := sarama.ParseKafkaVersion(c.Version) + if err != nil { + return nil, err + } + saramaConfig.Version = v + } return sarama.NewAsyncProducer(c.Brokers, saramaConfig) } + +// AuthConfiguration ... +type AuthConfiguration interface{} + +// SASLAuthConfiguration ... +type SASLAuthConfiguration struct { + Username string + Password string +} + +// TLSAuthConfiguration ... +type TLSAuthConfiguration struct { + Enabled bool + Config *tls.Config +} + +// Authenticator ... +type Authenticator interface { + Authenticate(config *sarama.Config) (*sarama.Config, error) +} + +// SASLAuthenticator ... +type SASLAuthenticator struct { + config *SASLAuthConfiguration +} + +// NewSASLAuthenticator ... +func NewSASLAuthenticator(auth AuthConfiguration) (*SASLAuthenticator, error) { + c, ok := auth.(SASLAuthConfiguration) + if !ok { + return nil, fmt.Errorf("cannot type assert AuthConfiguration into SASLAuthConfiguration") + } + return &SASLAuthenticator{ + config: &c, + }, nil +} + +// Authenticate ... +func (s *SASLAuthenticator) Authenticate(config *sarama.Config) (*sarama.Config, error) { + if s.config == nil || config == nil { + return nil, fmt.Errorf("error") + } + config.Net.SASL.Enable = true + config.Net.SASL.User = s.config.Username + config.Net.SASL.Password = s.config.Password + return config, nil +} + +// TLSAuthenticator ... +type TLSAuthenticator struct { + config *TLSAuthConfiguration +} + +// NewTLSAuthenticator ... +func NewTLSAuthenticator(auth AuthConfiguration) (*TLSAuthenticator, error) { + c, ok := auth.(TLSAuthConfiguration) + if !ok { + return nil, fmt.Errorf("cannot type assert AuthConfiguration into TLSAuthConfiguration") + } + return &TLSAuthenticator{ + config: &c, + }, nil +} + +// Authenticate ... +func (t *TLSAuthenticator) Authenticate(config *sarama.Config) (*sarama.Config, error) { + if t.config == nil || config == nil { + return nil, fmt.Errorf("error") + } + config.Net.TLS.Enable = t.config.Enabled + if t.config.Config != nil { + config.Net.TLS.Config = t.config.Config + } + return config, nil +} diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index e93ea8a75f9..fefedce4aaf 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -25,10 +25,17 @@ import ( ) const ( - configPrefix = "kafka" - suffixBrokers = ".brokers" - suffixTopic = ".topic" - suffixEncoding = ".encoding" + configPrefix = "kafka" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixEncoding = ".encoding" + suffixSaslEnabled = ".sasl.enabled" + suffixSaslUsername = ".sasl.username" + suffixSaslPassword = ".sasl.password" + suffixMetadata = ".metadata" + suffixTLSEnabled = ".tls.enabled" + //TODO Add required TLS config + suffixVersion = ".version" encodingJSON = "json" encodingProto = "protobuf" @@ -60,12 +67,69 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultEncoding, fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, encodingProto, encodingJSON), ) + flagSet.Bool( + configPrefix+suffixSaslEnabled, + false, + fmt.Sprintf("Enable SASL configuration"), + ) + flagSet.String( + configPrefix+suffixSaslUsername, + "", + fmt.Sprintf("SASL username"), + ) + flagSet.String( + configPrefix+suffixSaslPassword, + "", + fmt.Sprintf("SASL password"), + ) + flagSet.Bool( + configPrefix+suffixTLSEnabled, + false, + fmt.Sprintf("Enable TLS configuration"), + ) } // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { + auths := make([]producer.Authenticator, 0) + saslEnabled := v.GetBool(configPrefix + suffixSaslEnabled) + if saslEnabled { + authConfig := producer.SASLAuthConfiguration{ + Username: v.GetString(configPrefix + suffixSaslUsername), + Password: v.GetString(configPrefix + suffixSaslPassword), + } + auth, err := producer.NewSASLAuthenticator(authConfig) + if err != nil { + panic(fmt.Sprintf("cannot initialize new SASL authenticator: %+v", err)) + } + auths = append(auths, auth) + } + tlsEnabled := v.GetBool(configPrefix + suffixTLSEnabled) + if tlsEnabled { + //TODO Build full TLS configuration + authConfig := producer.TLSAuthConfiguration{ + Enabled: true, + } + auth, err := producer.NewTLSAuthenticator(authConfig) + if err != nil { + panic(fmt.Sprintf("cannot initialize new TLS authenticator: %+v", err)) + } + auths = append(auths, auth) + } + fullMetadata := true + metadata := v.Get(configPrefix + suffixMetadata) + if metadata != nil { + if m, ok := metadata.(bool); !ok { + panic(fmt.Sprintf("config value %s%s must be a bool (true/false)", configPrefix, suffixMetadata)) + } else { + fullMetadata = m + } + } opt.config = producer.Configuration{ - Brokers: strings.Split(v.GetString(configPrefix+suffixBrokers), ","), + Brokers: strings.Split(v.GetString(configPrefix+suffixBrokers), ","), + Authenticators: auths, + Metadata: fullMetadata, + Version: v.GetString(configPrefix + suffixVersion), } opt.topic = v.GetString(configPrefix + suffixTopic) opt.encoding = v.GetString(configPrefix + suffixEncoding)