Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func CheckPods(cs kubernetes.Interface, namespace string, target RolloutTarget,
// glog.V(2).Infof("Still waiting for rollout: pod %s phase is %v", pod.Name, pod.Status.Phase)
// return false, nil
//}
glog.V(4).Infof("Check pod spec version %v, $v", pod.Name, pod.Namespace)
glog.V(4).Infof("Check pod spec version %v, %v", pod.Name, pod.Namespace)

ok, err := workload.CheckPodSpecVersion(pod.Spec, kcd, version)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions events/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,13 @@ func patchForContainer(cName string, current, replacement Record, stats stats.St
glog.Errorf("Failed to build registry for image repo: %v", imageRepo)
return nil, false
} else {
versions, err := registry.Versions(context.Background(), fluxTag)
versions, digest, err := registry.Versions(context.Background(), fluxTag)
if err != nil {
glog.Errorf("Syncer failed to get version from registry using tag=%s", fluxTag)
return nil , false
}
version := versions[0]
glog.Infof("Got registry versions for container=%s, tag=%s, rolloutVersion=%s", cName, fluxTag, version)
glog.Infof("Got registry versions for container=%s, tag=%s, rolloutVersion=%s, imageDigest=%s", cName, fluxTag, version, *digest)

patchOp.Value = imageDataFlux[0] + ":" + version
glog.Infof("Replacing path=%v old tag=%v to patched version=%v", pathToPatch, fluxTag, version)
Expand Down
6 changes: 3 additions & 3 deletions registry/dockerhub/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ func (vp *V2Provider) RegistryFor(imageRepo string) (kcdregistry.Registry, error
}

// Versions implements the Registry interface.
func (vp *V2Provider) Versions(ctx context.Context, tag string) ([]string, error) {
func (vp *V2Provider) Versions(ctx context.Context, tag string) (kcdregistry.Versions, kcdregistry.Digest, error) {
tags := make([]string, 0, 5)
newVersion, err := vp.getDigest(tag)
if err != nil {
vp.opts.Stats.IncCount("registry.failure", vp.repository)
return tags, errors.Errorf("No version found for tag %s", tag)
return tags, &newVersion, errors.Errorf("No version found for tag %s", tag)
}
tags = append(tags, newVersion)
return tags, nil
return tags, &newVersion, nil
}

// Add adds list of tags to the image identified with version
Expand Down
11 changes: 6 additions & 5 deletions registry/ecr/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (ep *Provider) RegistryFor(imageRepo string) (registry.Registry, error) {
}

// Version implements the Registry interface.
func (ep *Provider) Versions(ctx context.Context, tag string) ([]string, error) {
func (ep *Provider) Versions(ctx context.Context, tag string) (registry.Versions, registry.Digest, error) {
// TODO: parameterize timeout
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Second*15)
Expand All @@ -118,26 +118,27 @@ func (ep *Provider) Versions(ctx context.Context, tag string) ([]string, error)
result, err := ep.ecr.DescribeImagesWithContext(ctx, req)
if err != nil {
glog.Errorf("Failed to get ECR: %v", err)
return nil, errors.Wrap(err, "failed to get ecr")
return nil, nil, errors.Wrap(err, "failed to get ecr")
}
if len(result.ImageDetails) != 1 {
ep.stats.Event(fmt.Sprintf("registry.%s.sync.failure", ep.repoName),
fmt.Sprintf("Failed to sync with ECR for tag %s", tag), "", "error",
time.Now().UTC(), tag)
return nil, errors.Errorf("Bad state: More than one image was tagged with %s", tag)
return nil, nil, errors.Errorf("Bad state: More than one image was tagged with %s", tag)
}

img := result.ImageDetails[0]

versions := ep.currentVersions(img)
digest := img.ImageDigest
if len(versions) == 0 {
ep.stats.IncCount("registry.failure", ep.repoName)
return nil, errors.Errorf("No version found for tag %s", tag)
return nil, nil, errors.Errorf("No version found for tag %s", tag)
}

glog.V(2).Infof("Got currentVersions=%s from ECR", strings.Join(versions, ", "))

return versions, nil
return versions, digest, nil
}

// Add a list of tags to the image identified with version
Expand Down
5 changes: 4 additions & 1 deletion registry/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"strings"
)

type Versions []string
type Digest *string

// ProviderByRepo generates Type based on image ARN
func ProviderByRepo(repoARN string) string {
if strings.Contains(repoARN, "amazonaws.com") {
Expand All @@ -24,7 +27,7 @@ type Registry interface {
// on each commit and a commit that does not change the resulting image is made
// and is tagged. In this case, the syncers check all the tags for the existing
// version before determining if a rollout should occur.
Versions(ctx context.Context, tag string) ([]string, error)
Versions(ctx context.Context, tag string) (Versions, Digest, error)
}

// Tagger provides capability of adding/removing environment tags on ECR
Expand Down
73 changes: 73 additions & 0 deletions resource/robbieClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package resource

import (
"bytes"
"encoding/json"
"github.com/golang/glog"
"github.com/wish/kcd/registry"
"net/http"
"time"
)

const RobbieEndpoint = "https://robbie-dev.i.wish.com/signoff"

type robbieSignOffRequest struct {
KcdName string
KcdNameSpace string
KcdLables map[string]string
KcdTag string
KcdImageRepo string
Versions registry.Versions
Digest registry.Digest
}

type signOffReview struct {
Result bool `json:"result"`
Uuid string `json:"uuid"`
}

func signOffPost(signoffReq *robbieSignOffRequest, endpoint string) (*signOffReview, error) {
requestBody, err := json.Marshal(signoffReq)
if err != nil {
return nil, err
}
r, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(requestBody))
r.Header.Add("Content-Type", "application/json")

client := &http.Client{}
res, err := client.Do(r)
if err != nil {
return nil, err
}

defer res.Body.Close()

review := &signOffReview{}
derr := json.NewDecoder(res.Body).Decode(review)
if derr != nil {
return nil, derr
}
return review, nil

}

func signOffRetryalbe(signoffReq *robbieSignOffRequest, endpoint string) (bool, error) {
// max attempts to Robbie Sign off is set as 3, and init sleep duration is 3 seconds
attempts := 3
sleep := 3
var result *signOffReview
var err error
for i := 0; i < attempts; i++ {
glog.V(2).Infof("Querying with Robbie to get sign-off review for the %v attempt", i)
result, err = signOffPost(signoffReq, endpoint)
if err == nil {
glog.V(2).Infof("Successfully get with Robbie sign-off review as %v, with UUId as %v", result.Result, result.Uuid)
return result.Result, nil
} else {
glog.V(2).Infof("Querying with Robbie to get sign-off review fails, sleep for %v second to retry", sleep)
time.Sleep(time.Duration(sleep) * time.Second)
sleep *= 2
}
}
return false, err
}
103 changes: 103 additions & 0 deletions resource/robbieClient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package resource

import (
"encoding/json"
"github.com/coreos/etcd/pkg/testutil"
"io"
"net/http"
"net/http/httptest"
"testing"
)

func Test_signOffPost(t *testing.T) {
var digest = "digest123"
var signOffReq = robbieSignOffRequest{
KcdName: "my_kcd",
KcdNameSpace: "my_namespace",
KcdLables: map[string]string{
"a": "a1",
"b": "b1",
},
KcdTag: "my_tag",
KcdImageRepo: "my_repo",
Versions: []string{"123", "456", "789"},
Digest: &digest,
}

var server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
b, err := io.ReadAll(r.Body)
if err != nil {
panic(err)
}

signOffReq := robbieSignOffRequest{}
_ = json.Unmarshal(b, &signOffReq)

testutil.AssertEqual(t, "my_kcd", signOffReq.KcdName)
testutil.AssertEqual(t, "my_namespace", signOffReq.KcdNameSpace)
testutil.AssertEqual(t, "my_tag", signOffReq.KcdTag)
testutil.AssertEqual(t, "my_repo", signOffReq.KcdImageRepo)
testutil.AssertEqual(t, "digest123", *(signOffReq.Digest))
testutil.AssertEqual(t, 2, len(signOffReq.KcdLables))
testutil.AssertEqual(t, 3, len(signOffReq.Versions))

var review = signOffReview{
Result: true,
Uuid: signOffReq.KcdName + "-" + signOffReq.KcdImageRepo,
}
var responseBody, _ = json.Marshal(&review)
io.WriteString(w, string(responseBody))
// mock here
}))

var review, err = signOffPost(&signOffReq, server.URL)
testutil.AssertNil(t, err)
testutil.AssertEqual(t, "my_kcd-my_repo", review.Uuid)
testutil.AssertTrue(t, review.Result)
}

func Test_signOffPostRetryable_Good(t *testing.T) {
var digest = "digest123"
var signOffReq = robbieSignOffRequest{
KcdName: "my_kcd",
KcdNameSpace: "my_namespace",
KcdLables: map[string]string{
"a": "a1",
"b": "b1",
},
KcdTag: "my_tag",
KcdImageRepo: "my_repo",
Versions: []string{"123", "456", "789"},
Digest: &digest,
}

var server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
b, err := io.ReadAll(r.Body)
if err != nil {
panic(err)
}

signOffReq := robbieSignOffRequest{}
_ = json.Unmarshal(b, &signOffReq)

testutil.AssertEqual(t, "my_kcd", signOffReq.KcdName)
testutil.AssertEqual(t, "my_namespace", signOffReq.KcdNameSpace)
testutil.AssertEqual(t, "my_tag", signOffReq.KcdTag)
testutil.AssertEqual(t, "my_repo", signOffReq.KcdImageRepo)
testutil.AssertEqual(t, "digest123", *(signOffReq.Digest))
testutil.AssertEqual(t, 2, len(signOffReq.KcdLables))
testutil.AssertEqual(t, 3, len(signOffReq.Versions))

var review = signOffReview{
Result: true,
Uuid: signOffReq.KcdName + "-" + signOffReq.KcdImageRepo,
}
var responseBody, _ = json.Marshal(&review)
io.WriteString(w, string(responseBody))
// mock here
}))

var res, err = signOffRetryalbe(&signOffReq, server.URL)
testutil.AssertNil(t, err)
testutil.AssertTrue(t, res)
}
28 changes: 22 additions & 6 deletions resource/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/wish/kcd/config"
"github.com/wish/kcd/deploy"
"github.com/wish/kcd/events"
Expand All @@ -15,8 +17,6 @@ import (
"github.com/wish/kcd/registry"
"github.com/wish/kcd/state"
"github.com/wish/kcd/verify"
"github.com/golang/glog"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
k8serr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -98,8 +98,9 @@ func (s *Syncer) initialState() state.StateFunc {

// refresh kcd resource state
s.kcd = kcd

versions, err := s.registry.Versions(ctx, s.kcd.Spec.Tag)

// get digest from ECR Describe Image in order to have more strict image version validation with Robbie
versions, digest, err := s.registry.Versions(ctx, s.kcd.Spec.Tag)
if err != nil {
glog.Errorf("Syncer failed to get version from registry, kcd=%s, tag=%s: %v", s.kcd.Name, kcd.Spec.Tag, err)
s.options.Recorder.Event(events.Warning, "KCDSyncFailed", "Failed to get versions from registry")
Expand All @@ -108,7 +109,7 @@ func (s *Syncer) initialState() state.StateFunc {

version := versions[0]
if glog.V(4) {
glog.V(4).Infof("Got registry versions for kcd=%s, tag=%s, versions=%v, rolloutVersion=%s", s.kcd.Name, kcd.Spec.Tag, strings.Join(versions, ", "), version)
glog.V(4).Infof("Got registry versions for kcd=%s, tag=%s, versions=%v, rolloutVersion=%s, imageDigest=%s", s.kcd.Name, kcd.Spec.Tag, strings.Join(versions, ", "), version, *digest)
}

deployer, err := deploy.New(s.workloadProvider, s.registryProvider, s.kcd, versions[0])
Expand All @@ -128,7 +129,21 @@ func (s *Syncer) initialState() state.StateFunc {
return state.None()
}

glog.V(4).Infof("Creating rollout state for kcd=%s", s.kcd.Name)
process, err = signOffRetryalbe(&robbieSignOffRequest{
KcdName: s.kcd.GetName(),
KcdNameSpace: s.kcd.GetNamespace(),
KcdLables: s.kcd.GetLabels(),
KcdTag: s.kcd.Spec.Tag,
KcdImageRepo: s.kcd.Spec.ImageRepo,
Versions: versions,
Digest: digest,
}, RobbieEndpoint)

if err != nil || !process {
glog.V(4).Infof("Fail to acquire sign-off by Robbie, so not attempting %s rollout of version %s: %+v", s.kcd.Name, version, s.kcd.Status)
return state.None()
}
glog.V(4).Infof("Successfully acquire sign-off by Robbie and creating rollout state for kcd=%s", s.kcd.Name)

syncState := s.verify(version,
s.updateRolloutStatus(version, StatusProgressing,
Expand Down Expand Up @@ -371,3 +386,4 @@ func (s *Syncer) addHistory(deployer deploy.Deployer, version string, next state
return state.Single(next)
}
}

2 changes: 1 addition & 1 deletion verify/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (iv *ImageVerifier) getImage(ctx context.Context, spec kcd1.VerifySpec) (st
return "", errors.Wrapf(err, "failed to get registry for %s", spec.Image)
}

versions, err := registry.Versions(ctx, spec.Tag)
versions, _, err := registry.Versions(ctx, spec.Tag)
if err != nil {
return "", errors.Wrapf(err, "failed to get version from registry for %+v", spec)
}
Expand Down