Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 63 additions & 9 deletions cmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package cmd

import (
"errors"
"fmt"
"time"

"github.com/Meetic/blackbeard/pkg/resource"
"github.com/gosuri/uiprogress"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -29,6 +31,7 @@ and apply the changes to the Kubernetes namespace.
},
}

//NewApplyCommand returns the apply cobra command
func NewApplyCommand() *cobra.Command {
addCommonNamespaceCommandFlags(applyCmd)
applyCmd.Flags().BoolVar(&wait, "wait", false, "wait until all pods are running")
Expand Down Expand Up @@ -59,19 +62,70 @@ func runApply(namespace string) error {
logrus.WithFields(logrus.Fields{
"namespace": namespace,
}).Info("Waiting for namespace to be ready...")
//init progress bar
uiprogress.Start()
bar := uiprogress.AddBar(100).AppendCompleted().PrependElapsed()

if err := api.WaitForNamespaceReady(namespace, timeout, bar); err != nil {
if err := waitForNamespaceReady(api.Namespaces(), namespace); err != nil {
return err
}

logrus.WithFields(logrus.Fields{
"namespace": namespace,
}).Info("Namespace is ready")

}

return nil
}

func waitForNamespaceReady(ns resource.NamespaceService, namespace string) error {

//Starting watching namespace
go func() {
err := ns.WatchNamespaces()
if err != nil {
logrus.Errorf("Error while trying to watch namespace : %s", err.Error())
}
}()

//register for event listening
ns.AddListener("cli_progress_bar")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to register the listener before starting the watcher to prevent missing event


//initiate progress bar
uiprogress.Start()
bar := uiprogress.AddBar(100).AppendCompleted().PrependElapsed()

//Init timer
timerCh := time.NewTimer(timeout).C
doneCh := make(chan bool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to make a buffered chanel make(chan bool, 1) because you will only put one value inside and exist the func after so the goroutine will stop


go func() {
for {
select {
case e := <-ns.Events("cli_progress_bar"):
if e.Type == resource.NamespaceStatusUpdate && e.Namespace == namespace {

logrus.WithFields(logrus.Fields{
"namespace": e.Namespace,
"type": e.Type,
"Phase": e.Phase,
"Status": e.Status,
}).Debug("Event received for namespace")

if err := bar.Set(e.Status); err != nil {
logrus.Debugf("Error when incrementing progress bar : %s", err.Error())
}

if e.Status == 100 {
doneCh <- true
}
}
}
}
}()

for {
select {
case <-timerCh:
return fmt.Errorf("time out : Some pods are not yet ready")
case <-doneCh:
logrus.WithFields(logrus.Fields{
"namespace": namespace,
}).Info("Namespace is ready")
return nil
}
}
}
2 changes: 0 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package api

import (
"strings"
"time"

"github.com/Meetic/blackbeard/pkg/playbook"
"github.com/Meetic/blackbeard/pkg/resource"
Expand All @@ -24,7 +23,6 @@ type Api interface {
Reset(namespace string, configPath string) error
Apply(namespace string, configPath string) error
Update(namespace string, inventory playbook.Inventory, configPath string) error
WaitForNamespaceReady(namespace string, timeout time.Duration, bar progress) error
GetVersion() (*Version, error)
}

Expand Down
39 changes: 0 additions & 39 deletions pkg/api/namespace.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package api

import (
"fmt"
"time"

"github.com/Meetic/blackbeard/pkg/resource"
)

const (
Expand Down Expand Up @@ -52,39 +49,3 @@ func (api *api) ListNamespaces() ([]Namespace, error) {
return namespaces, nil

}

type progress interface {
Set(int) error
}

// WaitForNamespaceReady wait until all pods in the specified namespace are ready.
// And error is returned if the timeout is reach.
func (api *api) WaitForNamespaceReady(namespace string, timeout time.Duration, bar progress) error {

ticker := time.NewTicker(tickerDuration)
timerCh := time.NewTimer(timeout).C
doneCh := make(chan bool)

go func(bar progress, ns resource.NamespaceService, namespace string) {
for range ticker.C {
status, err := ns.GetStatus(namespace)
if err != nil {
ticker.Stop()
}
bar.Set(status.Status)
if status.Status == 100 {
doneCh <- true
}
}
}(bar, api.namespaces, namespace)

for {
select {
case <-timerCh:
ticker.Stop()
return fmt.Errorf("time out : Some pods are not yet ready")
case <-doneCh:
return nil
}
}
}
33 changes: 20 additions & 13 deletions pkg/resource/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
v1 "k8s.io/api/core/v1"
)

const (
refreshStatusTick = 10 * time.Second
NamespaceStatusUpdate = "STATUS.UPDATE"
)

type Namespace struct {
Name string
Phase string
Expand Down Expand Up @@ -39,9 +44,9 @@ type NamespaceRepository interface {
}

type namespaceService struct {
namespaces NamespaceRepository
pods PodRepository
namespaceEvents map[string]chan NamespaceEvent
namespaces NamespaceRepository
pods PodRepository
listeners map[string]chan NamespaceEvent
}

// NamespaceStatus represent namespace with percentage of pods running and status phase (Active or Terminating)
Expand Down Expand Up @@ -74,18 +79,18 @@ func NewNamespaceService(namespaces NamespaceRepository, pods PodRepository) Nam

// AddListener register a new listener
func (ns *namespaceService) AddListener(name string) {
if ns.namespaceEvents == nil {
ns.namespaceEvents = make(map[string]chan NamespaceEvent)
if ns.listeners == nil {
ns.listeners = make(map[string]chan NamespaceEvent)
}

ns.namespaceEvents[name] = make(chan NamespaceEvent)
ns.listeners[name] = make(chan NamespaceEvent)
}

// RemoveListener close channel and remove listener
func (ns *namespaceService) RemoveListener(name string) error {
if listener, ok := ns.namespaceEvents[name]; ok {
if listener, ok := ns.listeners[name]; ok {
close(listener)
delete(ns.namespaceEvents, name)
delete(ns.listeners, name)
return nil
}

Expand All @@ -98,9 +103,11 @@ func (ns *namespaceService) Emit(event NamespaceEvent) {
"component": "emmiter",
"event": event.Type,
"namespace": event.Namespace,
}).Debugf("new status : %d | new phase %s", event.Status, event.Phase)
"status": event.Status,
"phase": event.Phase,
}).Debugf("namespace changed")

for _, ch := range ns.namespaceEvents {
for _, ch := range ns.listeners {
go func(handler chan NamespaceEvent) {
handler <- event
}(ch)
Expand All @@ -125,7 +132,7 @@ func (ns *namespaceService) WatchNamespaces() error {

func (ns *namespaceService) watchStatus() error {

ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(refreshStatusTick)

defer ticker.Stop()

Expand All @@ -146,7 +153,7 @@ func (ns *namespaceService) watchStatus() error {
}

events = append(events, NamespaceEvent{
Type: "STATUS.UPDATE",
Type: NamespaceStatusUpdate,
Namespace: n.Name,
Phase: n.Phase,
Status: n.Status,
Expand Down Expand Up @@ -203,7 +210,7 @@ func (ns *namespaceService) Create(n string) error {

// Events
func (ns *namespaceService) Events(listener string) chan NamespaceEvent {
if ch, ok := ns.namespaceEvents[listener]; ok {
if ch, ok := ns.listeners[listener]; ok {
return ch
}

Expand Down