diff --git a/cmd/apply.go b/cmd/apply.go index 5cda2c0..3d5e245 100644 --- a/cmd/apply.go +++ b/cmd/apply.go @@ -2,8 +2,10 @@ package cmd import ( "errors" + "fmt" "time" + "github.com/Meetic/blackbeard/pkg/resource" "github.com/gosuri/uiprogress" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -29,6 +31,7 @@ and apply the changes to the Kubernetes namespace. }, } +//NewApplyCommand returns the apply cobra command func NewApplyCommand() *cobra.Command { addCommonNamespaceCommandFlags(applyCmd) applyCmd.Flags().BoolVar(&wait, "wait", false, "wait until all pods are running") @@ -59,19 +62,70 @@ func runApply(namespace string) error { logrus.WithFields(logrus.Fields{ "namespace": namespace, }).Info("Waiting for namespace to be ready...") - //init progress bar - uiprogress.Start() - bar := uiprogress.AddBar(100).AppendCompleted().PrependElapsed() - if err := api.WaitForNamespaceReady(namespace, timeout, bar); err != nil { + if err := waitForNamespaceReady(api.Namespaces(), namespace); err != nil { return err } - - logrus.WithFields(logrus.Fields{ - "namespace": namespace, - }).Info("Namespace is ready") - } return nil } + +func waitForNamespaceReady(ns resource.NamespaceService, namespace string) error { + + //Starting watching namespace + go func() { + err := ns.WatchNamespaces() + if err != nil { + logrus.Errorf("Error while trying to watch namespace : %s", err.Error()) + } + }() + + //register for event listening + ns.AddListener("cli_progress_bar") + + //initiate progress bar + uiprogress.Start() + bar := uiprogress.AddBar(100).AppendCompleted().PrependElapsed() + + //Init timer + timerCh := time.NewTimer(timeout).C + doneCh := make(chan bool) + + go func() { + for { + select { + case e := <-ns.Events("cli_progress_bar"): + if e.Type == resource.NamespaceStatusUpdate && e.Namespace == namespace { + + logrus.WithFields(logrus.Fields{ + "namespace": e.Namespace, + "type": e.Type, + "Phase": e.Phase, + "Status": e.Status, + }).Debug("Event received for namespace") + + if err := bar.Set(e.Status); err != nil { + logrus.Debugf("Error when incrementing progress bar : %s", err.Error()) + } + + if e.Status == 100 { + doneCh <- true + } + } + } + } + }() + + for { + select { + case <-timerCh: + return fmt.Errorf("time out : Some pods are not yet ready") + case <-doneCh: + logrus.WithFields(logrus.Fields{ + "namespace": namespace, + }).Info("Namespace is ready") + return nil + } + } +} diff --git a/pkg/api/api.go b/pkg/api/api.go index d1e3faa..720363c 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -2,7 +2,6 @@ package api import ( "strings" - "time" "github.com/Meetic/blackbeard/pkg/playbook" "github.com/Meetic/blackbeard/pkg/resource" @@ -24,7 +23,6 @@ type Api interface { Reset(namespace string, configPath string) error Apply(namespace string, configPath string) error Update(namespace string, inventory playbook.Inventory, configPath string) error - WaitForNamespaceReady(namespace string, timeout time.Duration, bar progress) error GetVersion() (*Version, error) } diff --git a/pkg/api/namespace.go b/pkg/api/namespace.go index 75a38cb..523eff8 100644 --- a/pkg/api/namespace.go +++ b/pkg/api/namespace.go @@ -1,10 +1,7 @@ package api import ( - "fmt" "time" - - "github.com/Meetic/blackbeard/pkg/resource" ) const ( @@ -52,39 +49,3 @@ func (api *api) ListNamespaces() ([]Namespace, error) { return namespaces, nil } - -type progress interface { - Set(int) error -} - -// WaitForNamespaceReady wait until all pods in the specified namespace are ready. -// And error is returned if the timeout is reach. -func (api *api) WaitForNamespaceReady(namespace string, timeout time.Duration, bar progress) error { - - ticker := time.NewTicker(tickerDuration) - timerCh := time.NewTimer(timeout).C - doneCh := make(chan bool) - - go func(bar progress, ns resource.NamespaceService, namespace string) { - for range ticker.C { - status, err := ns.GetStatus(namespace) - if err != nil { - ticker.Stop() - } - bar.Set(status.Status) - if status.Status == 100 { - doneCh <- true - } - } - }(bar, api.namespaces, namespace) - - for { - select { - case <-timerCh: - ticker.Stop() - return fmt.Errorf("time out : Some pods are not yet ready") - case <-doneCh: - return nil - } - } -} diff --git a/pkg/resource/namespace.go b/pkg/resource/namespace.go index c2a66f6..1bf7061 100644 --- a/pkg/resource/namespace.go +++ b/pkg/resource/namespace.go @@ -8,6 +8,11 @@ import ( v1 "k8s.io/api/core/v1" ) +const ( + refreshStatusTick = 10 * time.Second + NamespaceStatusUpdate = "STATUS.UPDATE" +) + type Namespace struct { Name string Phase string @@ -39,9 +44,9 @@ type NamespaceRepository interface { } type namespaceService struct { - namespaces NamespaceRepository - pods PodRepository - namespaceEvents map[string]chan NamespaceEvent + namespaces NamespaceRepository + pods PodRepository + listeners map[string]chan NamespaceEvent } // NamespaceStatus represent namespace with percentage of pods running and status phase (Active or Terminating) @@ -74,18 +79,18 @@ func NewNamespaceService(namespaces NamespaceRepository, pods PodRepository) Nam // AddListener register a new listener func (ns *namespaceService) AddListener(name string) { - if ns.namespaceEvents == nil { - ns.namespaceEvents = make(map[string]chan NamespaceEvent) + if ns.listeners == nil { + ns.listeners = make(map[string]chan NamespaceEvent) } - ns.namespaceEvents[name] = make(chan NamespaceEvent) + ns.listeners[name] = make(chan NamespaceEvent) } // RemoveListener close channel and remove listener func (ns *namespaceService) RemoveListener(name string) error { - if listener, ok := ns.namespaceEvents[name]; ok { + if listener, ok := ns.listeners[name]; ok { close(listener) - delete(ns.namespaceEvents, name) + delete(ns.listeners, name) return nil } @@ -98,9 +103,11 @@ func (ns *namespaceService) Emit(event NamespaceEvent) { "component": "emmiter", "event": event.Type, "namespace": event.Namespace, - }).Debugf("new status : %d | new phase %s", event.Status, event.Phase) + "status": event.Status, + "phase": event.Phase, + }).Debugf("namespace changed") - for _, ch := range ns.namespaceEvents { + for _, ch := range ns.listeners { go func(handler chan NamespaceEvent) { handler <- event }(ch) @@ -125,7 +132,7 @@ func (ns *namespaceService) WatchNamespaces() error { func (ns *namespaceService) watchStatus() error { - ticker := time.NewTicker(10 * time.Second) + ticker := time.NewTicker(refreshStatusTick) defer ticker.Stop() @@ -146,7 +153,7 @@ func (ns *namespaceService) watchStatus() error { } events = append(events, NamespaceEvent{ - Type: "STATUS.UPDATE", + Type: NamespaceStatusUpdate, Namespace: n.Name, Phase: n.Phase, Status: n.Status, @@ -203,7 +210,7 @@ func (ns *namespaceService) Create(n string) error { // Events func (ns *namespaceService) Events(listener string) chan NamespaceEvent { - if ch, ok := ns.namespaceEvents[listener]; ok { + if ch, ok := ns.listeners[listener]; ok { return ch }