Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 72 additions & 3 deletions core/internal/helpers/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,30 @@
package helpers

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/oauth2"
oauth2cc "golang.org/x/oauth2/clientcredentials"

"github.com/Shopify/sarama"
"github.com/spf13/viper"
"github.com/stretchr/testify/mock"
)

const (
defaultTimeoutSeconds = 5
)

// Since 1.X Kafka has moved to semver, so those have a consistent format. For earlier versions we support formats:
// * major.minor.very_minor.patch
// * major.minor.patch
Expand Down Expand Up @@ -80,6 +91,7 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config {
saramaConfig.Consumer.Return.Errors = true

// Configure TLS if enabled
tlsConfig := &tls.Config{}
if viper.IsSet(configRoot + ".tls") {
tlsName := viper.GetString(configRoot + ".tls")

Expand All @@ -97,9 +109,8 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config {
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
saramaConfig.Net.TLS.Config = &tls.Config{
RootCAs: caCertPool,
}
tlsConfig.RootCAs = caCertPool
saramaConfig.Net.TLS.Config = tlsConfig

if certFile != "" && keyFile != "" {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
Expand Down Expand Up @@ -134,6 +145,39 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config {
saramaConfig.Net.SASL.Password = viper.GetString("sasl." + saslName + ".password")
}

// Configure OAuth if enabled
if viper.IsSet(configRoot + ".oauth") {
oauthName := viper.GetString(configRoot + ".oauth")

saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
saramaConfig.Net.SASL.Handshake = true

oauthConfig := &oauth2cc.Config{
ClientID: viper.GetString("oauth." + oauthName + ".client-id"),
ClientSecret: viper.GetString("oauth." + oauthName + ".client-secret"),
TokenURL: viper.GetString("oauth." + oauthName + ".token-url"),
Scopes: strings.Split(viper.GetString("oauth."+oauthName+".scopes"), " "),
}

extensionsJSON := viper.GetString("oauth." + oauthName + ".extensions")
extensions := map[string]string{}
if extensionsJSON != "" {
// Defaults to empty map if input is malformed.
_ = json.Unmarshal([]byte(extensionsJSON), &extensions)
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
Timeout: defaultTimeoutSeconds * time.Second,
}
ctx := context.WithValue(context.TODO(), oauth2.HTTPClient, client)

saramaConfig.Net.SASL.TokenProvider = NewTokenProvider(ctx, oauthConfig, extensions)
}

// Timeout for the initial connection
if viper.IsSet(configRoot + ".dial-timeout") {
saramaConfig.Net.DialTimeout = time.Duration(viper.GetInt(configRoot+".dial-timeout")) * time.Second
Expand All @@ -147,6 +191,31 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config {
return saramaConfig
}

// TokenProvider generates OAuth access tokens for Sarama Kafka clients.
type TokenProvider struct {
config *oauth2cc.Config
extensions map[string]string
tokenSource oauth2.TokenSource
}

// NewTokenProvider returns a new NewTokenProvider.
func NewTokenProvider(ctx context.Context, config *oauth2cc.Config, extensions map[string]string) *TokenProvider {
return &TokenProvider{
config: config,
extensions: extensions,
tokenSource: config.TokenSource(ctx),
}
}

// Token returns a new *sarama.AccessToken or an error as appropriate.
func (t *TokenProvider) Token() (*sarama.AccessToken, error) {
token, err := t.tokenSource.Token()
if err != nil {
return nil, fmt.Errorf("could not fetch OAuth token: %w", err)
}
return &sarama.AccessToken{Token: token.AccessToken, Extensions: t.extensions}, nil
}

// SaramaClient is an internal interface to the sarama.Client. We use our own interface because while sarama.Client is
// an interface, sarama.Broker is not. This makes it difficult to test code which uses the Broker objects. This
// interface operates in the same way, with the addition of an interface function for creating consumers on the client.
Expand Down
14 changes: 14 additions & 0 deletions core/internal/httpserver/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ func getSASLProfile(name string) *httpResponseSASLProfile {
}
}

func getOAUTHProfile(name string) *httpResponseOAUTHProfile {
configRoot := "oauth." + name
if !viper.IsSet(configRoot) {
return nil
}

return &httpResponseOAUTHProfile{
Name: name,
HandshakeFirst: viper.GetBool(configRoot + ".handshake-first"),
ClientID: viper.GetString(configRoot + ".client-id"),
}
}

func getClientProfile(name string) httpResponseClientProfile {
configRoot := "client-profile." + name
return httpResponseClientProfile{
Expand All @@ -72,6 +85,7 @@ func getClientProfile(name string) httpResponseClientProfile {
KafkaVersion: viper.GetString(configRoot + ".kafka-version"),
TLS: getTLSProfile(viper.GetString(configRoot + ".tls")),
SASL: getSASLProfile(viper.GetString(configRoot + ".sasl")),
OAUTH: getOAUTHProfile(viper.GetString(configRoot + ".oauth")),
}
}

Expand Down
17 changes: 12 additions & 5 deletions core/internal/httpserver/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,19 @@ type httpResponseSASLProfile struct {
Username string `json:"username"`
}

type httpResponseOAUTHProfile struct {
Name string `json:"name"`
HandshakeFirst bool `json:"handshake-first"`
ClientID string `json:"client-id"`
}

type httpResponseClientProfile struct {
Name string `json:"name"`
ClientID string `json:"client-id"`
KafkaVersion string `json:"kafka-version"`
TLS *httpResponseTLSProfile `json:"tls"`
SASL *httpResponseSASLProfile `json:"sasl"`
Name string `json:"name"`
ClientID string `json:"client-id"`
KafkaVersion string `json:"kafka-version"`
TLS *httpResponseTLSProfile `json:"tls"`
SASL *httpResponseSASLProfile `json:"sasl"`
OAUTH *httpResponseOAUTHProfile `json:"oauth"`
}

type httpResponseClusterList struct {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/xdg/scram v1.0.5
go.uber.org/zap v1.24.0
golang.org/x/oauth2 v0.3.0
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
Expand Down Expand Up @@ -66,6 +67,7 @@ require (
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.3.0 h1:6l90koy8/LaBLmLu8jpHeHexzMwEita0zFfYlggy2F8=
golang.org/x/oauth2 v0.3.0/go.mod h1:rQrIauxkUhJ6CuwEXwymO2/eh4xz2ZWF1nBkcxS+tGk=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -504,6 +506,7 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
Expand Down