Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 27 additions & 70 deletions app/register/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@ package register
import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
Expand All @@ -26,6 +23,7 @@ import (

"github.com/NetApp-Polaris/astra-connector-operator/common"
v1 "github.com/NetApp-Polaris/astra-connector-operator/details/operator-sdk/api/v1"
"github.com/NetApp-Polaris/astra-connector-operator/util"
)

const (
Expand Down Expand Up @@ -105,13 +103,13 @@ type ClusterRegisterUtil interface {

type clusterRegisterUtil struct {
AstraConnector *v1.AstraConnector
Client HTTPClient
Client util.HTTPClient
K8sClient client.Client
Ctx context.Context
Log logr.Logger
}

func NewClusterRegisterUtil(astraConnector *v1.AstraConnector, client HTTPClient, k8sClient client.Client, log logr.Logger, ctx context.Context) ClusterRegisterUtil {
func NewClusterRegisterUtil(astraConnector *v1.AstraConnector, client util.HTTPClient, k8sClient client.Client, log logr.Logger, ctx context.Context) ClusterRegisterUtil {
return &clusterRegisterUtil{
AstraConnector: astraConnector,
Client: client,
Expand Down Expand Up @@ -191,7 +189,7 @@ func (c clusterRegisterUtil) UnRegisterNatsSyncClient() error {
return err
}

response, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodPost, natsSyncClientUnregisterURL, bytes.NewBuffer(reqBodyBytes), HeaderMap{})
response, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodPost, natsSyncClientUnregisterURL, bytes.NewBuffer(reqBodyBytes), util.HeaderMap{})
defer cancel()

if err != nil {
Expand All @@ -218,7 +216,7 @@ func (c clusterRegisterUtil) RegisterNatsSyncClient() (string, error) {
return "", err
}

response, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodPost, natsSyncClientRegisterURL, bytes.NewBuffer(reqBodyBytes), HeaderMap{})
response, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodPost, natsSyncClientRegisterURL, bytes.NewBuffer(reqBodyBytes), util.HeaderMap{})
defer cancel()
if err != nil {
return "", err
Expand Down Expand Up @@ -257,15 +255,6 @@ func GetAstraHostURL(astraConnector *v1.AstraConnector) string {
return astraHost
}

func (c clusterRegisterUtil) getAstraHostFromURL(astraHostURL string) (string, error) {
cloudBridgeURLSplit := strings.Split(astraHostURL, "://")
if len(cloudBridgeURLSplit) != 2 {
errStr := fmt.Sprintf("invalid cloudBridgeURL provided: %s, format - https://hostname", astraHostURL)
return "", errors.New(errStr)
}
return cloudBridgeURLSplit[1], nil
}

func (c clusterRegisterUtil) logHttpError(response *http.Response) {
bodyBytes, err := io.ReadAll(response.Body)
if err != nil {
Expand All @@ -287,44 +276,11 @@ func (c clusterRegisterUtil) readResponseBody(response *http.Response) ([]byte,
return bodyBytes, nil
}

func (c clusterRegisterUtil) setHttpClient(disableTls bool, astraHost string) error {
if disableTls {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
c.Log.WithValues("disableTls", disableTls).Info("TLS Validation Disabled! Not for use in production!")
}

if c.AstraConnector.Spec.NatsSyncClient.HostAliasIP != "" {
c.Log.WithValues("HostAliasIP", c.AstraConnector.Spec.NatsSyncClient.HostAliasIP).Info("Using the HostAlias IP")
cloudBridgeHost, err := c.getAstraHostFromURL(astraHost)
if err != nil {
return err
}

dialer := &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}

http.DefaultTransport.(*http.Transport).DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
if addr == cloudBridgeHost+":443" {
addr = c.AstraConnector.Spec.NatsSyncClient.HostAliasIP + ":443"
}
if addr == cloudBridgeHost+":80" {
addr = c.AstraConnector.Spec.NatsSyncClient.HostAliasIP + ":80"
}
return dialer.DialContext(ctx, network, addr)
}
}

c.Client = &http.Client{}
return nil
}

func (c clusterRegisterUtil) CloudExists(astraHost, cloudID, apiToken string) bool {
url := fmt.Sprintf("%s/accounts/%s/topology/v1/clouds/%s", astraHost, c.AstraConnector.Spec.Astra.AccountId, cloudID)

headerMap := HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
response, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodGet, url, nil, headerMap)
headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
response, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodGet, url, nil, headerMap)
defer cancel()

if err != nil {
Expand All @@ -351,8 +307,8 @@ func (c clusterRegisterUtil) ListClouds(astraHost, apiToken string) (*http.Respo
url := fmt.Sprintf("%s/accounts/%s/topology/v1/clouds", astraHost, c.AstraConnector.Spec.Astra.AccountId)

c.Log.Info("Getting clouds")
headerMap := HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
response, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodGet, url, nil, headerMap)
headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
response, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodGet, url, nil, headerMap)
defer cancel()

if err != nil {
Expand Down Expand Up @@ -444,8 +400,8 @@ func (c clusterRegisterUtil) CreateCloud(astraHost, cloudType, apiToken string)
}

c.Log.WithValues("cloudType", cloudType).Info("Creating cloud")
headerMap := HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
response, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodPost, url, bytes.NewBuffer(reqBodyBytes), headerMap)
headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
response, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodPost, url, bytes.NewBuffer(reqBodyBytes), headerMap)
defer cancel()

if err != nil {
Expand Down Expand Up @@ -553,8 +509,8 @@ func (c clusterRegisterUtil) GetClusters(astraHost, cloudId, apiToken string) (G

c.Log.Info("Getting Clusters")

headerMap := HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
clustersResp, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodGet, url, nil, headerMap)
headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
clustersResp, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodGet, url, nil, headerMap)
defer cancel()

if err != nil {
Expand Down Expand Up @@ -600,8 +556,8 @@ func (c clusterRegisterUtil) GetCluster(astraHost, cloudId, clusterId, apiToken
url := fmt.Sprintf("%s/accounts/%s/topology/v1/clouds/%s/clusters/%s", astraHost, c.AstraConnector.Spec.Astra.AccountId, cloudId, clusterId)
var clustersRespJson Cluster

headerMap := HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
clustersResp, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodGet, url, nil, headerMap)
headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
clustersResp, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodGet, url, nil, headerMap)
defer cancel()

if err != nil {
Expand Down Expand Up @@ -639,8 +595,8 @@ func (c clusterRegisterUtil) CreateCluster(astraHost, cloudId, astraConnectorId,
}

clustersBodyJson, _ := json.Marshal(clustersBody)
headerMap := HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
clustersResp, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodPost, url, bytes.NewBuffer(clustersBodyJson), headerMap)
headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
clustersResp, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodPost, url, bytes.NewBuffer(clustersBodyJson), headerMap)
defer cancel()

if err != nil {
Expand Down Expand Up @@ -696,8 +652,8 @@ func (c clusterRegisterUtil) UpdateCluster(astraHost, cloudId, clusterId, astraC
}

clustersBodyJson, _ := json.Marshal(clustersBody)
headerMap := HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
clustersResp, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodPut, url, bytes.NewBuffer(clustersBodyJson), headerMap)
headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
clustersResp, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodPut, url, bytes.NewBuffer(clustersBodyJson), headerMap)
defer cancel()

if err != nil {
Expand Down Expand Up @@ -755,8 +711,8 @@ func (c clusterRegisterUtil) GetStorageClass(astraHost, cloudId, clusterId, apiT

c.Log.Info("Getting Storage Classes")

headerMap := HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
storageClassesResp, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodGet, url, nil, headerMap)
headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
storageClassesResp, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodGet, url, nil, headerMap)
defer cancel()

if err != nil {
Expand Down Expand Up @@ -818,8 +774,8 @@ func (c clusterRegisterUtil) UpdateManagedCluster(astraHost, clusterId, astraCon
}
manageClustersBodyJson, _ := json.Marshal(manageClustersBody)

headerMap := HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
manageClustersResp, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodPut, url, bytes.NewBuffer(manageClustersBodyJson), headerMap)
headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
manageClustersResp, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodPut, url, bytes.NewBuffer(manageClustersBodyJson), headerMap)
defer cancel()

if err != nil {
Expand Down Expand Up @@ -848,8 +804,8 @@ func (c clusterRegisterUtil) CreateManagedCluster(astraHost, cloudId, clusterID,
}
manageClustersBodyJson, _ := json.Marshal(manageClustersBody)

headerMap := HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
manageClustersResp, err, cancel := DoRequest(c.Ctx, c.Client, http.MethodPost, url, bytes.NewBuffer(manageClustersBodyJson), headerMap)
headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
manageClustersResp, err, cancel := util.DoRequest(c.Ctx, c.Client, http.MethodPost, url, bytes.NewBuffer(manageClustersBodyJson), headerMap)
defer cancel()

if err != nil {
Expand Down Expand Up @@ -996,7 +952,8 @@ func (c clusterRegisterUtil) RegisterClusterWithAstra(astraConnectorId string) e
astraHost := GetAstraHostURL(c.AstraConnector)
c.Log.WithValues("URL", astraHost).Info("Astra Host Info")

err := c.setHttpClient(c.AstraConnector.Spec.Astra.SkipTLSValidation, astraHost)
_, err := util.SetHttpClient(c.AstraConnector.Spec.Astra.SkipTLSValidation,
astraHost, c.AstraConnector.Spec.NatsSyncClient.HostAliasIP, c.Log)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion details/k8s/k8s_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package k8s

import (
"context"

"github.com/go-logr/logr"
"github.com/pkg/errors"

Expand Down
1 change: 1 addition & 0 deletions details/operator-sdk/api/v1/astraconnector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

type Astra struct {
// +kubebuilder:validation:Required
AccountId string `json:"accountId"`
// +kubebuilder:validation:Optional
CloudId string `json:"cloudId"`
Expand Down
105 changes: 104 additions & 1 deletion details/operator-sdk/api/v1/astraconnector_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@ package v1

import (
"context"

"fmt"
"github.com/NetApp-Polaris/astra-connector-operator/common"
"github.com/google/uuid"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/client-go/kubernetes"
"net"
"net/http"
ctrl "sigs.k8s.io/controller-runtime"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/NetApp-Polaris/astra-connector-operator/util"
Expand All @@ -21,6 +29,11 @@ func (ai *AstraConnector) ValidateCreateAstraConnector() field.ErrorList {
allErrs = append(allErrs, err)
}

if err := ai.ValidateTokenAndAccountID(nil); err != nil {
log.V(3).Info("error while creating AstraConnector Instance", "namespace", ai.Namespace, "err", err)
allErrs = append(allErrs, err)
}

return allErrs
}

Expand All @@ -39,3 +52,93 @@ func (ai *AstraConnector) ValidateNamespace() *field.Error {
}
return nil
}

// ValidateTokenAndAccountID Validates the token and AccoundID provided that AstraConnector should be deployed to.
func (ai *AstraConnector) ValidateTokenAndAccountID(httpClient util.HTTPClient) *field.Error {
cloudBridgeJsonField := util.GetJSONFieldName(&ai.Spec.NatsSyncClient, &ai.Spec.NatsSyncClient.CloudBridgeURL)
tokenRefBridgeJsonField := util.GetJSONFieldName(&ai.Spec.Astra, &ai.Spec.Astra.TokenRef)
accountJsonField := util.GetJSONFieldName(&ai.Spec.Astra, &ai.Spec.Astra.AccountId)
astraHost := getAstraHostURL(ai.Spec.NatsSyncClient.CloudBridgeURL)
accountId := ai.Spec.Astra.AccountId

// Account needs to be a valid UUID
_, err := uuid.Parse(accountId)
if err != nil {
println("Please check account id provided.. Token needs to be UUID")
return field.Invalid(field.NewPath(accountJsonField), ai.Name, "Account not valid")
}

config, _ := ctrl.GetConfig()
clientset, _ := kubernetes.NewForConfig(config)
apiToken, err := getSecret(clientset, ai.Spec.Astra.TokenRef, ai.ObjectMeta.Namespace)
if err != nil {
log.Info("Check TokenRef, make sure Kubernetes secret was created.")
return field.NotFound(field.NewPath(tokenRefBridgeJsonField), ai.Name)
}

if httpClient == nil {
httpClient, err = util.SetHttpClient(ai.Spec.Astra.SkipTLSValidation,
astraHost, ai.Spec.NatsSyncClient.HostAliasIP, log)
if err != nil {
log.Info(fmt.Sprintf("invalid cloudBridgeURL provided: %s, format - https://hostname", ai.Spec.NatsSyncClient.CloudBridgeURL))
return field.Invalid(field.NewPath(cloudBridgeJsonField), ai.Name, "CloudBridgeURL invalid format")
}
}

url := fmt.Sprintf("%s/accounts/%s", astraHost, accountId)

headerMap := util.HeaderMap{Authorization: fmt.Sprintf("Bearer %s", apiToken)}
response, err, cancel := util.DoRequest(context.Background(), httpClient, http.MethodGet, url, nil, headerMap)
defer cancel()

var dnsError *net.DNSError
if errors.As(err, &dnsError) {
log.Info("Please check CloudBridgeURL provided")
return field.Invalid(field.NewPath(cloudBridgeJsonField), ai.Name, "CloudBridgeURL not reachable")
}

// We got a 200 from the GET Account Looks good! no errors
if response.StatusCode == 200 {
return nil
}

// error handling below
if response.StatusCode == 401 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these the only possible error codes? we might want to add a catchall as well.

log.Info("Please check token provided.. 401 Unauthorized response status code from Astra Control")
return field.Invalid(field.NewPath(tokenRefBridgeJsonField), ai.Name, "Unauthorized request with Token Provided")
}

if response.StatusCode == 404 {
println("Please check account id provided.. 404 account not found in Astra Control")
return field.Invalid(field.NewPath(accountJsonField), ai.Name, "Account not found")
}
return nil
}

func getAstraHostURL(cloudBridgeURL string) string {
var astraHost string
if cloudBridgeURL != "" {
astraHost = cloudBridgeURL
} else {
astraHost = common.NatsSyncClientDefaultCloudBridgeURL
}
return astraHost
}

func getSecret(clientset *kubernetes.Clientset, secretName string, namespace string) (string, error) {
secret, err := clientset.CoreV1().Secrets(namespace).Get(context.Background(), secretName, metav1.GetOptions{})
if err != nil {
log.WithValues("namespace", namespace, "secret", secretName).Error(err, "failed to get kubernetes secret")
return "", err
}
// Extract the value of the 'apiToken' key from the secret
apiToken, ok := secret.Data["apiToken"]
if !ok {
log.WithValues("namespace", namespace, "secret", secretName).Error(err, "failed to extract apiToken key from secret")
return "", errors.New("failed to extract apiToken key from secret")
}

// Convert the value to a string
apiTokenStr := string(apiToken)
return apiTokenStr, nil
}
Loading