From a4a3b5f467b63fa3a3842dfea08d9d795a659062 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Fri, 22 Jul 2022 12:08:20 -0400 Subject: [PATCH 01/13] Moving to systemd events to detect changes in service status --- internal/metrics/system.go | 2 +- internal/metrics/workload.go | 2 +- internal/service/event_listener.go | 116 ++++++++++++++++++++++++ internal/service/systemd.go | 56 ++++++++---- internal/workload/manager.go | 2 +- internal/workload/podman/mock_podman.go | 12 +-- internal/workload/podman/podman.go | 95 +++---------------- internal/workload/wrapper.go | 62 ++++++------- internal/workload/wrapper_test.go | 1 - 9 files changed, 201 insertions(+), 147 deletions(-) create mode 100644 internal/service/event_listener.go diff --git a/internal/metrics/system.go b/internal/metrics/system.go index 5223db1e..01cabd37 100644 --- a/internal/metrics/system.go +++ b/internal/metrics/system.go @@ -26,7 +26,7 @@ type SystemMetrics struct { } func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) { - nodeExporter, err := service.NewSystemdRootless("node_exporter", nil, false) + nodeExporter, err := service.NewSystemdRootless("node_exporter", nil, false, nil) if err != nil { return nil, err } diff --git a/internal/metrics/workload.go b/internal/metrics/workload.go index fc213b8d..af109e8f 100644 --- a/internal/metrics/workload.go +++ b/internal/metrics/workload.go @@ -48,7 +48,7 @@ func (wrkM *WorkloadMetrics) Update(config models.DeviceConfigurationMessage) er } func (wrkM *WorkloadMetrics) WorkloadRemoved(workloadName string) { - log.Infof("removing target metrics for workload '%v'", workloadName) + log.Infof("Removing target metrics for workload '%v'", workloadName) wrkM.daemon.DeleteTarget(workloadName) } diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go new file mode 100644 index 00000000..cfbadec6 --- /dev/null +++ b/internal/service/event_listener.go @@ -0,0 +1,116 @@ +package service + +import ( + "fmt" + + "github.com/coreos/go-systemd/v22/dbus" + log "github.com/sirupsen/logrus" +) + +const ( + // servicePostfixLength is the length of the string ".service". This value is used to extract the workload name from the service + // coming from systemd + servicePostfixLength = 8 + + EventStarted EventType = "started" + EventStopped EventType = "stopped" + + // To avoid confusion, we match the action verb tense coming from systemd sub state, even though it is not consistent + // The only addition to the list is the "removed" case, which we use when a service is removed + // from the system and we receive an empty unitStatus object. + + //The service is running + running unitSubState = "running" + //The service is starting, probably due to a restart. + //In this case we have to notify the observers that the service is not yet up and is loading. A subsequent 'running' event will proceed. + start unitSubState = "start" + //The service has been stopped, due to the container being killed. Systemd will restart the service. + stop unitSubState = "stop" + //The service is dead, due to the container being stopped for instance. + dead unitSubState = "dead" + //The service failed to start + failed unitSubState = "failed" + //The service has been removed, no unit state is returned from systemd + removed unitSubState = "removed" +) + +type unitSubState string + +type EventType string + +type Event struct { + WorkloadName string + Type EventType +} + +type EventListener struct { + observerCh chan *Event + set *dbus.SubscriptionSet + dbusCh <-chan map[string]*dbus.UnitStatus + dbusErrCh <-chan error +} + +func NewEventListener(observerCh chan *Event) *EventListener { + return &EventListener{observerCh: observerCh} +} + +func (e *EventListener) Connect() error { + conn, err := newDbusConnection(true) + if err != nil { + return err + } + e.set = conn.NewSubscriptionSet() + return nil +} + +func (e *EventListener) Listen() { + e.dbusCh, e.dbusErrCh = e.set.Subscribe() + go func() { + for { + select { + case msg := <-e.dbusCh: + for name, unit := range msg { + log.Debugf("Captured event for %s: %v+", name, unit) + n := extractWorkloadName(name) + state := translateUnitSubStatus(unit) + log.Infof("Service %s for workload transitioned to sub state %s\n", n, state) + switch state { + case running: + e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} + case removed, stop, dead, failed, start: + e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} + default: + log.Infof("Ignoring unit sub state for service %s: %s", name, unit.SubState) + } + } + case msg := <-e.dbusErrCh: + log.Errorf("Error while parsing dbus event: %v", msg) + } + } + }() +} + +func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { + if unit == nil { + return removed + } + return unitSubState(unit.SubState) +} + +func (e *EventListener) Add(workloadName string) { + name := fmt.Sprintf("%s.service", workloadName) + log.Debugf("Adding service for events %s", name) + if !e.set.Contains(name) { + e.set.Add(name) + } +} + +func (e *EventListener) Remove(workloadName string) { + name := fmt.Sprintf("%s.service", workloadName) + log.Debugf("Removing service for events %s", name) + e.set.Remove(name) +} + +func extractWorkloadName(serviceName string) string { + return serviceName[:len(serviceName)-servicePostfixLength] +} diff --git a/internal/service/systemd.go b/internal/service/systemd.go index e73195f3..500f93e3 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io/ioutil" "os" "path" "path/filepath" @@ -19,6 +20,7 @@ const ( DefaultRestartTimeout = 15 TimerSuffix = ".timer" ServiceSuffix = ".service" + DefaultNameSeparator = "-" ) var ( @@ -43,6 +45,7 @@ type systemd struct { UnitsContent map[string]string `json:"-"` dbusConnection *dbus.Conn `json:"-"` Rootless bool `json:"rootless"` + eventCh chan Event `json:"-"` } //go:generate mockgen -package=service -destination=mock_systemd_manager.go . SystemdManager @@ -54,12 +57,13 @@ type SystemdManager interface { } type systemdManager struct { - svcFilePath string - lock sync.RWMutex - services map[string]Service + svcFilePath string + lock sync.RWMutex + services map[string]Service + eventListener *EventListener } -func NewSystemdManager(configDir string) (SystemdManager, error) { +func NewSystemdManager(configDir string, observerCh chan *Event) (SystemdManager, error) { services := make(map[string]*systemd) servicePath := path.Join(configDir, "services.json") servicesJson, err := os.ReadFile(servicePath) //#nosec @@ -74,8 +78,16 @@ func NewSystemdManager(configDir string) (SystemdManager, error) { for k, v := range services { systemdSVC[k] = v } - - return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}}, nil + listener := NewEventListener(observerCh) + err = listener.Connect() + if err != nil { + return nil, err + } + for name := range services { + listener.Add(name) + } + listener.Listen() + return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}, eventListener: listener}, nil } func (mgr *systemdManager) RemoveServicesFile() error { @@ -97,8 +109,12 @@ func (mgr *systemdManager) Add(svc Service) error { defer mgr.lock.Unlock() mgr.services[svc.GetName()] = svc - - return mgr.write() + err := mgr.write() + if err != nil { + return err + } + mgr.eventListener.Add(svc.GetName()) + return nil } func (mgr *systemdManager) Get(name string) Service { @@ -111,10 +127,13 @@ func (mgr *systemdManager) Get(name string) Service { func (mgr *systemdManager) Remove(svc Service) error { mgr.lock.Lock() defer mgr.lock.Unlock() - + err := mgr.write() + if err != nil { + return err + } delete(mgr.services, svc.GetName()) - - return mgr.write() + mgr.eventListener.Remove(svc.GetName()) + return nil } func (mgr *systemdManager) write() error { @@ -122,15 +141,11 @@ func (mgr *systemdManager) write() error { if err != nil { return err } - err = os.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec - if err != nil { - return err - } - return nil + return ioutil.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec } -func NewSystemd(name string, units map[string]string) (Service, error) { - return NewSystemdRootless(name, units, true) +func NewSystemd(name string, units map[string]string, eventCh chan Event) (Service, error) { + return NewSystemdRootless(name, units, true, eventCh) } func newDbusConnection(rootless bool) (*dbus.Conn, error) { @@ -161,7 +176,7 @@ func newDbusConnection(rootless bool) (*dbus.Conn, error) { } } -func NewSystemdRootless(name string, units map[string]string, rootless bool) (Service, error) { +func NewSystemdRootless(name string, units map[string]string, rootless bool, eventCh chan Event) (Service, error) { var err error var conn *dbus.Conn @@ -182,6 +197,7 @@ func NewSystemdRootless(name string, units map[string]string, rootless bool) (Se Units: unitNames, Rootless: rootless, UnitsContent: units, + eventCh: eventCh, }, nil } @@ -208,7 +224,7 @@ func (s *systemd) Remove() error { return err } } - + s.eventCh <- Event{WorkloadName: s.Name, Type: EventStopped} return s.reload() } diff --git a/internal/workload/manager.go b/internal/workload/manager.go index 2ccd81af..5df2bfa8 100644 --- a/internal/workload/manager.go +++ b/internal/workload/manager.go @@ -146,7 +146,7 @@ func (w *WorkloadManager) Update(configuration models.DeviceConfigurationMessage if PodShouldWaitForMount(pod, configuration.Configuration) { errors = multierror.Append(errors, fmt.Errorf( - "Pod '%s' needs to mount blockdevice but it's not in there yet", workload.Name)) + "pod '%s' needs to mount blockdevice but it's not in there yet", workload.Name)) continue } diff --git a/internal/workload/podman/mock_podman.go b/internal/workload/podman/mock_podman.go index 692072ce..70621aa9 100644 --- a/internal/workload/podman/mock_podman.go +++ b/internal/workload/podman/mock_podman.go @@ -82,19 +82,19 @@ func (mr *MockPodmanMockRecorder) GenerateSystemdService(arg0, arg1, arg2 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateSystemdService", reflect.TypeOf((*MockPodman)(nil).GenerateSystemdService), arg0, arg1, arg2) } -// GetPodReportForId mocks base method. -func (m *MockPodman) GetPodReportForId(arg0 string) (*PodReport, error) { +// GetPodReportForPodName mocks base method. +func (m *MockPodman) GetPodReportForPodName(arg0 string) (*PodReport, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPodReportForId", arg0) + ret := m.ctrl.Call(m, "GetPodReportForPodName", arg0) ret0, _ := ret[0].(*PodReport) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetPodReportForId indicates an expected call of GetPodReportForId. -func (mr *MockPodmanMockRecorder) GetPodReportForId(arg0 interface{}) *gomock.Call { +// GetPodReportForPodName indicates an expected call of GetPodReportForPodName. +func (mr *MockPodmanMockRecorder) GetPodReportForPodName(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodReportForId", reflect.TypeOf((*MockPodman)(nil).GetPodReportForId), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodReportForPodName", reflect.TypeOf((*MockPodman)(nil).GetPodReportForPodName), arg0) } // List mocks base method. diff --git a/internal/workload/podman/podman.go b/internal/workload/podman/podman.go index 0639fe93..0e0a9170 100644 --- a/internal/workload/podman/podman.go +++ b/internal/workload/podman/podman.go @@ -15,9 +15,8 @@ import ( "github.com/blang/semver" "github.com/go-openapi/swag" "github.com/project-flotta/flotta-device-worker/internal/service" + api "github.com/project-flotta/flotta-device-worker/internal/workload/api" - log "github.com/sirupsen/logrus" - podmanEvents "github.com/containers/podman/v4/libpod/events" "github.com/containers/podman/v4/pkg/bindings" "github.com/containers/podman/v4/pkg/bindings/containers" "github.com/containers/podman/v4/pkg/bindings/generate" @@ -26,7 +25,7 @@ import ( "github.com/containers/podman/v4/pkg/bindings/secrets" "github.com/containers/podman/v4/pkg/bindings/system" "github.com/containers/podman/v4/pkg/domain/entities" - api2 "github.com/project-flotta/flotta-device-worker/internal/workload/api" + log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" ) @@ -36,10 +35,6 @@ const ( DefaultNetworkName = "podman" - podmanStart = string(podmanEvents.Start) - podmanRemove = string(podmanEvents.Remove) - podmanStop = string(podmanEvents.Stop) - podmanBinary = "/usr/bin/podman" autoUpdateServiceUnitTemplate = `[Unit] Description=Podman {{ .PodName }}.service @@ -72,7 +67,7 @@ type AutoUpdateUnit struct { //go:generate mockgen -package=podman -destination=mock_podman.go . Podman type Podman interface { - List() ([]api2.WorkloadInfo, error) + List() ([]api.WorkloadInfo, error) Remove(workloadId string) error Run(manifestPath, authFilePath string, annotations map[string]string) ([]*PodReport, error) Start(workloadId string) error @@ -84,7 +79,7 @@ type Podman interface { Exists(workloadId string) (bool, error) GenerateSystemdService(workload *v1.Pod, manifestPath string, monitoringInterval uint) (service.Service, error) Logs(podID string, res io.Writer) (context.CancelFunc, error) - GetPodReportForId(podID string) (*PodReport, error) + GetPodReportForPodName(podName string) (*PodReport, error) } type PodmanEvent struct { @@ -114,6 +109,8 @@ const DefaultTimeoutForStoppingInSeconds int = 5 type podman struct { podmanConnection context.Context timeoutForStopping int + eventCh chan service.Event + cancel chan bool } func NewPodman() (*podman, error) { @@ -124,6 +121,8 @@ func NewPodman() (*podman, error) { p := &podman{ podmanConnection: podmanConnection, timeoutForStopping: DefaultTimeoutForStoppingInSeconds, + eventCh: make(chan service.Event, 1000), + cancel: make(chan bool), } err = p.MinVersion() if err != nil { @@ -157,14 +156,14 @@ func (p *podman) MinVersion() error { return fmt.Errorf("podman version '%s' is not supported, needs >= v4.2", version.Version.Version) } -func (p *podman) List() ([]api2.WorkloadInfo, error) { +func (p *podman) List() ([]api.WorkloadInfo, error) { podList, err := pods.List(p.podmanConnection, nil) if err != nil { return nil, err } - var workloads []api2.WorkloadInfo + var workloads []api.WorkloadInfo for _, pod := range podList { - wi := api2.WorkloadInfo{ + wi := api.WorkloadInfo{ Id: pod.Id, Name: pod.Name, Status: pod.Status, @@ -216,73 +215,7 @@ func (p *podman) getContainerDetails(containerId string) (*ContainerReport, erro }, nil } -func (p *podman) Events(events chan *PodmanEvent) { - evchan := make(chan entities.Event, 1000) - cancel := make(chan bool) - booltrue := true - - workloads, err := p.List() - if err != nil { - log.Errorf("Cannot get the list of running pods: %v", err) - } - - for _, wrk := range workloads { - report, err := p.GetPodReportForId(wrk.Id) - if err != nil { - log.Errorf("Cannot get pod report for pod '%v', err: %v ", wrk.Name, err) - continue - } - event := &PodmanEvent{ - WorkloadName: wrk.Name, - Event: StartedContainer, - Report: report, - } - go func() { events <- event }() - } - - // subroutine that reads the event chan and sending proper messages to the - // wrapper - go func() { - for { - msg := <-evchan - event := &PodmanEvent{ - WorkloadName: msg.Actor.Attributes["name"], - } - switch msg.Action { - // create event is avoided because containers are not yet created and - // the flow is created->started - case podmanStart: - event.Event = StartedContainer - report, err := p.GetPodReportForId(msg.ID) - if err != nil { - log.Error("cannot get current pod information on event: ", err) - continue - } - event.Report = report - case podmanRemove, podmanStop: - event.Event = StoppedContainer - default: - continue - } - events <- event - } - }() - - // subroutine to track events - go func() { - err := system.Events(p.podmanConnection, evchan, cancel, &system.EventsOptions{ - Filters: map[string][]string{ - "type": {"pod"}, - }, - Stream: &booltrue, - }) - if err != nil { - log.Error("Cannot get podman events, err: ", err) - } - }() -} - -func (p *podman) GetPodReportForId(podID string) (*PodReport, error) { +func (p *podman) GetPodReportForPodName(podID string) (*PodReport, error) { podInfo, err := pods.Inspect(p.podmanConnection, podID, nil) if err != nil { return nil, err @@ -408,7 +341,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } - svc, err = service.NewSystemd(podName, report.Units) + svc, err = service.NewSystemd(podName, report.Units, p.eventCh) if err != nil { return nil, err } @@ -425,7 +358,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } units := map[string]string{podName: unit.String()} - svc, err = service.NewSystemd(podName, units) + svc, err = service.NewSystemd(podName, units, p.eventCh) if err != nil { return nil, err } diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index b194098f..830f08de 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -9,12 +9,11 @@ import ( "github.com/project-flotta/flotta-device-worker/internal/service" - log "github.com/sirupsen/logrus" "github.com/project-flotta/flotta-device-worker/internal/workload/api" - api2 "github.com/project-flotta/flotta-device-worker/internal/workload/api" "github.com/project-flotta/flotta-device-worker/internal/workload/mapping" "github.com/project-flotta/flotta-device-worker/internal/workload/network" "github.com/project-flotta/flotta-device-worker/internal/workload/podman" + log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" _ "github.com/golang/mock/mockgen/model" @@ -83,7 +82,8 @@ func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, return nil, fmt.Errorf("workload cannot initialize mapping repository: %w", err) } - serviceManager, err := service.NewSystemdManager(configDir) + eventCh := make(chan *service.Event) + serviceManager, err := service.NewSystemdManager(configDir, eventCh) if err != nil { return nil, fmt.Errorf("workload cannot initialize systemd manager: %w", err) } @@ -97,26 +97,30 @@ func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, events: make(chan *podman.PodmanEvent), } - newPodman.Events(ww.events) - go func() { for { - msg := <-ww.events - switch msg.Event { - case podman.StartedContainer: - ww.lock.Lock() - observers := ww.observers - ww.lock.Unlock() + event := <-eventCh + log.Debugf("Event received %s", string(event.Type)) + ww.lock.Lock() + observers := ww.observers + ww.lock.Unlock() + switch event.Type { + case service.EventStarted: + log.Infof("Service for workload %s started", event.WorkloadName) + report, err := newPodman.GetPodReportForPodName(event.WorkloadName) + if err != nil { + log.Errorf("unable to get pod report for workload %s:%v", event.WorkloadName, err) + } for _, observer := range observers { - observer.WorkloadStarted(msg.WorkloadName, []*podman.PodReport{msg.Report}) + observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report}) } - case podman.StoppedContainer: - ww.lock.Lock() - observers := ww.observers - ww.lock.Unlock() + case service.EventStopped: + log.Infof("Service for workload %s stopped", event.WorkloadName) for _, observer := range observers { - observer.WorkloadRemoved(msg.WorkloadName) + observer.WorkloadRemoved(event.WorkloadName) } + default: + log.Errorf("unknown event %s", event.Type) } } }() @@ -131,7 +135,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemdRootless("podman-auto-update", nil, false) + svc, err := service.NewSystemdRootless("podman-auto-update", nil, false, nil) if err != nil { return err } @@ -143,7 +147,7 @@ func (ww *Workload) Init() error { return ww.netfilter.AddTable(nfTableName) } -func (ww *Workload) List() ([]api2.WorkloadInfo, error) { +func (ww *Workload) List() ([]api.WorkloadInfo, error) { infos, err := ww.workloads.List() if err != nil { return nil, err @@ -240,29 +244,16 @@ func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath stri return fmt.Errorf("error while generating systemd service: %v", err) } + log.Infof("Creating service for %s", workload.Name) err = ww.createService(svc) if err != nil { return fmt.Errorf("error while starting service: %v", err) } + log.Infof("Adding service for listener for %s", workload.Name) err = ww.serviceManager.Add(svc) if err != nil { return fmt.Errorf("error while updating service manager: %v", err) } - - podReport, err := ww.workloads.GetPodReportForId(workload.Name) - if err != nil { - return fmt.Errorf("error while sending started events: %v", err) - } - - // When pod started by systemd the event is not sent to the channel, so we send - // it manually here to notify workloadStarted observer. - go func() { - ww.events <- &podman.PodmanEvent{ - Event: podman.StartedContainer, - WorkloadName: workload.Name, - Report: podReport, - } - }() return nil } @@ -287,9 +278,8 @@ func (ww *Workload) removeService(workloadName string) error { err := ww.serviceManager.Remove(svc) if err != nil { - return nil + log.Errorf("unable to remove service from serviceManager %s:%s", workloadName, err) } - return nil } diff --git a/internal/workload/wrapper_test.go b/internal/workload/wrapper_test.go index d22a896f..2db237f0 100644 --- a/internal/workload/wrapper_test.go +++ b/internal/workload/wrapper_test.go @@ -56,7 +56,6 @@ var _ = Describe("Workload management", func() { newPodman.EXPECT().Run(manifestPath, authFilePath, nil).Return([]*podman.PodReport{{Id: "id1"}}, nil) newPodman.EXPECT().GenerateSystemdService(pod, gomock.Any(), gomock.Any()).Return(svc, nil) - newPodman.EXPECT().GetPodReportForId("pod1").Return(nil, nil) svc.EXPECT().Add().Return(nil) svc.EXPECT().Enable().Return(nil) From beecc65176bf8aa25f00309ed23b530666aae749 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Thu, 28 Jul 2022 17:34:05 -0400 Subject: [PATCH 02/13] Refactor systemd functions --- internal/metrics/system.go | 2 +- internal/service/event_listener.go | 2 +- internal/service/systemd.go | 33 ++++++++++++++++-------------- internal/workload/podman/podman.go | 4 ++-- internal/workload/wrapper.go | 2 +- 5 files changed, 23 insertions(+), 20 deletions(-) diff --git a/internal/metrics/system.go b/internal/metrics/system.go index 01cabd37..45bc445e 100644 --- a/internal/metrics/system.go +++ b/internal/metrics/system.go @@ -26,7 +26,7 @@ type SystemMetrics struct { } func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) { - nodeExporter, err := service.NewSystemdRootless("node_exporter", nil, false, nil) + nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus, nil) if err != nil { return nil, err } diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index cfbadec6..e7997b9b 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -55,7 +55,7 @@ func NewEventListener(observerCh chan *Event) *EventListener { } func (e *EventListener) Connect() error { - conn, err := newDbusConnection(true) + conn, err := newDbusConnection(UserBus) if err != nil { return err } diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 500f93e3..4c109303 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -27,6 +27,13 @@ var ( DefaultUnitsPath = path.Join(os.Getenv("HOME"), ".config/systemd/user/") ) +type BusType string + +const ( + UserBus BusType = "user" + SystemBus BusType = "system" +) + //go:generate mockgen -package=service -destination=mock_systemd.go . Service type Service interface { GetName() string @@ -44,7 +51,7 @@ type systemd struct { Units []string `json:"units"` UnitsContent map[string]string `json:"-"` dbusConnection *dbus.Conn `json:"-"` - Rootless bool `json:"rootless"` + BusType BusType `json:"busType"` eventCh chan Event `json:"-"` } @@ -144,12 +151,8 @@ func (mgr *systemdManager) write() error { return ioutil.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec } -func NewSystemd(name string, units map[string]string, eventCh chan Event) (Service, error) { - return NewSystemdRootless(name, units, true, eventCh) -} - -func newDbusConnection(rootless bool) (*dbus.Conn, error) { - if rootless { +func newDbusConnection(busType BusType) (*dbus.Conn, error) { + if busType == UserBus { return dbus.NewConnection(func() (*godbus.Conn, error) { uid := path.Base(os.Getenv("FLOTTA_XDG_RUNTIME_DIR")) path := filepath.Join(os.Getenv("FLOTTA_XDG_RUNTIME_DIR"), "systemd/private") @@ -176,11 +179,11 @@ func newDbusConnection(rootless bool) (*dbus.Conn, error) { } } -func NewSystemdRootless(name string, units map[string]string, rootless bool, eventCh chan Event) (Service, error) { +func NewSystemd(name string, units map[string]string, busType BusType, eventCh chan Event) (Service, error) { var err error var conn *dbus.Conn - conn, err = newDbusConnection(rootless) + conn, err = newDbusConnection(busType) if err != nil { return nil, err } @@ -195,7 +198,7 @@ func NewSystemdRootless(name string, units map[string]string, rootless bool, eve RestartSec: DefaultRestartTimeout, dbusConnection: conn, Units: unitNames, - Rootless: rootless, + BusType: busType, UnitsContent: units, eventCh: eventCh, }, nil @@ -233,7 +236,7 @@ func (s *systemd) GetName() string { } func (s *systemd) reload() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -242,7 +245,7 @@ func (s *systemd) reload() error { } func (s *systemd) Start() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -262,7 +265,7 @@ func (s *systemd) Start() error { } func (s *systemd) Stop() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -282,7 +285,7 @@ func (s *systemd) Stop() error { } func (s *systemd) Enable() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -293,7 +296,7 @@ func (s *systemd) Enable() error { } func (s *systemd) Disable() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } diff --git a/internal/workload/podman/podman.go b/internal/workload/podman/podman.go index 0e0a9170..bc47440e 100644 --- a/internal/workload/podman/podman.go +++ b/internal/workload/podman/podman.go @@ -341,7 +341,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } - svc, err = service.NewSystemd(podName, report.Units, p.eventCh) + svc, err = service.NewSystemd(podName, report.Units, service.UserBus, p.eventCh) if err != nil { return nil, err } @@ -358,7 +358,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } units := map[string]string{podName: unit.String()} - svc, err = service.NewSystemd(podName, units, p.eventCh) + svc, err = service.NewSystemd(podName, units, service.UserBus, p.eventCh) if err != nil { return nil, err } diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 830f08de..3adf1f5b 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -135,7 +135,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemdRootless("podman-auto-update", nil, false, nil) + svc, err := service.NewSystemd("podman-auto-update", nil, service.SystemBus, nil) if err != nil { return err } From eb02092e4e1260a361e9f067762eb8d2bb5dbb9f Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Thu, 28 Jul 2022 20:41:58 -0400 Subject: [PATCH 03/13] Changes due to golint run --- internal/ansible/ansible_manager_test.go | 6 ++-- internal/ansible/mapping/mapping.go | 25 ++++++++++----- internal/ansible/mapping/mapping_test.go | 20 ++++++++---- internal/heartbeat/heartbeat_test.go | 40 ------------------------ 4 files changed, 36 insertions(+), 55 deletions(-) diff --git a/internal/ansible/ansible_manager_test.go b/internal/ansible/ansible_manager_test.go index 2f6077db..9a6fd60a 100644 --- a/internal/ansible/ansible_manager_test.go +++ b/internal/ansible/ansible_manager_test.go @@ -164,8 +164,10 @@ var _ = Describe("Ansible Runner", func() { modTime1 := time.Now().Add(-3 * time.Hour) modTime2 := time.Now().Add(-2 * time.Hour) - p1Sha := ansibleManager.MappingRepository.GetSha256(p1) - p2Sha := ansibleManager.MappingRepository.GetSha256(p2) + p1Sha, err := ansibleManager.MappingRepository.GetSha256(p1) + Expect(err).ToNot(HaveOccurred()) + p2Sha, err := ansibleManager.MappingRepository.GetSha256(p2) + Expect(err).ToNot(HaveOccurred()) p1Path := path.Join(configDir, p1Sha) p2Path := path.Join(configDir, p2Sha) diff --git a/internal/ansible/mapping/mapping.go b/internal/ansible/mapping/mapping.go index d242042b..c3ebdf3f 100644 --- a/internal/ansible/mapping/mapping.go +++ b/internal/ansible/mapping/mapping.go @@ -20,7 +20,7 @@ type mapping struct { //go:generate mockgen -package=mapping -destination=mock_mapping.go . MappingRepository type MappingRepository interface { - GetSha256(fileContent []byte) string + GetSha256(fileContent []byte) (string, error) Add(fileContent []byte, modTime time.Time) error Remove(fileContent []byte) error RemoveMappingFile() error @@ -85,17 +85,24 @@ func (m *mappingRepository) GetAll() map[int]string { return all } -func (m *mappingRepository) GetSha256(fileContent []byte) string { +func (m *mappingRepository) GetSha256(fileContent []byte) (string, error) { h := sha256.New() - h.Write(fileContent) - return fmt.Sprintf("%x", h.Sum(nil)) + _, err := h.Write(fileContent) + if err != nil { + return "", err + } + return fmt.Sprintf("%x", h.Sum(nil)), nil } func (m *mappingRepository) Add(fileContent []byte, modTime time.Time) error { m.lock.Lock() defer m.lock.Unlock() - filePath := path.Join(m.configDir, m.GetSha256(fileContent)) - err := os.WriteFile(filePath, []byte(fileContent), 0600) + sha, err := m.GetSha256(fileContent) + if err != nil { + return err + } + filePath := path.Join(m.configDir, sha) + err = os.WriteFile(filePath, []byte(fileContent), 0600) if err != nil { return err @@ -111,7 +118,11 @@ func (m *mappingRepository) Remove(fileContent []byte) error { m.lock.Lock() defer m.lock.Unlock() - filePath := path.Join(m.configDir, m.GetSha256(fileContent)) + sha, err := m.GetSha256(fileContent) + if err != nil { + return err + } + filePath := path.Join(m.configDir, sha) modTime := m.pathToModTime[filePath] delete(m.modTimeToPath, modTime) delete(m.pathToModTime, filePath) diff --git a/internal/ansible/mapping/mapping_test.go b/internal/ansible/mapping/mapping_test.go index 37b77207..ea6e2526 100644 --- a/internal/ansible/mapping/mapping_test.go +++ b/internal/ansible/mapping/mapping_test.go @@ -23,7 +23,8 @@ var _ = Describe("Mapping", func() { repo, err = mapping.NewMappingRepository(dir) Expect(err).ToNot(HaveOccurred()) - sha256Test = repo.GetSha256([]byte("test")) + sha256Test, err = repo.GetSha256([]byte("test")) + Expect(err).ToNot(HaveOccurred()) filePathTest = path.Join(configDir, sha256Test) }) AfterEach(func() { @@ -31,8 +32,10 @@ var _ = Describe("Mapping", func() { Expect(err).ToNot(HaveOccurred()) }) It("sha256 Generation", func() { - s1 := repo.GetSha256([]byte("AAA")) - s2 := repo.GetSha256([]byte("AAA")) + s1, err := repo.GetSha256([]byte("AAA")) + Expect(err).ToNot(HaveOccurred()) + s2, err := repo.GetSha256([]byte("AAA")) + Expect(err).ToNot(HaveOccurred()) Expect(s1).To(Equal(s2)) }) It("Should be created empty", func() { @@ -86,12 +89,17 @@ var _ = Describe("Mapping", func() { It("Should persist mappings", func() { // given - filePath1 := path.Join(configDir, repo.GetSha256([]byte("test-one"))) - filePath2 := path.Join(configDir, repo.GetSha256([]byte("test-two"))) + sha, err := repo.GetSha256([]byte("test-one")) + Expect(err).ToNot(HaveOccurred()) + filePath1 := path.Join(configDir, sha) + Expect(err).ToNot(HaveOccurred()) + sha, err = repo.GetSha256([]byte("test-two")) + filePath2 := path.Join(configDir, sha) + Expect(err).ToNot(HaveOccurred()) modTime1 := time.Now() modTime2 := modTime1.Add(1 * time.Minute) - err := repo.Add([]byte("test-one"), modTime1) + err = repo.Add([]byte("test-one"), modTime1) Expect(err).ToNot(HaveOccurred()) Expect(repo.GetModTime(filePath1)).To(Equal(modTime1.UnixNano())) Expect(repo.GetFilePath(modTime1)).To(Equal(filePath1)) diff --git a/internal/heartbeat/heartbeat_test.go b/internal/heartbeat/heartbeat_test.go index 0464c730..1b3163cf 100644 --- a/internal/heartbeat/heartbeat_test.go +++ b/internal/heartbeat/heartbeat_test.go @@ -6,13 +6,11 @@ import ( "context" "encoding/json" "fmt" - "net" "net/http" "reflect" "sync" "time" - "github.com/openshift/assisted-installer-agent/src/util" "github.com/project-flotta/flotta-device-worker/internal/ansible" os2 "github.com/project-flotta/flotta-device-worker/internal/os" "github.com/project-flotta/flotta-device-worker/internal/registration" @@ -759,44 +757,6 @@ func (d *DispatcherFailing) GetConfig(ctx context.Context, in *pb.Empty, opts .. return nil, nil } -func NewFilledInterfaceMock(mtu int, name string, macAddr string, flags net.Flags, addrs []string, isPhysical bool, isBonding bool, isVlan bool, speedMbps int64) *util.MockInterface { - hwAddr, _ := net.ParseMAC(macAddr) - ret := util.MockInterface{} - ret.On("IsPhysical").Return(isPhysical) - if isPhysical || isBonding || isVlan { - ret.On("Name").Return(name) - ret.On("MTU").Return(mtu) - ret.On("HardwareAddr").Return(hwAddr) - ret.On("Flags").Return(flags) - ret.On("Addrs").Return(toAddresses(addrs), nil) - ret.On("SpeedMbps").Return(speedMbps) - } - if !isPhysical { - ret.On("IsBonding").Return(isBonding) - } - if !(isPhysical || isBonding) { - ret.On("IsVlan").Return(isVlan) - } - - return &ret -} - -func toAddresses(addrs []string) []net.Addr { - ret := make([]net.Addr, 0) - for _, a := range addrs { - ret = append(ret, str2Addr(a)) - } - return ret -} - -func str2Addr(addrStr string) net.Addr { - ip, ipnet, err := net.ParseCIDR(addrStr) - if err != nil { - return &net.IPNet{} - } - return &net.IPNet{IP: ip, Mask: ipnet.Mask} -} - func initHwMock(hwMock *hardware.MockHardware, configManager *configuration.Manager, hostname string, ipv4 []string) (*gomock.Call, *gomock.Call, *gomock.Call) { var m models.HardwareInfo configManager.GetDeviceConfiguration().Heartbeat.HardwareProfile.Scope = heartbeat.ScopeDelta From 9576b122ae01c2383023f80e2d1befad76f5d69f Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Mon, 1 Aug 2022 10:19:32 -0400 Subject: [PATCH 04/13] Refactored event listener based on feedback --- internal/service/event_listener.go | 52 +++++++++++++++--------------- internal/service/systemd.go | 8 +---- internal/workload/wrapper.go | 10 ++++-- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index e7997b9b..329197e9 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -50,44 +50,44 @@ type EventListener struct { dbusErrCh <-chan error } -func NewEventListener(observerCh chan *Event) *EventListener { - return &EventListener{observerCh: observerCh} +func NewEventListener() *EventListener { + return &EventListener{} } -func (e *EventListener) Connect() error { +func (e *EventListener) Connect() (chan *Event, error) { conn, err := newDbusConnection(UserBus) if err != nil { - return err + return nil, err } e.set = conn.NewSubscriptionSet() - return nil + e.observerCh = make(chan *Event) + return e.observerCh, nil } -func (e *EventListener) Listen() { +func (e *EventListener) Listen() chan *Event { e.dbusCh, e.dbusErrCh = e.set.Subscribe() - go func() { - for { - select { - case msg := <-e.dbusCh: - for name, unit := range msg { - log.Debugf("Captured event for %s: %v+", name, unit) - n := extractWorkloadName(name) - state := translateUnitSubStatus(unit) - log.Infof("Service %s for workload transitioned to sub state %s\n", n, state) - switch state { - case running: - e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} - case removed, stop, dead, failed, start: - e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} - default: - log.Infof("Ignoring unit sub state for service %s: %s", name, unit.SubState) - } + for { + select { + case msg := <-e.dbusCh: + for name, unit := range msg { + log.Debugf("Captured event for %s: %v+", name, unit) + n := extractWorkloadName(name) + state := translateUnitSubStatus(unit) + log.Debugf("Systemd service for workload %s transitioned to sub state %s\n", n, state) + switch state { + case running: + e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} + case removed, stop, dead, failed, start: + e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} + default: + log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState) } - case msg := <-e.dbusErrCh: - log.Errorf("Error while parsing dbus event: %v", msg) } + case msg := <-e.dbusErrCh: + log.Errorf("Error while parsing dbus event: %v", msg) } - }() + } + } func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 4c109303..8b37e27e 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -70,7 +70,7 @@ type systemdManager struct { eventListener *EventListener } -func NewSystemdManager(configDir string, observerCh chan *Event) (SystemdManager, error) { +func NewSystemdManager(configDir string, listener *EventListener) (SystemdManager, error) { services := make(map[string]*systemd) servicePath := path.Join(configDir, "services.json") servicesJson, err := os.ReadFile(servicePath) //#nosec @@ -85,15 +85,9 @@ func NewSystemdManager(configDir string, observerCh chan *Event) (SystemdManager for k, v := range services { systemdSVC[k] = v } - listener := NewEventListener(observerCh) - err = listener.Connect() - if err != nil { - return nil, err - } for name := range services { listener.Add(name) } - listener.Listen() return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}, eventListener: listener}, nil } diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 3adf1f5b..94e94025 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -82,8 +82,14 @@ func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, return nil, fmt.Errorf("workload cannot initialize mapping repository: %w", err) } - eventCh := make(chan *service.Event) - serviceManager, err := service.NewSystemdManager(configDir, eventCh) + listener := service.NewEventListener() + eventCh, err := listener.Connect() + if err != nil { + return nil, err + } + go listener.Listen() + + serviceManager, err := service.NewSystemdManager(configDir, listener) if err != nil { return nil, fmt.Errorf("workload cannot initialize systemd manager: %w", err) } From cced29fe620461acca352de4339c01ca3e10ccfe Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Tue, 2 Aug 2022 10:52:19 -0400 Subject: [PATCH 05/13] Minimal refactor and cosmetic changes based on feedback --- internal/service/event_listener.go | 8 ++++++-- internal/service/systemd.go | 1 - internal/workload/wrapper.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index 329197e9..eabc0ad2 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -98,7 +98,7 @@ func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { } func (e *EventListener) Add(workloadName string) { - name := fmt.Sprintf("%s.service", workloadName) + name := formatServiceName(workloadName) log.Debugf("Adding service for events %s", name) if !e.set.Contains(name) { e.set.Add(name) @@ -106,7 +106,7 @@ func (e *EventListener) Add(workloadName string) { } func (e *EventListener) Remove(workloadName string) { - name := fmt.Sprintf("%s.service", workloadName) + name := formatServiceName(workloadName) log.Debugf("Removing service for events %s", name) e.set.Remove(name) } @@ -114,3 +114,7 @@ func (e *EventListener) Remove(workloadName string) { func extractWorkloadName(serviceName string) string { return serviceName[:len(serviceName)-servicePostfixLength] } + +func formatServiceName(workloadName string) string { + return fmt.Sprintf("%s.service", workloadName) +} diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 8b37e27e..1b1f89a5 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -20,7 +20,6 @@ const ( DefaultRestartTimeout = 15 TimerSuffix = ".timer" ServiceSuffix = ".service" - DefaultNameSeparator = "-" ) var ( diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 94e94025..321dfddc 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -284,7 +284,7 @@ func (ww *Workload) removeService(workloadName string) error { err := ww.serviceManager.Remove(svc) if err != nil { - log.Errorf("unable to remove service from serviceManager %s:%s", workloadName, err) + log.Errorf("Unable to remove service from serviceManager %s:%s", workloadName, err) } return nil } From 3ee2afe955fa676a4995bc53959620f9e90aa05f Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Tue, 2 Aug 2022 11:43:58 -0400 Subject: [PATCH 06/13] Changed podman-auto-update to use userBus --- internal/workload/wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 321dfddc..c09bd6ea 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -141,7 +141,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemd("podman-auto-update", nil, service.SystemBus, nil) + svc, err := service.NewSystemd("podman-auto-update", nil, service.UserBus, nil) if err != nil { return err } From 1dc0c915f40c887aa023748b84c4c408e9659c59 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Tue, 2 Aug 2022 11:46:28 -0400 Subject: [PATCH 07/13] Refactor systemd listener: * Moved event listener instance and initialization to cmd.go main * Added a file system watcher to the systemd enabled services directory to be notified of new services from being enabled/disabled so that the event listener can track their status * Encapsulated the code to listen to service events generated by the systemd event listener so that it is called at the last line of the cmd.go#main function to make sure all observers are ready before start processing the service events --- cmd/device-worker/main.go | 48 +++++--- go.mod | 1 + internal/ansible/mapping/mock_mapping.go | 5 +- internal/configuration/configuration.go | 1 + internal/metrics/system.go | 2 +- internal/metrics/workload.go | 1 + internal/service/event_listener.go | 150 +++++++++++++++++++---- internal/service/systemd.go | 27 ++-- internal/workload/manager.go | 8 +- internal/workload/mock_wrapper.go | 12 ++ internal/workload/podman/podman.go | 8 +- internal/workload/wrapper.go | 121 +++++++++--------- internal/workload/wrapper_test.go | 2 +- vendor/modules.txt | 1 + 14 files changed, 254 insertions(+), 133 deletions(-) diff --git a/cmd/device-worker/main.go b/cmd/device-worker/main.go index 1bce48e8..1a73394c 100644 --- a/cmd/device-worker/main.go +++ b/cmd/device-worker/main.go @@ -20,7 +20,8 @@ import ( os2 "github.com/project-flotta/flotta-device-worker/internal/os" registration2 "github.com/project-flotta/flotta-device-worker/internal/registration" "github.com/project-flotta/flotta-device-worker/internal/server" - workload2 "github.com/project-flotta/flotta-device-worker/internal/workload" + "github.com/project-flotta/flotta-device-worker/internal/service" + workload "github.com/project-flotta/flotta-device-worker/internal/workload" "net" "os" @@ -41,9 +42,9 @@ const ( ) func initSystemdDirectory() error { - systemddir := filepath.Join(os.Getenv("HOME"), ".config/systemd/user/") + // init the flotta user systemd units directory - err := os.MkdirAll(systemddir, 0750) + err := os.MkdirAll(service.SystemdUserServicesFullPath, 0750) if err != nil { return err } @@ -149,6 +150,16 @@ func main() { } configManager := configuration2.NewConfigurationManager(dataDir) + // Systemd event listener + listener := service.NewDBusEventListener() + eventCh, err := listener.Connect() + if err != nil { + log.Fatal(err) + } + // Start listening to systemd bus events. These events generated from here can start being processed once all observers have been initialized, otherwise + // we risk in missing observers whilst processing the events. + go listener.Listen() + // --- Client metrics configuration --- metricsStore, err := metrics.NewTSDB(dataDir) if err != nil { @@ -170,16 +181,16 @@ func main() { dataTransferWatcher := metrics.NewDataTransferMetrics(metricsDaemon) configManager.RegisterObserver(dataTransferWatcher) - wl, err := workload2.NewWorkloadManager(dataDir, deviceId) + workloadManager, err := workload.NewWorkloadManager(dataDir, deviceId, eventCh) if err != nil { log.Fatalf("cannot start Workload Manager. DeviceID: %s; err: %v", deviceId, err) } - logsWrapper := logs.NewWorkloadsLogsTarget(wl) + logsWrapper := logs.NewWorkloadsLogsTarget(workloadManager) configManager.RegisterObserver(logsWrapper) - wl.RegisterObserver(logsWrapper) - configManager.RegisterObserver(wl) - wl.RegisterObserver(workloadMetricWatcher) + workloadManager.RegisterObserver(logsWrapper) + configManager.RegisterObserver(workloadManager) + workloadManager.RegisterObserver(workloadMetricWatcher) remoteWrite := metrics.NewRemoteWrite(dataDir, deviceId, metricsStore) configManager.RegisterObserver(remoteWrite) @@ -197,25 +208,21 @@ func main() { } configManager.RegisterObserver(mountManager) - dataMonitor := datatransfer.NewMonitor(wl, configManager) - wl.RegisterObserver(dataMonitor) + dataMonitor := datatransfer.NewMonitor(workloadManager, configManager) + workloadManager.RegisterObserver(dataMonitor) configManager.RegisterObserver(dataMonitor) dataMonitor.Start() - if err != nil { - log.Fatalf("cannot start metrics store. DeviceID: %s; err: %v", deviceId, err) - } - - reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, wl) + reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, workloadManager) if err != nil { log.Fatalf("cannot start registration process: DeviceID: %s; err: %v", deviceId, err) } - hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, wl, &hw, dataMonitor, deviceOs, reg) + hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, workloadManager, &hw, dataMonitor, deviceOs, reg) configManager.RegisterObserver(hbs) reg.DeregisterLater( - wl, + workloadManager, configManager, hbs, dataMonitor, @@ -247,16 +254,19 @@ func main() { setupSignalHandler(metricsStore, ansibleManager) - go listenStartGracefulRebootChannel(wl, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager, + go listenStartGracefulRebootChannel(workloadManager, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager, gracefulRebootChannel, deviceOs) + // All observers have been registered at this point. Ready to start listening to workload service events generated by changes to the systemd dbus + go workloadManager.ListenServiceEvents() + if err := s.Serve(l); err != nil { log.Fatalf("cannot start worker server, err: %v", err) } } -func listenStartGracefulRebootChannel(wl *workload2.WorkloadManager, dataMonitor *datatransfer.Monitor, +func listenStartGracefulRebootChannel(wl *workload.WorkloadManager, dataMonitor *datatransfer.Monitor, systemMetricsWatcher *metrics.SystemMetrics, metricsStore *metrics.TSDB, hbs *heartbeat2.Heartbeat, ansibleManager *ansible.Manager, gracefulRebootChannel chan struct{}, deviceOs *os2.OS) { // listen to the channel for getting StartGracefulReboot signal diff --git a/go.mod b/go.mod index 7090ca11..1e091642 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/digitalocean/godo v1.80.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/envoyproxy/protoc-gen-validate v0.6.7 // indirect + github.com/fsnotify/fsnotify v1.5.4 github.com/go-kit/log v0.2.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/runtime v0.23.1 // indirect diff --git a/internal/ansible/mapping/mock_mapping.go b/internal/ansible/mapping/mock_mapping.go index fd3ead81..6a05f38b 100644 --- a/internal/ansible/mapping/mock_mapping.go +++ b/internal/ansible/mapping/mock_mapping.go @@ -105,11 +105,12 @@ func (mr *MockMappingRepositoryMockRecorder) GetModTime(arg0 interface{}) *gomoc } // GetSha256 mocks base method. -func (m *MockMappingRepository) GetSha256(arg0 []byte) string { +func (m *MockMappingRepository) GetSha256(arg0 []byte) (string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetSha256", arg0) ret0, _ := ret[0].(string) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // GetSha256 indicates an expected call of GetSha256. diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go index ca35792a..a0ed48ce 100644 --- a/internal/configuration/configuration.go +++ b/internal/configuration/configuration.go @@ -146,6 +146,7 @@ func (m *Manager) Update(message models.DeviceConfigurationMessage) error { } log.Infof("updating configuration. New config: %s\nOld config: %s", newJson, oldJson) + log.Debugf("[ConfigManager] observers :%v+", m.observers) for _, observer := range m.observers { err := observer.Update(message) if err != nil { diff --git a/internal/metrics/system.go b/internal/metrics/system.go index 45bc445e..e3d2e46d 100644 --- a/internal/metrics/system.go +++ b/internal/metrics/system.go @@ -26,7 +26,7 @@ type SystemMetrics struct { } func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) { - nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus, nil) + nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus) if err != nil { return nil, err } diff --git a/internal/metrics/workload.go b/internal/metrics/workload.go index af109e8f..b6be7a79 100644 --- a/internal/metrics/workload.go +++ b/internal/metrics/workload.go @@ -53,6 +53,7 @@ func (wrkM *WorkloadMetrics) WorkloadRemoved(workloadName string) { } func (wrkM *WorkloadMetrics) WorkloadStarted(workloadName string, report []*podman.PodReport) { + log.Infof("Starting target metrics for workload '%s'", workloadName) for _, workload := range report { cfg := wrkM.getWorkload(workloadName) if cfg == nil { diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index eabc0ad2..3b90cf27 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -1,20 +1,24 @@ package service import ( + "errors" "fmt" + "os" + "path/filepath" + "strings" "github.com/coreos/go-systemd/v22/dbus" + "github.com/fsnotify/fsnotify" log "github.com/sirupsen/logrus" ) const ( - // servicePostfixLength is the length of the string ".service". This value is used to extract the workload name from the service - // coming from systemd - servicePostfixLength = 8 - EventStarted EventType = "started" EventStopped EventType = "stopped" + systemdUserServicesDir = ".config/systemd/user" + defaultTargetWantsRelativePath = "default.target.wants/" + // To avoid confusion, we match the action verb tense coming from systemd sub state, even though it is not consistent // The only addition to the list is the "removed" case, which we use when a service is removed // from the system and we receive an empty unitStatus object. @@ -34,6 +38,11 @@ const ( removed unitSubState = "removed" ) +var ( + SystemdUserServicesFullPath = filepath.Join(os.Getenv("HOME"), systemdUserServicesDir) + enabledServicesFullPath = filepath.Join(SystemdUserServicesFullPath, defaultTargetWantsRelativePath) +) + type unitSubState string type EventType string @@ -43,41 +52,92 @@ type Event struct { Type EventType } -type EventListener struct { +type DBusEventListener struct { observerCh chan *Event set *dbus.SubscriptionSet dbusCh <-chan map[string]*dbus.UnitStatus dbusErrCh <-chan error + fsWatcher *fsnotify.Watcher } -func NewEventListener() *EventListener { - return &EventListener{} +func NewDBusEventListener() *DBusEventListener { + return &DBusEventListener{} } -func (e *EventListener) Connect() (chan *Event, error) { +func (e *DBusEventListener) Connect() (<-chan *Event, error) { conn, err := newDbusConnection(UserBus) if err != nil { return nil, err } e.set = conn.NewSubscriptionSet() - e.observerCh = make(chan *Event) + if err := e.initializeSubscriptionSet(); err != nil { + return nil, err + } + e.observerCh = make(chan *Event, 1000) return e.observerCh, nil } -func (e *EventListener) Listen() chan *Event { +func (e *DBusEventListener) initializeSubscriptionSet() error { + _, err := os.Stat(enabledServicesFullPath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return e.watchServiceDirectory() + } + return err + } + files, err := os.ReadDir(enabledServicesFullPath) + if err != nil { + return err + } + log.Infof("List of services to be monitored in %s:%s", enabledServicesFullPath, files) + for _, fd := range files { + f := filepath.Join(enabledServicesFullPath, fd.Name()) + log.Debugf("Detected service file %s", f) + if !fileExist(f) { + log.Errorf("Hard link %s does not exist or broken link", fd.Name()) + continue + } + e.Add(getServiceFileName(fd.Name())) + } + return e.watchServiceDirectory() +} + +func (e *DBusEventListener) watchServiceDirectory() error { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + e.fsWatcher = watcher + + _, err = os.Stat(SystemdUserServicesFullPath) + if err != nil { + return fmt.Errorf("systemd user directory not found: %s", err) + } + + go e.watchFSEvents() + return e.fsWatcher.Add(SystemdUserServicesFullPath) +} + +func (e *DBusEventListener) Listen() { e.dbusCh, e.dbusErrCh = e.set.Subscribe() for { select { case msg := <-e.dbusCh: for name, unit := range msg { - log.Debugf("Captured event for %s: %v+", name, unit) - n := extractWorkloadName(name) + log.Debugf("Captured DBus event for %s: %v+", name, unit) + n, err := extractWorkloadName(name) + if err != nil { + log.Error(err) + continue + } state := translateUnitSubStatus(unit) log.Debugf("Systemd service for workload %s transitioned to sub state %s\n", n, state) switch state { case running: + log.Debugf("Sending start event to observer channel for workload %s", n) e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} case removed, stop, dead, failed, start: + log.Debugf("Sending stop event to observer channel for workload %s", n) e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} default: log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState) @@ -90,6 +150,36 @@ func (e *EventListener) Listen() chan *Event { } +func (e *DBusEventListener) watchFSEvents() { + for { + select { + case event, ok := <-e.fsWatcher.Events: + if !ok { + log.Errorf("Error while watching for file system events at %s:%s", SystemdUserServicesFullPath, event) + continue + } + log.Debugf("Captured file system event:%s", event) + if !isAnEnabledService(event.Name) { + continue + } + svcName := getServiceFileName(event.Name) + switch { + case event.Op&fsnotify.Create == fsnotify.Create: + log.Infof("New service configuration detected at %s", event.Name) + e.Add(svcName) + case event.Op&fsnotify.Remove == fsnotify.Remove: + log.Infof("Service configuration removed at %s", event.Name) + e.Remove(svcName) + } + case err, ok := <-e.fsWatcher.Errors: + if !ok { + log.Errorf("Error detected on file system watcher: %s", err) + return + } + } + } +} + func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { if unit == nil { return removed @@ -97,24 +187,32 @@ func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { return unitSubState(unit.SubState) } -func (e *EventListener) Add(workloadName string) { - name := formatServiceName(workloadName) - log.Debugf("Adding service for events %s", name) - if !e.set.Contains(name) { - e.set.Add(name) - } +func (e *DBusEventListener) Add(serviceName string) { + log.Debugf("Adding service for event listener %s", serviceName) + e.set.Add(serviceName) } -func (e *EventListener) Remove(workloadName string) { - name := formatServiceName(workloadName) - log.Debugf("Removing service for events %s", name) - e.set.Remove(name) +func (e *DBusEventListener) Remove(serviceName string) { + log.Debugf("Removing service from event listener %s", serviceName) + e.set.Remove(serviceName) } -func extractWorkloadName(serviceName string) string { - return serviceName[:len(serviceName)-servicePostfixLength] +func extractWorkloadName(serviceName string) (string, error) { + if filepath.Ext(serviceName) != ServiceSuffix { + return "", fmt.Errorf("invalid file name or not a service %s", serviceName) + } + return strings.TrimSuffix(filepath.Base(serviceName), filepath.Ext(serviceName)), nil +} + +func fileExist(path string) bool { + _, err := os.Stat(path) + return err == nil } -func formatServiceName(workloadName string) string { - return fmt.Sprintf("%s.service", workloadName) +func isAnEnabledService(fullPath string) bool { + return filepath.Ext(fullPath) == ServiceSuffix && filepath.Dir(fullPath) == enabledServicesFullPath +} +func getServiceFileName(fullPath string) string { + _, filename := filepath.Split(fullPath) + return filename } diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 1b1f89a5..bc65cfcc 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -46,12 +46,10 @@ type Service interface { type systemd struct { Name string `json:"name"` - RestartSec int `json:"restartSec"` Units []string `json:"units"` UnitsContent map[string]string `json:"-"` dbusConnection *dbus.Conn `json:"-"` BusType BusType `json:"busType"` - eventCh chan Event `json:"-"` } //go:generate mockgen -package=service -destination=mock_systemd_manager.go . SystemdManager @@ -63,13 +61,12 @@ type SystemdManager interface { } type systemdManager struct { - svcFilePath string - lock sync.RWMutex - services map[string]Service - eventListener *EventListener + svcFilePath string + lock sync.RWMutex + services map[string]Service } -func NewSystemdManager(configDir string, listener *EventListener) (SystemdManager, error) { +func NewSystemdManager(configDir string) (SystemdManager, error) { services := make(map[string]*systemd) servicePath := path.Join(configDir, "services.json") servicesJson, err := os.ReadFile(servicePath) //#nosec @@ -84,10 +81,8 @@ func NewSystemdManager(configDir string, listener *EventListener) (SystemdManage for k, v := range services { systemdSVC[k] = v } - for name := range services { - listener.Add(name) - } - return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}, eventListener: listener}, nil + + return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}}, nil } func (mgr *systemdManager) RemoveServicesFile() error { @@ -113,7 +108,6 @@ func (mgr *systemdManager) Add(svc Service) error { if err != nil { return err } - mgr.eventListener.Add(svc.GetName()) return nil } @@ -132,7 +126,6 @@ func (mgr *systemdManager) Remove(svc Service) error { return err } delete(mgr.services, svc.GetName()) - mgr.eventListener.Remove(svc.GetName()) return nil } @@ -172,7 +165,7 @@ func newDbusConnection(busType BusType) (*dbus.Conn, error) { } } -func NewSystemd(name string, units map[string]string, busType BusType, eventCh chan Event) (Service, error) { +func NewSystemd(name string, units map[string]string, busType BusType) (Service, error) { var err error var conn *dbus.Conn @@ -188,12 +181,10 @@ func NewSystemd(name string, units map[string]string, busType BusType, eventCh c return &systemd{ Name: name, - RestartSec: DefaultRestartTimeout, dbusConnection: conn, Units: unitNames, BusType: busType, UnitsContent: units, - eventCh: eventCh, }, nil } @@ -220,7 +211,6 @@ func (s *systemd) Remove() error { return err } } - s.eventCh <- Event{WorkloadName: s.Name, Type: EventStopped} return s.reload() } @@ -238,6 +228,7 @@ func (s *systemd) reload() error { } func (s *systemd) Start() error { + log.Debugf("Starting service %s", s.Name) conn, err := newDbusConnection(s.BusType) if err != nil { return err @@ -278,6 +269,7 @@ func (s *systemd) Stop() error { } func (s *systemd) Enable() error { + log.Debugf("Enabling service %s", s.Name) conn, err := newDbusConnection(s.BusType) if err != nil { return err @@ -289,6 +281,7 @@ func (s *systemd) Enable() error { } func (s *systemd) Disable() error { + log.Debugf("Disabling service %s", s.Name) conn, err := newDbusConnection(s.BusType) if err != nil { return err diff --git a/internal/workload/manager.go b/internal/workload/manager.go index 5df2bfa8..796f4305 100644 --- a/internal/workload/manager.go +++ b/internal/workload/manager.go @@ -39,8 +39,8 @@ type WorkloadManager struct { deviceId string } -func NewWorkloadManager(dataDir string, deviceId string) (*WorkloadManager, error) { - wrapper, err := newWorkloadInstance(dataDir, defaultWorkloadsMonitoringInterval) +func NewWorkloadManager(dataDir string, deviceId string, systemdEventCh <-chan *service.Event) (*WorkloadManager, error) { + wrapper, err := newWorkloadInstance(dataDir, defaultWorkloadsMonitoringInterval, systemdEventCh) if err != nil { return nil, err } @@ -414,6 +414,10 @@ func (w *WorkloadManager) deleteVolumeDir() error { return deleteDir(w.volumesDir) } +func (w *WorkloadManager) ListenServiceEvents() { + w.workloads.ListenServiceEvents() +} + func deleteDir(path string) error { err := os.RemoveAll(path) if err != nil { diff --git a/internal/workload/mock_wrapper.go b/internal/workload/mock_wrapper.go index 49ed1f81..b3a163ed 100644 --- a/internal/workload/mock_wrapper.go +++ b/internal/workload/mock_wrapper.go @@ -95,6 +95,18 @@ func (mr *MockWorkloadWrapperMockRecorder) ListSecrets() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSecrets", reflect.TypeOf((*MockWorkloadWrapper)(nil).ListSecrets)) } +// ListenServiceEvents mocks base method. +func (m *MockWorkloadWrapper) ListenServiceEvents() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ListenServiceEvents") +} + +// ListenServiceEvents indicates an expected call of ListenServiceEvents. +func (mr *MockWorkloadWrapperMockRecorder) ListenServiceEvents() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListenServiceEvents", reflect.TypeOf((*MockWorkloadWrapper)(nil).ListenServiceEvents)) +} + // Logs mocks base method. func (m *MockWorkloadWrapper) Logs(arg0 string, arg1 io.Writer) (context.CancelFunc, error) { m.ctrl.T.Helper() diff --git a/internal/workload/podman/podman.go b/internal/workload/podman/podman.go index bc47440e..1f6b3867 100644 --- a/internal/workload/podman/podman.go +++ b/internal/workload/podman/podman.go @@ -109,8 +109,6 @@ const DefaultTimeoutForStoppingInSeconds int = 5 type podman struct { podmanConnection context.Context timeoutForStopping int - eventCh chan service.Event - cancel chan bool } func NewPodman() (*podman, error) { @@ -121,8 +119,6 @@ func NewPodman() (*podman, error) { p := &podman{ podmanConnection: podmanConnection, timeoutForStopping: DefaultTimeoutForStoppingInSeconds, - eventCh: make(chan service.Event, 1000), - cancel: make(chan bool), } err = p.MinVersion() if err != nil { @@ -341,7 +337,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } - svc, err = service.NewSystemd(podName, report.Units, service.UserBus, p.eventCh) + svc, err = service.NewSystemd(podName, report.Units, service.UserBus) if err != nil { return nil, err } @@ -358,7 +354,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } units := map[string]string{podName: unit.String()} - svc, err = service.NewSystemd(podName, units, service.UserBus, p.eventCh) + svc, err = service.NewSystemd(podName, units, service.UserBus) if err != nil { return nil, err } diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index c09bd6ea..2d7510d4 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -44,31 +44,33 @@ type WorkloadWrapper interface { RemoveSecret(string) error CreateSecret(string, string) error UpdateSecret(string, string) error + ListenServiceEvents() } // Workload manages the workload and its configuration on the device type Workload struct { - workloads podman.Podman + podManager podman.Podman netfilter network.Netfilter mappingRepository mapping.MappingRepository observers []Observer serviceManager service.SystemdManager monitoringInterval uint - events chan *podman.PodmanEvent lock sync.RWMutex + systemdEventCh <-chan *service.Event } -func NewWorkload(p podman.Podman, n network.Netfilter, m mapping.MappingRepository, s service.SystemdManager, monitoringInterval uint) *Workload { +func NewWorkload(p podman.Podman, n network.Netfilter, m mapping.MappingRepository, s service.SystemdManager, monitoringInterval uint, systemdEventCh <-chan *service.Event) *Workload { return &Workload{ - workloads: p, + podManager: p, netfilter: n, mappingRepository: m, serviceManager: s, monitoringInterval: monitoringInterval, + systemdEventCh: systemdEventCh, } } -func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, error) { +func newWorkloadInstance(configDir string, monitoringInterval uint, systemdEventCh <-chan *service.Event) (*Workload, error) { newPodman, err := podman.NewPodman() if err != nil { return nil, fmt.Errorf("workload cannot initialize podman manager: %w", err) @@ -82,54 +84,20 @@ func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, return nil, fmt.Errorf("workload cannot initialize mapping repository: %w", err) } - listener := service.NewEventListener() - eventCh, err := listener.Connect() - if err != nil { - return nil, err - } - go listener.Listen() - - serviceManager, err := service.NewSystemdManager(configDir, listener) + serviceManager, err := service.NewSystemdManager(configDir) if err != nil { return nil, fmt.Errorf("workload cannot initialize systemd manager: %w", err) } ww := &Workload{ - workloads: newPodman, + podManager: newPodman, netfilter: netfilter, mappingRepository: mappingRepository, serviceManager: serviceManager, monitoringInterval: monitoringInterval, - events: make(chan *podman.PodmanEvent), - } - - go func() { - for { - event := <-eventCh - log.Debugf("Event received %s", string(event.Type)) - ww.lock.Lock() - observers := ww.observers - ww.lock.Unlock() - switch event.Type { - case service.EventStarted: - log.Infof("Service for workload %s started", event.WorkloadName) - report, err := newPodman.GetPodReportForPodName(event.WorkloadName) - if err != nil { - log.Errorf("unable to get pod report for workload %s:%v", event.WorkloadName, err) - } - for _, observer := range observers { - observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report}) - } - case service.EventStopped: - log.Infof("Service for workload %s stopped", event.WorkloadName) - for _, observer := range observers { - observer.WorkloadRemoved(event.WorkloadName) - } - default: - log.Errorf("unknown event %s", event.Type) - } - } - }() + systemdEventCh: systemdEventCh, + } + return ww, nil } @@ -141,7 +109,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemd("podman-auto-update", nil, service.UserBus, nil) + svc, err := service.NewSystemd("podman-auto-update", nil, service.UserBus) if err != nil { return err } @@ -154,7 +122,7 @@ func (ww *Workload) Init() error { } func (ww *Workload) List() ([]api.WorkloadInfo, error) { - infos, err := ww.workloads.List() + infos, err := ww.podManager.List() if err != nil { return nil, err } @@ -168,7 +136,7 @@ func (ww *Workload) List() ([]api.WorkloadInfo, error) { } func (ww *Workload) Logs(podID string, res io.Writer) (context.CancelFunc, error) { - return ww.workloads.Logs(podID, res) + return ww.podManager.Logs(podID, res) } func (ww *Workload) Remove(workloadName string) error { @@ -182,7 +150,7 @@ func (ww *Workload) Remove(workloadName string) error { return err } - if err := ww.workloads.Remove(id); err != nil { + if err := ww.podManager.Remove(id); err != nil { return err } if err := ww.netfilter.DeleteChain(nfTableName, workloadName); err != nil { @@ -199,7 +167,7 @@ func (ww *Workload) Stop(workloadName string) error { if id == "" { id = workloadName } - err := ww.workloads.Stop(id) + err := ww.podManager.Stop(id) return err } @@ -234,7 +202,7 @@ func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath stri if err := ww.applyNetworkConfiguration(workload); err != nil { return err } - podIds, err := ww.workloads.Run(manifestPath, authFilePath, podmanAnnotations) + podIds, err := ww.podManager.Run(manifestPath, authFilePath, podmanAnnotations) if err != nil { return err } @@ -245,7 +213,7 @@ func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath stri } // Create the system service to manage the pod: - svc, err := ww.workloads.GenerateSystemdService(workload, manifestPath, ww.monitoringInterval) + svc, err := ww.podManager.GenerateSystemdService(workload, manifestPath, ww.monitoringInterval) if err != nil { return fmt.Errorf("error while generating systemd service: %v", err) } @@ -255,7 +223,7 @@ func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath stri if err != nil { return fmt.Errorf("error while starting service: %v", err) } - log.Infof("Adding service for listener for %s", workload.Name) + log.Infof("Registering service %s", workload.Name) err = ww.serviceManager.Add(svc) if err != nil { return fmt.Errorf("error while updating service manager: %v", err) @@ -270,7 +238,10 @@ func (ww *Workload) removeService(workloadName string) error { } // Ignore stop failure: - _ = svc.Stop() + err := svc.Stop() + if err != nil { + return fmt.Errorf("unable to stop service %s:%s", workloadName, err) + } // Disable the service from the system: if err := svc.Disable(); err != nil { @@ -282,7 +253,7 @@ func (ww *Workload) removeService(workloadName string) error { return fmt.Errorf("Cannot remove systemd service for '%s': %s", workloadName, err) } - err := ww.serviceManager.Remove(svc) + err = ww.serviceManager.Remove(svc) if err != nil { log.Errorf("Unable to remove service from serviceManager %s:%s", workloadName, err) } @@ -338,7 +309,7 @@ func (ww *Workload) Start(workload *v1.Pod) error { } podId := ww.mappingRepository.GetId(workload.Name) - if err := ww.workloads.Start(podId); err != nil { + if err := ww.podManager.Start(podId); err != nil { return err } return nil @@ -349,19 +320,19 @@ func (ww *Workload) PersistConfiguration() error { } func (ww *Workload) ListSecrets() (map[string]struct{}, error) { - return ww.workloads.ListSecrets() + return ww.podManager.ListSecrets() } func (ww *Workload) RemoveSecret(name string) error { - return ww.workloads.RemoveSecret(name) + return ww.podManager.RemoveSecret(name) } func (ww *Workload) CreateSecret(name, data string) error { - return ww.workloads.CreateSecret(name, data) + return ww.podManager.CreateSecret(name, data) } func (ww *Workload) UpdateSecret(name, data string) error { - return ww.workloads.UpdateSecret(name, data) + return ww.podManager.UpdateSecret(name, data) } func getHostPorts(workload *v1.Pod) ([]int32, error) { @@ -377,3 +348,35 @@ func getHostPorts(workload *v1.Pod) ([]int32, error) { } return hostPorts, nil } + +func (w *Workload) ListenServiceEvents() { + log.Debug("Starting routine to listen for service events") + for { + event := <-w.systemdEventCh + log.Debugf("Event received: %s", string(event.Type)) + w.lock.Lock() + observers := w.observers + w.lock.Unlock() + log.Debugf("Number of observers %d: %+v", len(observers), observers) + switch event.Type { + case service.EventStarted: + log.Infof("Service for workload %s started", event.WorkloadName) + report, err := w.podManager.GetPodReportForPodName(event.WorkloadName) + if err != nil { + log.Errorf("unable to get pod report for workload %s:%v", event.WorkloadName, err) + } + for _, observer := range observers { + log.Debugf("Triggering WorkloadStarted in observer '%s' for workload %s", observer, event.WorkloadName) + observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report}) + } + case service.EventStopped: + log.Infof("Service for workload %s stopped", event.WorkloadName) + for _, observer := range observers { + log.Debugf("Triggering WorkloadRemoved in observer '%s' for workload %s", observer, event.WorkloadName) + observer.WorkloadRemoved(event.WorkloadName) + } + default: + log.Errorf("Unknown event %s", event.Type) + } + } +} diff --git a/internal/workload/wrapper_test.go b/internal/workload/wrapper_test.go index 2db237f0..1cfa60e8 100644 --- a/internal/workload/wrapper_test.go +++ b/internal/workload/wrapper_test.go @@ -40,7 +40,7 @@ var _ = Describe("Workload management", func() { serviceManager = service.NewMockSystemdManager(mockCtrl) svc = service.NewMockService(mockCtrl) - wk = workload.NewWorkload(newPodman, netfilter, mappingRepository, serviceManager, 15) + wk = workload.NewWorkload(newPodman, netfilter, mappingRepository, serviceManager, 15, nil) }) AfterEach(func() { diff --git a/vendor/modules.txt b/vendor/modules.txt index 4c37c6b3..b7fd2703 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -390,6 +390,7 @@ github.com/docker/go-units # github.com/evanphx/json-patch v4.12.0+incompatible github.com/evanphx/json-patch # github.com/fsnotify/fsnotify v1.5.4 +## explicit github.com/fsnotify/fsnotify # github.com/gabriel-vasile/mimetype v1.4.0 github.com/gabriel-vasile/mimetype From 7ed1508bf7fbe35e8a02412730706e740f5ebae0 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Wed, 3 Aug 2022 16:36:28 -0400 Subject: [PATCH 08/13] Fix svc not being deleted from services.json --- internal/service/systemd.go | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/internal/service/systemd.go b/internal/service/systemd.go index bc65cfcc..67453631 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -104,11 +104,7 @@ func (mgr *systemdManager) Add(svc Service) error { defer mgr.lock.Unlock() mgr.services[svc.GetName()] = svc - err := mgr.write() - if err != nil { - return err - } - return nil + return mgr.write() } func (mgr *systemdManager) Get(name string) Service { @@ -121,12 +117,9 @@ func (mgr *systemdManager) Get(name string) Service { func (mgr *systemdManager) Remove(svc Service) error { mgr.lock.Lock() defer mgr.lock.Unlock() - err := mgr.write() - if err != nil { - return err - } + delete(mgr.services, svc.GetName()) - return nil + return mgr.write() } func (mgr *systemdManager) write() error { From 882b1861b2be6a326f274deb4b1dfbd7daee887d Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Wed, 3 Aug 2022 16:50:27 -0400 Subject: [PATCH 09/13] Added Stringer() interface to observers so that they list correctly in debug --- internal/configuration/configuration.go | 1 + internal/configuration/configuration_mock.go | 14 ++++++++++++++ internal/logs/workload.go | 4 ++++ internal/metrics/remote_write.go | 4 ++++ internal/metrics/workload.go | 4 ++++ internal/mount/mount.go | 4 ++++ internal/os/os.go | 4 ++++ 7 files changed, 35 insertions(+) diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go index a0ed48ce..c792f19d 100644 --- a/internal/configuration/configuration.go +++ b/internal/configuration/configuration.go @@ -34,6 +34,7 @@ var ( type Observer interface { Init(configuration models.DeviceConfigurationMessage) error Update(configuration models.DeviceConfigurationMessage) error + String() string } type Manager struct { diff --git a/internal/configuration/configuration_mock.go b/internal/configuration/configuration_mock.go index 6bb849e9..987e371e 100644 --- a/internal/configuration/configuration_mock.go +++ b/internal/configuration/configuration_mock.go @@ -48,6 +48,20 @@ func (mr *MockObserverMockRecorder) Init(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockObserver)(nil).Init), arg0) } +// String mocks base method. +func (m *MockObserver) String() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "String") + ret0, _ := ret[0].(string) + return ret0 +} + +// String indicates an expected call of String. +func (mr *MockObserverMockRecorder) String() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "String", reflect.TypeOf((*MockObserver)(nil).String)) +} + // Update mocks base method. func (m *MockObserver) Update(arg0 models.DeviceConfigurationMessage) error { m.ctrl.T.Helper() diff --git a/internal/logs/workload.go b/internal/logs/workload.go index f343877b..101e01ed 100644 --- a/internal/logs/workload.go +++ b/internal/logs/workload.go @@ -205,3 +205,7 @@ func (w *WorkloadsLogsTarget) WorkloadStarted(workloadName string, report []*pod log.Error("Cannot start workload logs", err) } } + +func (w *WorkloadsLogsTarget) String() string { + return "workload logs" +} diff --git a/internal/metrics/remote_write.go b/internal/metrics/remote_write.go index bf088118..885d0357 100644 --- a/internal/metrics/remote_write.go +++ b/internal/metrics/remote_write.go @@ -532,3 +532,7 @@ func toTimeSeries(series []Series) (timeSeries []prompb.TimeSeries, lowest time. highest = fromDbTime(maxt) return } + +func (r *RemoteWrite) String() string { + return "remote write" +} diff --git a/internal/metrics/workload.go b/internal/metrics/workload.go index b6be7a79..d1bb67df 100644 --- a/internal/metrics/workload.go +++ b/internal/metrics/workload.go @@ -84,6 +84,10 @@ func (wrkM *WorkloadMetrics) WorkloadStarted(workloadName string, report []*podm } } +func (wrKM *WorkloadMetrics) String() string { + return "workload metrics" +} + func getWorkloadUrls(report *podman.PodReport, config *models.Workload) []string { res := []string{} metricsPath := config.Metrics.Path diff --git a/internal/mount/mount.go b/internal/mount/mount.go index 6366141e..6bc11ac2 100644 --- a/internal/mount/mount.go +++ b/internal/mount/mount.go @@ -170,3 +170,7 @@ func getDefaultMountOptions() string { } return fmt.Sprintf("gid=%s,uid=%s", group.Gid, usr.Uid) } + +func (m *Manager) String() string { + return "mount" +} diff --git a/internal/os/os.go b/internal/os/os.go index a36373ae..7009d319 100644 --- a/internal/os/os.go +++ b/internal/os/os.go @@ -207,3 +207,7 @@ func (o *OS) updateGreenbootScripts() error { return nil } + +func (o *OS) String() string { + return "rpm-ostree" +} From c9929b9aa9b4b72b125ad074abecc6cad2150e17 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Wed, 3 Aug 2022 21:49:58 -0400 Subject: [PATCH 10/13] Process the error from stopping the service --- internal/workload/wrapper.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 2d7510d4..0919de14 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -245,12 +245,12 @@ func (ww *Workload) removeService(workloadName string) error { // Disable the service from the system: if err := svc.Disable(); err != nil { - return fmt.Errorf("Cannot disable systemd service for '%s': %s", workloadName, err) + return fmt.Errorf("unable to disable systemd service for '%s': %s", workloadName, err) } // Remove the service from the system: if err := svc.Remove(); err != nil { - return fmt.Errorf("Cannot remove systemd service for '%s': %s", workloadName, err) + return fmt.Errorf("unable to remove systemd service for '%s': %s", workloadName, err) } err = ww.serviceManager.Remove(svc) From 6b04ac2ebe584b1a1cf5dee23a8bdc6e3a491f93 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Thu, 4 Aug 2022 08:47:10 -0400 Subject: [PATCH 11/13] Remove service from systemd watch filter when dbus remove event is received to avoid potential race condition between file system event and dbus event --- internal/service/event_listener.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index 3b90cf27..199a1de7 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -136,7 +136,12 @@ func (e *DBusEventListener) Listen() { case running: log.Debugf("Sending start event to observer channel for workload %s", n) e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} - case removed, stop, dead, failed, start: + case removed: + // We remove the service from the filter here to avoid a potential race condition where the fs notify routine removes it before the systemd event is received. + log.Infof("Service %s disabled or removed", name) + e.Remove(name) + fallthrough + case stop, dead, failed, start: log.Debugf("Sending stop event to observer channel for workload %s", n) e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} default: @@ -165,11 +170,10 @@ func (e *DBusEventListener) watchFSEvents() { svcName := getServiceFileName(event.Name) switch { case event.Op&fsnotify.Create == fsnotify.Create: - log.Infof("New service configuration detected at %s", event.Name) + log.Infof("New systemd service unit file %s detected", event.Name) e.Add(svcName) case event.Op&fsnotify.Remove == fsnotify.Remove: - log.Infof("Service configuration removed at %s", event.Name) - e.Remove(svcName) + log.Debugf("Systemd service unit file %s has been removed. Waiting for the systemd dbus remove event to elimitate it from the service watch filter", event.Name) } case err, ok := <-e.fsWatcher.Errors: if !ok { From d263eb7c669498d99604e5d8545c19e4a97f501b Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Mon, 8 Aug 2022 15:16:52 -0400 Subject: [PATCH 12/13] Use workload from device config as source of truth for dbus service event filtering --- cmd/device-worker/main.go | 26 ++-- internal/configuration/configuration.go | 3 +- internal/service/event_listener.go | 172 +++++++++--------------- 3 files changed, 76 insertions(+), 125 deletions(-) diff --git a/cmd/device-worker/main.go b/cmd/device-worker/main.go index 1a73394c..a31930e1 100644 --- a/cmd/device-worker/main.go +++ b/cmd/device-worker/main.go @@ -34,17 +34,21 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var yggdDispatchSocketAddr string -var gracefulRebootChannel chan struct{} - const ( - defaultDataDir = "/var/local/yggdrasil" + defaultDataDir = "/var/local/yggdrasil" + systemdUserServicesDir = ".config/systemd/user" +) + +var ( + yggdDispatchSocketAddr string + gracefulRebootChannel chan struct{} + systemdUserServicesFullPath = filepath.Join(os.Getenv("HOME"), systemdUserServicesDir) ) func initSystemdDirectory() error { // init the flotta user systemd units directory - err := os.MkdirAll(service.SystemdUserServicesFullPath, 0750) + err := os.MkdirAll(systemdUserServicesFullPath, 0750) if err != nil { return err } @@ -150,15 +154,11 @@ func main() { } configManager := configuration2.NewConfigurationManager(dataDir) - // Systemd event listener - listener := service.NewDBusEventListener() - eventCh, err := listener.Connect() - if err != nil { - log.Fatal(err) - } + // Systemd event dbusEventListener + dbusEventListener := service.NewDBusEventListener() // Start listening to systemd bus events. These events generated from here can start being processed once all observers have been initialized, otherwise // we risk in missing observers whilst processing the events. - go listener.Listen() + configManager.RegisterObserver(dbusEventListener) // --- Client metrics configuration --- metricsStore, err := metrics.NewTSDB(dataDir) @@ -181,7 +181,7 @@ func main() { dataTransferWatcher := metrics.NewDataTransferMetrics(metricsDaemon) configManager.RegisterObserver(dataTransferWatcher) - workloadManager, err := workload.NewWorkloadManager(dataDir, deviceId, eventCh) + workloadManager, err := workload.NewWorkloadManager(dataDir, deviceId, dbusEventListener.GetEventChannel()) if err != nil { log.Fatalf("cannot start Workload Manager. DeviceID: %s; err: %v", deviceId, err) } diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go index c792f19d..4beaf158 100644 --- a/internal/configuration/configuration.go +++ b/internal/configuration/configuration.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io/ioutil" "os" "path" "reflect" @@ -49,10 +50,10 @@ type Manager struct { func NewConfigurationManager(dataDir string) *Manager { deviceConfigFile := path.Join(dataDir, "device-config.json") log.Infof("device config file: %s", deviceConfigFile) - file, err := os.ReadFile(deviceConfigFile) //#nosec var deviceConfiguration models.DeviceConfigurationMessage initialConfig := atomic.Value{} initialConfig.Store(false) + file, err := ioutil.ReadFile(deviceConfigFile) //#nosec if err != nil { log.Error(err) deviceConfiguration = defaultDeviceConfigurationMessage diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index 199a1de7..3f9a1037 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -1,14 +1,15 @@ package service import ( - "errors" + "context" "fmt" - "os" "path/filepath" + "strconv" "strings" + "sync" "github.com/coreos/go-systemd/v22/dbus" - "github.com/fsnotify/fsnotify" + "github.com/project-flotta/flotta-operator/models" log "github.com/sirupsen/logrus" ) @@ -16,9 +17,6 @@ const ( EventStarted EventType = "started" EventStopped EventType = "stopped" - systemdUserServicesDir = ".config/systemd/user" - defaultTargetWantsRelativePath = "default.target.wants/" - // To avoid confusion, we match the action verb tense coming from systemd sub state, even though it is not consistent // The only addition to the list is the "removed" case, which we use when a service is removed // from the system and we receive an empty unitStatus object. @@ -38,11 +36,6 @@ const ( removed unitSubState = "removed" ) -var ( - SystemdUserServicesFullPath = filepath.Join(os.Getenv("HOME"), systemdUserServicesDir) - enabledServicesFullPath = filepath.Join(SystemdUserServicesFullPath, defaultTargetWantsRelativePath) -) - type unitSubState string type EventType string @@ -53,69 +46,59 @@ type Event struct { } type DBusEventListener struct { - observerCh chan *Event - set *dbus.SubscriptionSet - dbusCh <-chan map[string]*dbus.UnitStatus - dbusErrCh <-chan error - fsWatcher *fsnotify.Watcher + eventCh chan *Event + set *dbus.SubscriptionSet + dbusCh <-chan map[string]*dbus.UnitStatus + dbusErrCh <-chan error + lock sync.Mutex } func NewDBusEventListener() *DBusEventListener { - return &DBusEventListener{} + return &DBusEventListener{lock: sync.Mutex{}, eventCh: make(chan *Event, 1000)} } -func (e *DBusEventListener) Connect() (<-chan *Event, error) { +func (e *DBusEventListener) Init(configuration models.DeviceConfigurationMessage) error { + log.Infof("Starting DBus event listener") conn, err := newDbusConnection(UserBus) if err != nil { - return nil, err + return err } e.set = conn.NewSubscriptionSet() - if err := e.initializeSubscriptionSet(); err != nil { - return nil, err - } - e.observerCh = make(chan *Event, 1000) - return e.observerCh, nil -} - -func (e *DBusEventListener) initializeSubscriptionSet() error { - _, err := os.Stat(enabledServicesFullPath) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return e.watchServiceDirectory() + for _, w := range configuration.Workloads { + s, err := conn.GetUnitPropertyContext(context.Background(), DefaultServiceName(w.Name), "UnitFileState") + if err != nil { + return err } - return err - } - files, err := os.ReadDir(enabledServicesFullPath) - if err != nil { - return err - } - log.Infof("List of services to be monitored in %s:%s", enabledServicesFullPath, files) - for _, fd := range files { - f := filepath.Join(enabledServicesFullPath, fd.Name()) - log.Debugf("Detected service file %s", f) - if !fileExist(f) { - log.Errorf("Hard link %s does not exist or broken link", fd.Name()) - continue + log.Debugf("Unit UnitFileState property for workload %s:%s", w.Name, s.Value.String()) + v, err := strconv.Unquote(s.Value.String()) + if err != nil { + return err + } + if v == "disabled" { + log.Warnf("Service for workload %s is disabled", w.Name) } - e.Add(getServiceFileName(fd.Name())) + e.add(DefaultServiceName(w.Name)) } - return e.watchServiceDirectory() + go e.Listen() + return nil } -func (e *DBusEventListener) watchServiceDirectory() error { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return err +func (e *DBusEventListener) Update(configuration models.DeviceConfigurationMessage) error { + for _, wl := range configuration.Workloads { + svcName := DefaultServiceName(wl.Name) + if !e.contains(svcName) { + e.add(svcName) + } } - e.fsWatcher = watcher + return nil +} - _, err = os.Stat(SystemdUserServicesFullPath) - if err != nil { - return fmt.Errorf("systemd user directory not found: %s", err) - } +func (e *DBusEventListener) String() string { + return "DBus event listener" +} - go e.watchFSEvents() - return e.fsWatcher.Add(SystemdUserServicesFullPath) +func (e *DBusEventListener) GetEventChannel() <-chan *Event { + return e.eventCh } func (e *DBusEventListener) Listen() { @@ -135,15 +118,14 @@ func (e *DBusEventListener) Listen() { switch state { case running: log.Debugf("Sending start event to observer channel for workload %s", n) - e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} + e.eventCh <- &Event{WorkloadName: n, Type: EventStarted} case removed: - // We remove the service from the filter here to avoid a potential race condition where the fs notify routine removes it before the systemd event is received. - log.Infof("Service %s disabled or removed", name) - e.Remove(name) + log.Debugf("Service %s has been removed", name) + e.remove(name) fallthrough case stop, dead, failed, start: log.Debugf("Sending stop event to observer channel for workload %s", n) - e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} + e.eventCh <- &Event{WorkloadName: n, Type: EventStopped} default: log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState) } @@ -155,52 +137,26 @@ func (e *DBusEventListener) Listen() { } -func (e *DBusEventListener) watchFSEvents() { - for { - select { - case event, ok := <-e.fsWatcher.Events: - if !ok { - log.Errorf("Error while watching for file system events at %s:%s", SystemdUserServicesFullPath, event) - continue - } - log.Debugf("Captured file system event:%s", event) - if !isAnEnabledService(event.Name) { - continue - } - svcName := getServiceFileName(event.Name) - switch { - case event.Op&fsnotify.Create == fsnotify.Create: - log.Infof("New systemd service unit file %s detected", event.Name) - e.Add(svcName) - case event.Op&fsnotify.Remove == fsnotify.Remove: - log.Debugf("Systemd service unit file %s has been removed. Waiting for the systemd dbus remove event to elimitate it from the service watch filter", event.Name) - } - case err, ok := <-e.fsWatcher.Errors: - if !ok { - log.Errorf("Error detected on file system watcher: %s", err) - return - } - } - } -} - -func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { - if unit == nil { - return removed - } - return unitSubState(unit.SubState) -} - -func (e *DBusEventListener) Add(serviceName string) { +func (e *DBusEventListener) add(serviceName string) { + e.lock.Lock() + defer e.lock.Unlock() log.Debugf("Adding service for event listener %s", serviceName) e.set.Add(serviceName) } -func (e *DBusEventListener) Remove(serviceName string) { +func (e *DBusEventListener) remove(serviceName string) { + e.lock.Lock() + defer e.lock.Unlock() log.Debugf("Removing service from event listener %s", serviceName) e.set.Remove(serviceName) } +func (e *DBusEventListener) contains(serviceName string) bool { + e.lock.Lock() + defer e.lock.Unlock() + return e.set.Contains(serviceName) +} + func extractWorkloadName(serviceName string) (string, error) { if filepath.Ext(serviceName) != ServiceSuffix { return "", fmt.Errorf("invalid file name or not a service %s", serviceName) @@ -208,15 +164,9 @@ func extractWorkloadName(serviceName string) (string, error) { return strings.TrimSuffix(filepath.Base(serviceName), filepath.Ext(serviceName)), nil } -func fileExist(path string) bool { - _, err := os.Stat(path) - return err == nil -} - -func isAnEnabledService(fullPath string) bool { - return filepath.Ext(fullPath) == ServiceSuffix && filepath.Dir(fullPath) == enabledServicesFullPath -} -func getServiceFileName(fullPath string) string { - _, filename := filepath.Split(fullPath) - return filename +func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { + if unit == nil { + return removed + } + return unitSubState(unit.SubState) } From 13b2b9e0565d18f49471bfc232aa21ab665bc331 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Fri, 23 Sep 2022 10:27:19 -0400 Subject: [PATCH 13/13] Added more error verbosity --- cmd/device-worker/main.go | 2 +- internal/configuration/configuration.go | 3 +-- internal/service/event_listener.go | 10 ++++++---- internal/service/systemd.go | 9 ++++----- internal/workload/podman/podman.go | 2 +- internal/workload/wrapper.go | 8 ++++---- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/cmd/device-worker/main.go b/cmd/device-worker/main.go index a31930e1..4332b637 100644 --- a/cmd/device-worker/main.go +++ b/cmd/device-worker/main.go @@ -21,7 +21,7 @@ import ( registration2 "github.com/project-flotta/flotta-device-worker/internal/registration" "github.com/project-flotta/flotta-device-worker/internal/server" "github.com/project-flotta/flotta-device-worker/internal/service" - workload "github.com/project-flotta/flotta-device-worker/internal/workload" + "github.com/project-flotta/flotta-device-worker/internal/workload" "net" "os" diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go index 4beaf158..747e8bd2 100644 --- a/internal/configuration/configuration.go +++ b/internal/configuration/configuration.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "fmt" - "io/ioutil" "os" "path" "reflect" @@ -53,7 +52,7 @@ func NewConfigurationManager(dataDir string) *Manager { var deviceConfiguration models.DeviceConfigurationMessage initialConfig := atomic.Value{} initialConfig.Store(false) - file, err := ioutil.ReadFile(deviceConfigFile) //#nosec + file, err := os.ReadFile(deviceConfigFile) //#nosec if err != nil { log.Error(err) deviceConfiguration = defaultDeviceConfigurationMessage diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index 3f9a1037..92467833 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -61,21 +61,21 @@ func (e *DBusEventListener) Init(configuration models.DeviceConfigurationMessage log.Infof("Starting DBus event listener") conn, err := newDbusConnection(UserBus) if err != nil { - return err + return fmt.Errorf("error while starting event listener: %v", err) } e.set = conn.NewSubscriptionSet() for _, w := range configuration.Workloads { s, err := conn.GetUnitPropertyContext(context.Background(), DefaultServiceName(w.Name), "UnitFileState") if err != nil { - return err + return fmt.Errorf("error while retrieving workload '%s' state: %v", w.Name, err) } - log.Debugf("Unit UnitFileState property for workload %s:%s", w.Name, s.Value.String()) + log.Debugf("Unit UnitFileState property for workload '%s':%s", w.Name, s.Value.String()) v, err := strconv.Unquote(s.Value.String()) if err != nil { return err } if v == "disabled" { - log.Warnf("Service for workload %s is disabled", w.Name) + log.Warnf("Service for workload '%s' is disabled", w.Name) } e.add(DefaultServiceName(w.Name)) } @@ -88,6 +88,8 @@ func (e *DBusEventListener) Update(configuration models.DeviceConfigurationMessa svcName := DefaultServiceName(wl.Name) if !e.contains(svcName) { e.add(svcName) + } else { + return fmt.Errorf("unable to add systemd service '%s': there is service unit already being monitored with the same name", svcName) } } return nil diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 67453631..683df623 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" "os" "path" "path/filepath" @@ -127,7 +126,7 @@ func (mgr *systemdManager) write() error { if err != nil { return err } - return ioutil.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec + return os.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec } func newDbusConnection(busType BusType) (*dbus.Conn, error) { @@ -221,7 +220,7 @@ func (s *systemd) reload() error { } func (s *systemd) Start() error { - log.Debugf("Starting service %s", s.Name) + log.Debugf("Starting systemd service %s", s.Name) conn, err := newDbusConnection(s.BusType) if err != nil { return err @@ -262,7 +261,7 @@ func (s *systemd) Stop() error { } func (s *systemd) Enable() error { - log.Debugf("Enabling service %s", s.Name) + log.Debugf("Enabling systemd service %s", s.Name) conn, err := newDbusConnection(s.BusType) if err != nil { return err @@ -274,7 +273,7 @@ func (s *systemd) Enable() error { } func (s *systemd) Disable() error { - log.Debugf("Disabling service %s", s.Name) + log.Debugf("Disabling systemd service %s", s.Name) conn, err := newDbusConnection(s.BusType) if err != nil { return err diff --git a/internal/workload/podman/podman.go b/internal/workload/podman/podman.go index 1f6b3867..99d93392 100644 --- a/internal/workload/podman/podman.go +++ b/internal/workload/podman/podman.go @@ -15,7 +15,7 @@ import ( "github.com/blang/semver" "github.com/go-openapi/swag" "github.com/project-flotta/flotta-device-worker/internal/service" - api "github.com/project-flotta/flotta-device-worker/internal/workload/api" + "github.com/project-flotta/flotta-device-worker/internal/workload/api" "github.com/containers/podman/v4/pkg/bindings" "github.com/containers/podman/v4/pkg/bindings/containers" diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 0919de14..b4bd2357 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -363,16 +363,16 @@ func (w *Workload) ListenServiceEvents() { log.Infof("Service for workload %s started", event.WorkloadName) report, err := w.podManager.GetPodReportForPodName(event.WorkloadName) if err != nil { - log.Errorf("unable to get pod report for workload %s:%v", event.WorkloadName, err) + log.Errorf("unable to get pod report for workload '%s':%v", event.WorkloadName, err) } for _, observer := range observers { - log.Debugf("Triggering WorkloadStarted in observer '%s' for workload %s", observer, event.WorkloadName) + log.Debugf("Triggering WorkloadStarted in observer '%s' for workload '%s'", observer, event.WorkloadName) observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report}) } case service.EventStopped: - log.Infof("Service for workload %s stopped", event.WorkloadName) + log.Infof("Service for workload '%s' stopped", event.WorkloadName) for _, observer := range observers { - log.Debugf("Triggering WorkloadRemoved in observer '%s' for workload %s", observer, event.WorkloadName) + log.Debugf("Triggering WorkloadRemoved in observer '%s' for workload '%s'", observer, event.WorkloadName) observer.WorkloadRemoved(event.WorkloadName) } default: