diff --git a/core/internal/helpers/sarama.go b/core/internal/helpers/sarama.go index 03084b29..ad303378 100644 --- a/core/internal/helpers/sarama.go +++ b/core/internal/helpers/sarama.go @@ -10,19 +10,30 @@ package helpers import ( + "context" "crypto/tls" "crypto/x509" + "encoding/json" + "fmt" + "net/http" "os" + "strings" "time" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/oauth2" + oauth2cc "golang.org/x/oauth2/clientcredentials" "github.com/Shopify/sarama" "github.com/spf13/viper" "github.com/stretchr/testify/mock" ) +const ( + defaultTimeoutSeconds = 5 +) + // Since 1.X Kafka has moved to semver, so those have a consistent format. For earlier versions we support formats: // * major.minor.very_minor.patch // * major.minor.patch @@ -80,6 +91,7 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config { saramaConfig.Consumer.Return.Errors = true // Configure TLS if enabled + tlsConfig := &tls.Config{} if viper.IsSet(configRoot + ".tls") { tlsName := viper.GetString(configRoot + ".tls") @@ -97,9 +109,8 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config { } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) - saramaConfig.Net.TLS.Config = &tls.Config{ - RootCAs: caCertPool, - } + tlsConfig.RootCAs = caCertPool + saramaConfig.Net.TLS.Config = tlsConfig if certFile != "" && keyFile != "" { cert, err := tls.LoadX509KeyPair(certFile, keyFile) @@ -134,6 +145,39 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config { saramaConfig.Net.SASL.Password = viper.GetString("sasl." + saslName + ".password") } + // Configure OAuth if enabled + if viper.IsSet(configRoot + ".oauth") { + oauthName := viper.GetString(configRoot + ".oauth") + + saramaConfig.Net.SASL.Enable = true + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth + saramaConfig.Net.SASL.Handshake = true + + oauthConfig := &oauth2cc.Config{ + ClientID: viper.GetString("oauth." + oauthName + ".client-id"), + ClientSecret: viper.GetString("oauth." + oauthName + ".client-secret"), + TokenURL: viper.GetString("oauth." + oauthName + ".token-url"), + Scopes: strings.Split(viper.GetString("oauth."+oauthName+".scopes"), " "), + } + + extensionsJSON := viper.GetString("oauth." + oauthName + ".extensions") + extensions := map[string]string{} + if extensionsJSON != "" { + // Defaults to empty map if input is malformed. + _ = json.Unmarshal([]byte(extensionsJSON), &extensions) + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + Timeout: defaultTimeoutSeconds * time.Second, + } + ctx := context.WithValue(context.TODO(), oauth2.HTTPClient, client) + + saramaConfig.Net.SASL.TokenProvider = NewTokenProvider(ctx, oauthConfig, extensions) + } + // Timeout for the initial connection if viper.IsSet(configRoot + ".dial-timeout") { saramaConfig.Net.DialTimeout = time.Duration(viper.GetInt(configRoot+".dial-timeout")) * time.Second @@ -147,6 +191,31 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config { return saramaConfig } +// TokenProvider generates OAuth access tokens for Sarama Kafka clients. +type TokenProvider struct { + config *oauth2cc.Config + extensions map[string]string + tokenSource oauth2.TokenSource +} + +// NewTokenProvider returns a new NewTokenProvider. +func NewTokenProvider(ctx context.Context, config *oauth2cc.Config, extensions map[string]string) *TokenProvider { + return &TokenProvider{ + config: config, + extensions: extensions, + tokenSource: config.TokenSource(ctx), + } +} + +// Token returns a new *sarama.AccessToken or an error as appropriate. +func (t *TokenProvider) Token() (*sarama.AccessToken, error) { + token, err := t.tokenSource.Token() + if err != nil { + return nil, fmt.Errorf("could not fetch OAuth token: %w", err) + } + return &sarama.AccessToken{Token: token.AccessToken, Extensions: t.extensions}, nil +} + // SaramaClient is an internal interface to the sarama.Client. We use our own interface because while sarama.Client is // an interface, sarama.Broker is not. This makes it difficult to test code which uses the Broker objects. This // interface operates in the same way, with the addition of an interface function for creating consumers on the client. diff --git a/core/internal/httpserver/kafka.go b/core/internal/httpserver/kafka.go index e1b84b49..d9029589 100644 --- a/core/internal/httpserver/kafka.go +++ b/core/internal/httpserver/kafka.go @@ -64,6 +64,19 @@ func getSASLProfile(name string) *httpResponseSASLProfile { } } +func getOAUTHProfile(name string) *httpResponseOAUTHProfile { + configRoot := "oauth." + name + if !viper.IsSet(configRoot) { + return nil + } + + return &httpResponseOAUTHProfile{ + Name: name, + HandshakeFirst: viper.GetBool(configRoot + ".handshake-first"), + ClientID: viper.GetString(configRoot + ".client-id"), + } +} + func getClientProfile(name string) httpResponseClientProfile { configRoot := "client-profile." + name return httpResponseClientProfile{ @@ -72,6 +85,7 @@ func getClientProfile(name string) httpResponseClientProfile { KafkaVersion: viper.GetString(configRoot + ".kafka-version"), TLS: getTLSProfile(viper.GetString(configRoot + ".tls")), SASL: getSASLProfile(viper.GetString(configRoot + ".sasl")), + OAUTH: getOAUTHProfile(viper.GetString(configRoot + ".oauth")), } } diff --git a/core/internal/httpserver/structs.go b/core/internal/httpserver/structs.go index b15ce9de..c904e04a 100644 --- a/core/internal/httpserver/structs.go +++ b/core/internal/httpserver/structs.go @@ -47,12 +47,19 @@ type httpResponseSASLProfile struct { Username string `json:"username"` } +type httpResponseOAUTHProfile struct { + Name string `json:"name"` + HandshakeFirst bool `json:"handshake-first"` + ClientID string `json:"client-id"` +} + type httpResponseClientProfile struct { - Name string `json:"name"` - ClientID string `json:"client-id"` - KafkaVersion string `json:"kafka-version"` - TLS *httpResponseTLSProfile `json:"tls"` - SASL *httpResponseSASLProfile `json:"sasl"` + Name string `json:"name"` + ClientID string `json:"client-id"` + KafkaVersion string `json:"kafka-version"` + TLS *httpResponseTLSProfile `json:"tls"` + SASL *httpResponseSASLProfile `json:"sasl"` + OAUTH *httpResponseOAUTHProfile `json:"oauth"` } type httpResponseClusterList struct { diff --git a/go.mod b/go.mod index c83c0b77..677c977a 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/stretchr/testify v1.8.2 github.com/xdg/scram v1.0.5 go.uber.org/zap v1.24.0 + golang.org/x/oauth2 v0.3.0 gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) @@ -66,6 +67,7 @@ require ( golang.org/x/net v0.7.0 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect + google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index a6e475d2..62c3f78e 100644 --- a/go.sum +++ b/go.sum @@ -361,6 +361,8 @@ golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.3.0 h1:6l90koy8/LaBLmLu8jpHeHexzMwEita0zFfYlggy2F8= +golang.org/x/oauth2 v0.3.0/go.mod h1:rQrIauxkUhJ6CuwEXwymO2/eh4xz2ZWF1nBkcxS+tGk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -504,6 +506,7 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=