From 858980f6c573057d8026bcfb1d4f571cd24e9c69 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Fri, 22 Jul 2022 12:08:20 -0400 Subject: [PATCH 01/13] Moving to systemd events to detect changes in service status --- internal/metrics/system.go | 2 +- internal/metrics/workload.go | 2 +- internal/service/event_listener.go | 116 ++++++++++++++++++++++++ internal/service/systemd.go | 55 +++++++---- internal/workload/manager.go | 2 +- internal/workload/podman/mock_podman.go | 12 +-- internal/workload/podman/podman.go | 93 +++---------------- internal/workload/wrapper.go | 62 ++++++------- internal/workload/wrapper_test.go | 1 - 9 files changed, 199 insertions(+), 146 deletions(-) create mode 100644 internal/service/event_listener.go diff --git a/internal/metrics/system.go b/internal/metrics/system.go index f86b3257..44c3c817 100644 --- a/internal/metrics/system.go +++ b/internal/metrics/system.go @@ -26,7 +26,7 @@ type SystemMetrics struct { } func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) { - nodeExporter, err := service.NewSystemdRootless("node_exporter", nil, false) + nodeExporter, err := service.NewSystemdRootless("node_exporter", nil, false, nil) if err != nil { return nil, err } diff --git a/internal/metrics/workload.go b/internal/metrics/workload.go index 287421f8..3d7ca822 100644 --- a/internal/metrics/workload.go +++ b/internal/metrics/workload.go @@ -48,7 +48,7 @@ func (wrkM *WorkloadMetrics) Update(config models.DeviceConfigurationMessage) er } func (wrkM *WorkloadMetrics) WorkloadRemoved(workloadName string) { - log.Infof("removing target metrics for workload '%v'", workloadName) + log.Infof("Removing target metrics for workload '%v'", workloadName) wrkM.daemon.DeleteTarget(workloadName) } diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go new file mode 100644 index 00000000..057062b3 --- /dev/null +++ b/internal/service/event_listener.go @@ -0,0 +1,116 @@ +package service + +import ( + "fmt" + + "git.sr.ht/~spc/go-log" + "github.com/coreos/go-systemd/v22/dbus" +) + +const ( + // servicePostfixLength is the length of the string ".service". This value is used to extract the workload name from the service + // coming from systemd + servicePostfixLength = 8 + + EventStarted EventType = "started" + EventStopped EventType = "stopped" + + // To avoid confusion, we match the action verb tense coming from systemd sub state, even though it is not consistent + // The only addition to the list is the "removed" case, which we use when a service is removed + // from the system and we receive an empty unitStatus object. + + //The service is running + running unitSubState = "running" + //The service is starting, probably due to a restart. + //In this case we have to notify the observers that the service is not yet up and is loading. A subsequent 'running' event will proceed. + start unitSubState = "start" + //The service has been stopped, due to the container being killed. Systemd will restart the service. + stop unitSubState = "stop" + //The service is dead, due to the container being stopped for instance. + dead unitSubState = "dead" + //The service failed to start + failed unitSubState = "failed" + //The service has been removed, no unit state is returned from systemd + removed unitSubState = "removed" +) + +type unitSubState string + +type EventType string + +type Event struct { + WorkloadName string + Type EventType +} + +type EventListener struct { + observerCh chan *Event + set *dbus.SubscriptionSet + dbusCh <-chan map[string]*dbus.UnitStatus + dbusErrCh <-chan error +} + +func NewEventListener(observerCh chan *Event) *EventListener { + return &EventListener{observerCh: observerCh} +} + +func (e *EventListener) Connect() error { + conn, err := newDbusConnection(true) + if err != nil { + return err + } + e.set = conn.NewSubscriptionSet() + return nil +} + +func (e *EventListener) Listen() { + e.dbusCh, e.dbusErrCh = e.set.Subscribe() + go func() { + for { + select { + case msg := <-e.dbusCh: + for name, unit := range msg { + log.Debugf("Captured event for %s: %v+", name, unit) + n := extractWorkloadName(name) + state := translateUnitSubStatus(unit) + log.Infof("Service %s for workload transitioned to sub state %s\n", n, state) + switch state { + case running: + e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} + case removed, stop, dead, failed, start: + e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} + default: + log.Infof("Ignoring unit sub state for service %s: %s", name, unit.SubState) + } + } + case msg := <-e.dbusErrCh: + log.Errorf("Error while parsing dbus event: %v", msg) + } + } + }() +} + +func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { + if unit == nil { + return removed + } + return unitSubState(unit.SubState) +} + +func (e *EventListener) Add(workloadName string) { + name := fmt.Sprintf("%s.service", workloadName) + log.Debugf("Adding service for events %s", name) + if !e.set.Contains(name) { + e.set.Add(name) + } +} + +func (e *EventListener) Remove(workloadName string) { + name := fmt.Sprintf("%s.service", workloadName) + log.Debugf("Removing service for events %s", name) + e.set.Remove(name) +} + +func extractWorkloadName(serviceName string) string { + return serviceName[:len(serviceName)-servicePostfixLength] +} diff --git a/internal/service/systemd.go b/internal/service/systemd.go index aa672f6e..26d969ed 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -20,6 +20,7 @@ const ( DefaultRestartTimeout = 15 TimerSuffix = ".timer" ServiceSuffix = ".service" + DefaultNameSeparator = "-" ) var ( @@ -44,6 +45,7 @@ type systemd struct { UnitsContent map[string]string `json:"-"` dbusConnection *dbus.Conn `json:"-"` Rootless bool `json:"rootless"` + eventCh chan Event `json:"-"` } //go:generate mockgen -package=service -destination=mock_systemd_manager.go . SystemdManager @@ -55,12 +57,13 @@ type SystemdManager interface { } type systemdManager struct { - svcFilePath string - lock sync.RWMutex - services map[string]Service + svcFilePath string + lock sync.RWMutex + services map[string]Service + eventListener *EventListener } -func NewSystemdManager(configDir string) (SystemdManager, error) { +func NewSystemdManager(configDir string, observerCh chan *Event) (SystemdManager, error) { services := make(map[string]*systemd) servicePath := path.Join(configDir, "services.json") servicesJson, err := ioutil.ReadFile(servicePath) //#nosec @@ -75,8 +78,16 @@ func NewSystemdManager(configDir string) (SystemdManager, error) { for k, v := range services { systemdSVC[k] = v } - - return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}}, nil + listener := NewEventListener(observerCh) + err = listener.Connect() + if err != nil { + return nil, err + } + for name := range services { + listener.Add(name) + } + listener.Listen() + return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}, eventListener: listener}, nil } func (mgr *systemdManager) RemoveServicesFile() error { @@ -98,8 +109,12 @@ func (mgr *systemdManager) Add(svc Service) error { defer mgr.lock.Unlock() mgr.services[svc.GetName()] = svc - - return mgr.write() + err := mgr.write() + if err != nil { + return err + } + mgr.eventListener.Add(svc.GetName()) + return nil } func (mgr *systemdManager) Get(name string) Service { @@ -112,10 +127,13 @@ func (mgr *systemdManager) Get(name string) Service { func (mgr *systemdManager) Remove(svc Service) error { mgr.lock.Lock() defer mgr.lock.Unlock() - + err := mgr.write() + if err != nil { + return err + } delete(mgr.services, svc.GetName()) - - return mgr.write() + mgr.eventListener.Remove(svc.GetName()) + return nil } func (mgr *systemdManager) write() error { @@ -123,15 +141,11 @@ func (mgr *systemdManager) write() error { if err != nil { return err } - err = ioutil.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec - if err != nil { - return err - } - return nil + return ioutil.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec } -func NewSystemd(name string, units map[string]string) (Service, error) { - return NewSystemdRootless(name, units, true) +func NewSystemd(name string, units map[string]string, eventCh chan Event) (Service, error) { + return NewSystemdRootless(name, units, true, eventCh) } func newDbusConnection(rootless bool) (*dbus.Conn, error) { @@ -162,7 +176,7 @@ func newDbusConnection(rootless bool) (*dbus.Conn, error) { } } -func NewSystemdRootless(name string, units map[string]string, rootless bool) (Service, error) { +func NewSystemdRootless(name string, units map[string]string, rootless bool, eventCh chan Event) (Service, error) { var err error var conn *dbus.Conn @@ -183,6 +197,7 @@ func NewSystemdRootless(name string, units map[string]string, rootless bool) (Se Units: unitNames, Rootless: rootless, UnitsContent: units, + eventCh: eventCh, }, nil } @@ -209,7 +224,7 @@ func (s *systemd) Remove() error { return err } } - + s.eventCh <- Event{WorkloadName: s.Name, Type: EventStopped} return s.reload() } diff --git a/internal/workload/manager.go b/internal/workload/manager.go index 31e6a4bb..489876fd 100644 --- a/internal/workload/manager.go +++ b/internal/workload/manager.go @@ -147,7 +147,7 @@ func (w *WorkloadManager) Update(configuration models.DeviceConfigurationMessage if PodShouldWaitForMount(pod, configuration.Configuration) { errors = multierror.Append(errors, fmt.Errorf( - "Pod '%s' needs to mount blockdevice but it's not in there yet", workload.Name)) + "pod '%s' needs to mount blockdevice but it's not in there yet", workload.Name)) continue } diff --git a/internal/workload/podman/mock_podman.go b/internal/workload/podman/mock_podman.go index 692072ce..70621aa9 100644 --- a/internal/workload/podman/mock_podman.go +++ b/internal/workload/podman/mock_podman.go @@ -82,19 +82,19 @@ func (mr *MockPodmanMockRecorder) GenerateSystemdService(arg0, arg1, arg2 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateSystemdService", reflect.TypeOf((*MockPodman)(nil).GenerateSystemdService), arg0, arg1, arg2) } -// GetPodReportForId mocks base method. -func (m *MockPodman) GetPodReportForId(arg0 string) (*PodReport, error) { +// GetPodReportForPodName mocks base method. +func (m *MockPodman) GetPodReportForPodName(arg0 string) (*PodReport, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPodReportForId", arg0) + ret := m.ctrl.Call(m, "GetPodReportForPodName", arg0) ret0, _ := ret[0].(*PodReport) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetPodReportForId indicates an expected call of GetPodReportForId. -func (mr *MockPodmanMockRecorder) GetPodReportForId(arg0 interface{}) *gomock.Call { +// GetPodReportForPodName indicates an expected call of GetPodReportForPodName. +func (mr *MockPodmanMockRecorder) GetPodReportForPodName(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodReportForId", reflect.TypeOf((*MockPodman)(nil).GetPodReportForId), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodReportForPodName", reflect.TypeOf((*MockPodman)(nil).GetPodReportForPodName), arg0) } // List mocks base method. diff --git a/internal/workload/podman/podman.go b/internal/workload/podman/podman.go index bd1dc4a4..a5eb9bd9 100644 --- a/internal/workload/podman/podman.go +++ b/internal/workload/podman/podman.go @@ -15,9 +15,9 @@ import ( "github.com/blang/semver" "github.com/go-openapi/swag" "github.com/project-flotta/flotta-device-worker/internal/service" + api "github.com/project-flotta/flotta-device-worker/internal/workload/api" "git.sr.ht/~spc/go-log" - podmanEvents "github.com/containers/podman/v4/libpod/events" "github.com/containers/podman/v4/pkg/bindings" "github.com/containers/podman/v4/pkg/bindings/containers" "github.com/containers/podman/v4/pkg/bindings/generate" @@ -26,7 +26,6 @@ import ( "github.com/containers/podman/v4/pkg/bindings/secrets" "github.com/containers/podman/v4/pkg/bindings/system" "github.com/containers/podman/v4/pkg/domain/entities" - api2 "github.com/project-flotta/flotta-device-worker/internal/workload/api" v1 "k8s.io/api/core/v1" ) @@ -36,10 +35,6 @@ const ( DefaultNetworkName = "podman" - podmanStart = string(podmanEvents.Start) - podmanRemove = string(podmanEvents.Remove) - podmanStop = string(podmanEvents.Stop) - podmanBinary = "/usr/bin/podman" autoUpdateServiceUnitTemplate = `[Unit] Description=Podman {{ .PodName }}.service @@ -72,7 +67,7 @@ type AutoUpdateUnit struct { //go:generate mockgen -package=podman -destination=mock_podman.go . Podman type Podman interface { - List() ([]api2.WorkloadInfo, error) + List() ([]api.WorkloadInfo, error) Remove(workloadId string) error Run(manifestPath, authFilePath string, annotations map[string]string) ([]*PodReport, error) Start(workloadId string) error @@ -84,7 +79,7 @@ type Podman interface { Exists(workloadId string) (bool, error) GenerateSystemdService(workload *v1.Pod, manifestPath string, monitoringInterval uint) (service.Service, error) Logs(podID string, res io.Writer) (context.CancelFunc, error) - GetPodReportForId(podID string) (*PodReport, error) + GetPodReportForPodName(podName string) (*PodReport, error) } type PodmanEvent struct { @@ -114,6 +109,8 @@ const DefaultTimeoutForStoppingInSeconds int = 5 type podman struct { podmanConnection context.Context timeoutForStopping int + eventCh chan service.Event + cancel chan bool } func NewPodman() (*podman, error) { @@ -124,6 +121,8 @@ func NewPodman() (*podman, error) { p := &podman{ podmanConnection: podmanConnection, timeoutForStopping: DefaultTimeoutForStoppingInSeconds, + eventCh: make(chan service.Event, 1000), + cancel: make(chan bool), } err = p.MinVersion() if err != nil { @@ -157,14 +156,14 @@ func (p *podman) MinVersion() error { return fmt.Errorf("podman version '%s' is not supported, needs >= v4.2", version.Version.Version) } -func (p *podman) List() ([]api2.WorkloadInfo, error) { +func (p *podman) List() ([]api.WorkloadInfo, error) { podList, err := pods.List(p.podmanConnection, nil) if err != nil { return nil, err } - var workloads []api2.WorkloadInfo + var workloads []api.WorkloadInfo for _, pod := range podList { - wi := api2.WorkloadInfo{ + wi := api.WorkloadInfo{ Id: pod.Id, Name: pod.Name, Status: pod.Status, @@ -216,73 +215,7 @@ func (p *podman) getContainerDetails(containerId string) (*ContainerReport, erro }, nil } -func (p *podman) Events(events chan *PodmanEvent) { - evchan := make(chan entities.Event, 1000) - cancel := make(chan bool) - booltrue := true - - workloads, err := p.List() - if err != nil { - log.Errorf("Cannot get the list of running pods: %v", err) - } - - for _, wrk := range workloads { - report, err := p.GetPodReportForId(wrk.Id) - if err != nil { - log.Errorf("Cannot get pod report for pod '%v', err: %v ", wrk.Name, err) - continue - } - event := &PodmanEvent{ - WorkloadName: wrk.Name, - Event: StartedContainer, - Report: report, - } - go func() { events <- event }() - } - - // subroutine that reads the event chan and sending proper messages to the - // wrapper - go func() { - for { - msg := <-evchan - event := &PodmanEvent{ - WorkloadName: msg.Actor.Attributes["name"], - } - switch msg.Action { - // create event is avoided because containers are not yet created and - // the flow is created->started - case podmanStart: - event.Event = StartedContainer - report, err := p.GetPodReportForId(msg.ID) - if err != nil { - log.Error("cannot get current pod information on event: ", err) - continue - } - event.Report = report - case podmanRemove, podmanStop: - event.Event = StoppedContainer - default: - continue - } - events <- event - } - }() - - // subroutine to track events - go func() { - err := system.Events(p.podmanConnection, evchan, cancel, &system.EventsOptions{ - Filters: map[string][]string{ - "type": {"pod"}, - }, - Stream: &booltrue, - }) - if err != nil { - log.Error("Cannot get podman events, err: ", err) - } - }() -} - -func (p *podman) GetPodReportForId(podID string) (*PodReport, error) { +func (p *podman) GetPodReportForPodName(podID string) (*PodReport, error) { podInfo, err := pods.Inspect(p.podmanConnection, podID, nil) if err != nil { return nil, err @@ -408,7 +341,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } - svc, err = service.NewSystemd(podName, report.Units) + svc, err = service.NewSystemd(podName, report.Units, p.eventCh) if err != nil { return nil, err } @@ -425,7 +358,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } units := map[string]string{podName: unit.String()} - svc, err = service.NewSystemd(podName, units) + svc, err = service.NewSystemd(podName, units, p.eventCh) if err != nil { return nil, err } diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 40a6f6cc..81c4ea30 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -11,7 +11,6 @@ import ( "git.sr.ht/~spc/go-log" "github.com/project-flotta/flotta-device-worker/internal/workload/api" - api2 "github.com/project-flotta/flotta-device-worker/internal/workload/api" "github.com/project-flotta/flotta-device-worker/internal/workload/mapping" "github.com/project-flotta/flotta-device-worker/internal/workload/network" "github.com/project-flotta/flotta-device-worker/internal/workload/podman" @@ -83,7 +82,8 @@ func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, return nil, fmt.Errorf("workload cannot initialize mapping repository: %w", err) } - serviceManager, err := service.NewSystemdManager(configDir) + eventCh := make(chan *service.Event) + serviceManager, err := service.NewSystemdManager(configDir, eventCh) if err != nil { return nil, fmt.Errorf("workload cannot initialize systemd manager: %w", err) } @@ -97,26 +97,30 @@ func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, events: make(chan *podman.PodmanEvent), } - newPodman.Events(ww.events) - go func() { for { - msg := <-ww.events - switch msg.Event { - case podman.StartedContainer: - ww.lock.Lock() - observers := ww.observers - ww.lock.Unlock() + event := <-eventCh + log.Debugf("Event received %s", string(event.Type)) + ww.lock.Lock() + observers := ww.observers + ww.lock.Unlock() + switch event.Type { + case service.EventStarted: + log.Infof("Service for workload %s started", event.WorkloadName) + report, err := newPodman.GetPodReportForPodName(event.WorkloadName) + if err != nil { + log.Errorf("unable to get pod report for workload %s:%v", event.WorkloadName, err) + } for _, observer := range observers { - observer.WorkloadStarted(msg.WorkloadName, []*podman.PodReport{msg.Report}) + observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report}) } - case podman.StoppedContainer: - ww.lock.Lock() - observers := ww.observers - ww.lock.Unlock() + case service.EventStopped: + log.Infof("Service for workload %s stopped", event.WorkloadName) for _, observer := range observers { - observer.WorkloadRemoved(msg.WorkloadName) + observer.WorkloadRemoved(event.WorkloadName) } + default: + log.Errorf("unknown event %s", event.Type) } } }() @@ -131,7 +135,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemdRootless("podman-auto-update", nil, false) + svc, err := service.NewSystemdRootless("podman-auto-update", nil, false, nil) if err != nil { return err } @@ -143,7 +147,7 @@ func (ww *Workload) Init() error { return ww.netfilter.AddTable(nfTableName) } -func (ww *Workload) List() ([]api2.WorkloadInfo, error) { +func (ww *Workload) List() ([]api.WorkloadInfo, error) { infos, err := ww.workloads.List() if err != nil { return nil, err @@ -240,29 +244,16 @@ func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath stri return fmt.Errorf("error while generating systemd service: %v", err) } + log.Infof("Creating service for %s", workload.Name) err = ww.createService(svc) if err != nil { return fmt.Errorf("error while starting service: %v", err) } + log.Infof("Adding service for listener for %s", workload.Name) err = ww.serviceManager.Add(svc) if err != nil { return fmt.Errorf("error while updating service manager: %v", err) } - - podReport, err := ww.workloads.GetPodReportForId(workload.Name) - if err != nil { - return fmt.Errorf("error while sending started events: %v", err) - } - - // When pod started by systemd the event is not sent to the channel, so we send - // it manually here to notify workloadStarted observer. - go func() { - ww.events <- &podman.PodmanEvent{ - Event: podman.StartedContainer, - WorkloadName: workload.Name, - Report: podReport, - } - }() return nil } @@ -285,11 +276,10 @@ func (ww *Workload) removeService(workloadName string) error { return fmt.Errorf("Cannot remove systemd service for '%s': %s", workloadName, err) } - err := ww.serviceManager.Remove(svc) + err = ww.serviceManager.Remove(svc) if err != nil { - return nil + log.Errorf("unable to remove service from serviceManager %s:%s", workloadName, err) } - return nil } diff --git a/internal/workload/wrapper_test.go b/internal/workload/wrapper_test.go index d22a896f..2db237f0 100644 --- a/internal/workload/wrapper_test.go +++ b/internal/workload/wrapper_test.go @@ -56,7 +56,6 @@ var _ = Describe("Workload management", func() { newPodman.EXPECT().Run(manifestPath, authFilePath, nil).Return([]*podman.PodReport{{Id: "id1"}}, nil) newPodman.EXPECT().GenerateSystemdService(pod, gomock.Any(), gomock.Any()).Return(svc, nil) - newPodman.EXPECT().GetPodReportForId("pod1").Return(nil, nil) svc.EXPECT().Add().Return(nil) svc.EXPECT().Enable().Return(nil) From e23a28130123dfa5a5bb2757bddfb20a9cd6611b Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Thu, 28 Jul 2022 17:34:05 -0400 Subject: [PATCH 02/13] Refactor systemd functions --- internal/metrics/system.go | 2 +- internal/service/event_listener.go | 2 +- internal/service/systemd.go | 33 ++++++++++++++++-------------- internal/workload/podman/podman.go | 4 ++-- internal/workload/wrapper.go | 2 +- 5 files changed, 23 insertions(+), 20 deletions(-) diff --git a/internal/metrics/system.go b/internal/metrics/system.go index 44c3c817..36ef75cf 100644 --- a/internal/metrics/system.go +++ b/internal/metrics/system.go @@ -26,7 +26,7 @@ type SystemMetrics struct { } func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) { - nodeExporter, err := service.NewSystemdRootless("node_exporter", nil, false, nil) + nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus, nil) if err != nil { return nil, err } diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index 057062b3..5b0de69f 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -55,7 +55,7 @@ func NewEventListener(observerCh chan *Event) *EventListener { } func (e *EventListener) Connect() error { - conn, err := newDbusConnection(true) + conn, err := newDbusConnection(UserBus) if err != nil { return err } diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 26d969ed..97e5250c 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -27,6 +27,13 @@ var ( DefaultUnitsPath = path.Join(os.Getenv("HOME"), ".config/systemd/user/") ) +type BusType string + +const ( + UserBus BusType = "user" + SystemBus BusType = "system" +) + //go:generate mockgen -package=service -destination=mock_systemd.go . Service type Service interface { GetName() string @@ -44,7 +51,7 @@ type systemd struct { Units []string `json:"units"` UnitsContent map[string]string `json:"-"` dbusConnection *dbus.Conn `json:"-"` - Rootless bool `json:"rootless"` + BusType BusType `json:"busType"` eventCh chan Event `json:"-"` } @@ -144,12 +151,8 @@ func (mgr *systemdManager) write() error { return ioutil.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec } -func NewSystemd(name string, units map[string]string, eventCh chan Event) (Service, error) { - return NewSystemdRootless(name, units, true, eventCh) -} - -func newDbusConnection(rootless bool) (*dbus.Conn, error) { - if rootless { +func newDbusConnection(busType BusType) (*dbus.Conn, error) { + if busType == UserBus { return dbus.NewConnection(func() (*godbus.Conn, error) { uid := path.Base(os.Getenv("FLOTTA_XDG_RUNTIME_DIR")) path := filepath.Join(os.Getenv("FLOTTA_XDG_RUNTIME_DIR"), "systemd/private") @@ -176,11 +179,11 @@ func newDbusConnection(rootless bool) (*dbus.Conn, error) { } } -func NewSystemdRootless(name string, units map[string]string, rootless bool, eventCh chan Event) (Service, error) { +func NewSystemd(name string, units map[string]string, busType BusType, eventCh chan Event) (Service, error) { var err error var conn *dbus.Conn - conn, err = newDbusConnection(rootless) + conn, err = newDbusConnection(busType) if err != nil { return nil, err } @@ -195,7 +198,7 @@ func NewSystemdRootless(name string, units map[string]string, rootless bool, eve RestartSec: DefaultRestartTimeout, dbusConnection: conn, Units: unitNames, - Rootless: rootless, + BusType: busType, UnitsContent: units, eventCh: eventCh, }, nil @@ -233,7 +236,7 @@ func (s *systemd) GetName() string { } func (s *systemd) reload() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -242,7 +245,7 @@ func (s *systemd) reload() error { } func (s *systemd) Start() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -262,7 +265,7 @@ func (s *systemd) Start() error { } func (s *systemd) Stop() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -282,7 +285,7 @@ func (s *systemd) Stop() error { } func (s *systemd) Enable() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -293,7 +296,7 @@ func (s *systemd) Enable() error { } func (s *systemd) Disable() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } diff --git a/internal/workload/podman/podman.go b/internal/workload/podman/podman.go index a5eb9bd9..c2991e96 100644 --- a/internal/workload/podman/podman.go +++ b/internal/workload/podman/podman.go @@ -341,7 +341,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } - svc, err = service.NewSystemd(podName, report.Units, p.eventCh) + svc, err = service.NewSystemd(podName, report.Units, service.UserBus, p.eventCh) if err != nil { return nil, err } @@ -358,7 +358,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } units := map[string]string{podName: unit.String()} - svc, err = service.NewSystemd(podName, units, p.eventCh) + svc, err = service.NewSystemd(podName, units, service.UserBus, p.eventCh) if err != nil { return nil, err } diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 81c4ea30..376c3612 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -135,7 +135,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemdRootless("podman-auto-update", nil, false, nil) + svc, err := service.NewSystemd("podman-auto-update", nil, service.SystemBus, nil) if err != nil { return err } From bef0db6a3091431fbfa850bd5d63d252afe679a6 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Thu, 28 Jul 2022 20:41:58 -0400 Subject: [PATCH 03/13] Changes due to golint run --- internal/ansible/ansible_manager_test.go | 6 ++-- internal/ansible/mapping/mapping.go | 25 ++++++++++----- internal/ansible/mapping/mapping_test.go | 20 ++++++++---- internal/heartbeat/heartbeat_test.go | 40 ------------------------ 4 files changed, 36 insertions(+), 55 deletions(-) diff --git a/internal/ansible/ansible_manager_test.go b/internal/ansible/ansible_manager_test.go index 2f6077db..9a6fd60a 100644 --- a/internal/ansible/ansible_manager_test.go +++ b/internal/ansible/ansible_manager_test.go @@ -164,8 +164,10 @@ var _ = Describe("Ansible Runner", func() { modTime1 := time.Now().Add(-3 * time.Hour) modTime2 := time.Now().Add(-2 * time.Hour) - p1Sha := ansibleManager.MappingRepository.GetSha256(p1) - p2Sha := ansibleManager.MappingRepository.GetSha256(p2) + p1Sha, err := ansibleManager.MappingRepository.GetSha256(p1) + Expect(err).ToNot(HaveOccurred()) + p2Sha, err := ansibleManager.MappingRepository.GetSha256(p2) + Expect(err).ToNot(HaveOccurred()) p1Path := path.Join(configDir, p1Sha) p2Path := path.Join(configDir, p2Sha) diff --git a/internal/ansible/mapping/mapping.go b/internal/ansible/mapping/mapping.go index 06901b59..d240527a 100644 --- a/internal/ansible/mapping/mapping.go +++ b/internal/ansible/mapping/mapping.go @@ -21,7 +21,7 @@ type mapping struct { //go:generate mockgen -package=mapping -destination=mock_mapping.go . MappingRepository type MappingRepository interface { - GetSha256(fileContent []byte) string + GetSha256(fileContent []byte) (string, error) Add(fileContent []byte, modTime time.Time) error Remove(fileContent []byte) error RemoveMappingFile() error @@ -86,17 +86,24 @@ func (m *mappingRepository) GetAll() map[int]string { return all } -func (m *mappingRepository) GetSha256(fileContent []byte) string { +func (m *mappingRepository) GetSha256(fileContent []byte) (string, error) { h := sha256.New() - h.Write(fileContent) - return fmt.Sprintf("%x", h.Sum(nil)) + _, err := h.Write(fileContent) + if err != nil { + return "", err + } + return fmt.Sprintf("%x", h.Sum(nil)), nil } func (m *mappingRepository) Add(fileContent []byte, modTime time.Time) error { m.lock.Lock() defer m.lock.Unlock() - filePath := path.Join(m.configDir, m.GetSha256(fileContent)) - err := os.WriteFile(filePath, []byte(fileContent), 0600) + sha, err := m.GetSha256(fileContent) + if err != nil { + return err + } + filePath := path.Join(m.configDir, sha) + err = os.WriteFile(filePath, []byte(fileContent), 0600) if err != nil { return err @@ -112,7 +119,11 @@ func (m *mappingRepository) Remove(fileContent []byte) error { m.lock.Lock() defer m.lock.Unlock() - filePath := path.Join(m.configDir, m.GetSha256(fileContent)) + sha, err := m.GetSha256(fileContent) + if err != nil { + return err + } + filePath := path.Join(m.configDir, sha) modTime := m.pathToModTime[filePath] delete(m.modTimeToPath, modTime) delete(m.pathToModTime, filePath) diff --git a/internal/ansible/mapping/mapping_test.go b/internal/ansible/mapping/mapping_test.go index ce1129a0..a95d7368 100644 --- a/internal/ansible/mapping/mapping_test.go +++ b/internal/ansible/mapping/mapping_test.go @@ -24,7 +24,8 @@ var _ = Describe("Mapping", func() { repo, err = mapping.NewMappingRepository(dir) Expect(err).ToNot(HaveOccurred()) - sha256Test = repo.GetSha256([]byte("test")) + sha256Test, err = repo.GetSha256([]byte("test")) + Expect(err).ToNot(HaveOccurred()) filePathTest = path.Join(configDir, sha256Test) }) AfterEach(func() { @@ -32,8 +33,10 @@ var _ = Describe("Mapping", func() { Expect(err).ToNot(HaveOccurred()) }) It("sha256 Generation", func() { - s1 := repo.GetSha256([]byte("AAA")) - s2 := repo.GetSha256([]byte("AAA")) + s1, err := repo.GetSha256([]byte("AAA")) + Expect(err).ToNot(HaveOccurred()) + s2, err := repo.GetSha256([]byte("AAA")) + Expect(err).ToNot(HaveOccurred()) Expect(s1).To(Equal(s2)) }) It("Should be created empty", func() { @@ -87,12 +90,17 @@ var _ = Describe("Mapping", func() { It("Should persist mappings", func() { // given - filePath1 := path.Join(configDir, repo.GetSha256([]byte("test-one"))) - filePath2 := path.Join(configDir, repo.GetSha256([]byte("test-two"))) + sha, err := repo.GetSha256([]byte("test-one")) + Expect(err).ToNot(HaveOccurred()) + filePath1 := path.Join(configDir, sha) + Expect(err).ToNot(HaveOccurred()) + sha, err = repo.GetSha256([]byte("test-two")) + filePath2 := path.Join(configDir, sha) + Expect(err).ToNot(HaveOccurred()) modTime1 := time.Now() modTime2 := modTime1.Add(1 * time.Minute) - err := repo.Add([]byte("test-one"), modTime1) + err = repo.Add([]byte("test-one"), modTime1) Expect(err).ToNot(HaveOccurred()) Expect(repo.GetModTime(filePath1)).To(Equal(modTime1.UnixNano())) Expect(repo.GetFilePath(modTime1)).To(Equal(filePath1)) diff --git a/internal/heartbeat/heartbeat_test.go b/internal/heartbeat/heartbeat_test.go index 4fd482f7..d07da33d 100644 --- a/internal/heartbeat/heartbeat_test.go +++ b/internal/heartbeat/heartbeat_test.go @@ -6,14 +6,12 @@ import ( "context" "encoding/json" "fmt" - "net" "net/http" "reflect" "sync" "time" "git.sr.ht/~spc/go-log" - "github.com/openshift/assisted-installer-agent/src/util" "github.com/project-flotta/flotta-device-worker/internal/ansible" os2 "github.com/project-flotta/flotta-device-worker/internal/os" "github.com/project-flotta/flotta-device-worker/internal/registration" @@ -759,44 +757,6 @@ func (d *DispatcherFailing) GetConfig(ctx context.Context, in *pb.Empty, opts .. return nil, nil } -func NewFilledInterfaceMock(mtu int, name string, macAddr string, flags net.Flags, addrs []string, isPhysical bool, isBonding bool, isVlan bool, speedMbps int64) *util.MockInterface { - hwAddr, _ := net.ParseMAC(macAddr) - ret := util.MockInterface{} - ret.On("IsPhysical").Return(isPhysical) - if isPhysical || isBonding || isVlan { - ret.On("Name").Return(name) - ret.On("MTU").Return(mtu) - ret.On("HardwareAddr").Return(hwAddr) - ret.On("Flags").Return(flags) - ret.On("Addrs").Return(toAddresses(addrs), nil) - ret.On("SpeedMbps").Return(speedMbps) - } - if !isPhysical { - ret.On("IsBonding").Return(isBonding) - } - if !(isPhysical || isBonding) { - ret.On("IsVlan").Return(isVlan) - } - - return &ret -} - -func toAddresses(addrs []string) []net.Addr { - ret := make([]net.Addr, 0) - for _, a := range addrs { - ret = append(ret, str2Addr(a)) - } - return ret -} - -func str2Addr(addrStr string) net.Addr { - ip, ipnet, err := net.ParseCIDR(addrStr) - if err != nil { - return &net.IPNet{} - } - return &net.IPNet{IP: ip, Mask: ipnet.Mask} -} - func initHwMock(hwMock *hardware.MockHardware, configManager *configuration.Manager, hostname string, ipv4 []string) (*gomock.Call, *gomock.Call, *gomock.Call) { var m models.HardwareInfo configManager.GetDeviceConfiguration().Heartbeat.HardwareProfile.Scope = heartbeat.ScopeDelta From 51609407c677142ab71054d8123b5bbc3bd495cc Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Mon, 1 Aug 2022 10:19:32 -0400 Subject: [PATCH 04/13] Refactored event listener based on feedback --- internal/service/event_listener.go | 52 +++++++++++++++--------------- internal/service/systemd.go | 8 +---- internal/workload/wrapper.go | 10 ++++-- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index 5b0de69f..0868b2ff 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -50,44 +50,44 @@ type EventListener struct { dbusErrCh <-chan error } -func NewEventListener(observerCh chan *Event) *EventListener { - return &EventListener{observerCh: observerCh} +func NewEventListener() *EventListener { + return &EventListener{} } -func (e *EventListener) Connect() error { +func (e *EventListener) Connect() (chan *Event, error) { conn, err := newDbusConnection(UserBus) if err != nil { - return err + return nil, err } e.set = conn.NewSubscriptionSet() - return nil + e.observerCh = make(chan *Event) + return e.observerCh, nil } -func (e *EventListener) Listen() { +func (e *EventListener) Listen() chan *Event { e.dbusCh, e.dbusErrCh = e.set.Subscribe() - go func() { - for { - select { - case msg := <-e.dbusCh: - for name, unit := range msg { - log.Debugf("Captured event for %s: %v+", name, unit) - n := extractWorkloadName(name) - state := translateUnitSubStatus(unit) - log.Infof("Service %s for workload transitioned to sub state %s\n", n, state) - switch state { - case running: - e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} - case removed, stop, dead, failed, start: - e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} - default: - log.Infof("Ignoring unit sub state for service %s: %s", name, unit.SubState) - } + for { + select { + case msg := <-e.dbusCh: + for name, unit := range msg { + log.Debugf("Captured event for %s: %v+", name, unit) + n := extractWorkloadName(name) + state := translateUnitSubStatus(unit) + log.Debugf("Systemd service for workload %s transitioned to sub state %s\n", n, state) + switch state { + case running: + e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} + case removed, stop, dead, failed, start: + e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} + default: + log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState) } - case msg := <-e.dbusErrCh: - log.Errorf("Error while parsing dbus event: %v", msg) } + case msg := <-e.dbusErrCh: + log.Errorf("Error while parsing dbus event: %v", msg) } - }() + } + } func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 97e5250c..5f8e3b0c 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -70,7 +70,7 @@ type systemdManager struct { eventListener *EventListener } -func NewSystemdManager(configDir string, observerCh chan *Event) (SystemdManager, error) { +func NewSystemdManager(configDir string, listener *EventListener) (SystemdManager, error) { services := make(map[string]*systemd) servicePath := path.Join(configDir, "services.json") servicesJson, err := ioutil.ReadFile(servicePath) //#nosec @@ -85,15 +85,9 @@ func NewSystemdManager(configDir string, observerCh chan *Event) (SystemdManager for k, v := range services { systemdSVC[k] = v } - listener := NewEventListener(observerCh) - err = listener.Connect() - if err != nil { - return nil, err - } for name := range services { listener.Add(name) } - listener.Listen() return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}, eventListener: listener}, nil } diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 376c3612..54eb6782 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -82,8 +82,14 @@ func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, return nil, fmt.Errorf("workload cannot initialize mapping repository: %w", err) } - eventCh := make(chan *service.Event) - serviceManager, err := service.NewSystemdManager(configDir, eventCh) + listener := service.NewEventListener() + eventCh, err := listener.Connect() + if err != nil { + return nil, err + } + go listener.Listen() + + serviceManager, err := service.NewSystemdManager(configDir, listener) if err != nil { return nil, fmt.Errorf("workload cannot initialize systemd manager: %w", err) } From 72763f825d44d85600e50b3c48d5a34ddb0208f2 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Tue, 2 Aug 2022 10:52:19 -0400 Subject: [PATCH 05/13] Minimal refactor and cosmetic changes based on feedback --- internal/service/event_listener.go | 8 ++++++-- internal/service/systemd.go | 1 - internal/workload/wrapper.go | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index 0868b2ff..a9b37c8c 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -98,7 +98,7 @@ func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { } func (e *EventListener) Add(workloadName string) { - name := fmt.Sprintf("%s.service", workloadName) + name := formatServiceName(workloadName) log.Debugf("Adding service for events %s", name) if !e.set.Contains(name) { e.set.Add(name) @@ -106,7 +106,7 @@ func (e *EventListener) Add(workloadName string) { } func (e *EventListener) Remove(workloadName string) { - name := fmt.Sprintf("%s.service", workloadName) + name := formatServiceName(workloadName) log.Debugf("Removing service for events %s", name) e.set.Remove(name) } @@ -114,3 +114,7 @@ func (e *EventListener) Remove(workloadName string) { func extractWorkloadName(serviceName string) string { return serviceName[:len(serviceName)-servicePostfixLength] } + +func formatServiceName(workloadName string) string { + return fmt.Sprintf("%s.service", workloadName) +} diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 5f8e3b0c..73a157e6 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -20,7 +20,6 @@ const ( DefaultRestartTimeout = 15 TimerSuffix = ".timer" ServiceSuffix = ".service" - DefaultNameSeparator = "-" ) var ( diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 54eb6782..37609748 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -282,9 +282,9 @@ func (ww *Workload) removeService(workloadName string) error { return fmt.Errorf("Cannot remove systemd service for '%s': %s", workloadName, err) } - err = ww.serviceManager.Remove(svc) + err := ww.serviceManager.Remove(svc) if err != nil { - log.Errorf("unable to remove service from serviceManager %s:%s", workloadName, err) + log.Errorf("Unable to remove service from serviceManager %s:%s", workloadName, err) } return nil } From 8fd2b8c77f743328fc83a2797c53c1cb1fa490cd Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Tue, 2 Aug 2022 11:43:58 -0400 Subject: [PATCH 06/13] Changed podman-auto-update to use userBus --- internal/workload/wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 37609748..038c74f4 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -141,7 +141,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemd("podman-auto-update", nil, service.SystemBus, nil) + svc, err := service.NewSystemd("podman-auto-update", nil, service.UserBus, nil) if err != nil { return err } From 623a8576700a17a4cf22b618f7d296a1e75846d6 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Tue, 2 Aug 2022 11:46:28 -0400 Subject: [PATCH 07/13] Refactor systemd listener: * Moved event listener instance and initialization to cmd.go main * Added a file system watcher to the systemd enabled services directory to be notified of new services from being enabled/disabled so that the event listener can track their status * Encapsulated the code to listen to service events generated by the systemd event listener so that it is called at the last line of the cmd.go#main function to make sure all observers are ready before start processing the service events --- cmd/device-worker/main.go | 48 +++++--- go.mod | 1 + internal/ansible/mapping/mock_mapping.go | 5 +- internal/configuration/configuration.go | 1 + internal/metrics/system.go | 2 +- internal/metrics/workload.go | 1 + internal/service/event_listener.go | 150 +++++++++++++++++++---- internal/service/systemd.go | 27 ++-- internal/workload/manager.go | 8 +- internal/workload/mock_wrapper.go | 12 ++ internal/workload/podman/podman.go | 8 +- internal/workload/wrapper.go | 121 +++++++++--------- internal/workload/wrapper_test.go | 2 +- vendor/modules.txt | 1 + 14 files changed, 254 insertions(+), 133 deletions(-) diff --git a/cmd/device-worker/main.go b/cmd/device-worker/main.go index 94341533..086137c0 100644 --- a/cmd/device-worker/main.go +++ b/cmd/device-worker/main.go @@ -20,7 +20,8 @@ import ( os2 "github.com/project-flotta/flotta-device-worker/internal/os" registration2 "github.com/project-flotta/flotta-device-worker/internal/registration" "github.com/project-flotta/flotta-device-worker/internal/server" - workload2 "github.com/project-flotta/flotta-device-worker/internal/workload" + "github.com/project-flotta/flotta-device-worker/internal/service" + workload "github.com/project-flotta/flotta-device-worker/internal/workload" "net" "os" @@ -41,9 +42,9 @@ const ( ) func initSystemdDirectory() error { - systemddir := filepath.Join(os.Getenv("HOME"), ".config/systemd/user/") + // init the flotta user systemd units directory - err := os.MkdirAll(systemddir, 0750) + err := os.MkdirAll(service.SystemdUserServicesFullPath, 0750) if err != nil { return err } @@ -141,6 +142,16 @@ func main() { } configManager := configuration2.NewConfigurationManager(dataDir) + // Systemd event listener + listener := service.NewDBusEventListener() + eventCh, err := listener.Connect() + if err != nil { + log.Fatal(err) + } + // Start listening to systemd bus events. These events generated from here can start being processed once all observers have been initialized, otherwise + // we risk in missing observers whilst processing the events. + go listener.Listen() + // --- Client metrics configuration --- metricsStore, err := metrics.NewTSDB(dataDir) if err != nil { @@ -162,16 +173,16 @@ func main() { dataTransferWatcher := metrics.NewDataTransferMetrics(metricsDaemon) configManager.RegisterObserver(dataTransferWatcher) - wl, err := workload2.NewWorkloadManager(dataDir, deviceId) + workloadManager, err := workload.NewWorkloadManager(dataDir, deviceId, eventCh) if err != nil { log.Fatalf("cannot start Workload Manager. DeviceID: %s; err: %v", deviceId, err) } - logsWrapper := logs.NewWorkloadsLogsTarget(wl) + logsWrapper := logs.NewWorkloadsLogsTarget(workloadManager) configManager.RegisterObserver(logsWrapper) - wl.RegisterObserver(logsWrapper) - configManager.RegisterObserver(wl) - wl.RegisterObserver(workloadMetricWatcher) + workloadManager.RegisterObserver(logsWrapper) + configManager.RegisterObserver(workloadManager) + workloadManager.RegisterObserver(workloadMetricWatcher) remoteWrite := metrics.NewRemoteWrite(dataDir, deviceId, metricsStore) configManager.RegisterObserver(remoteWrite) @@ -189,25 +200,21 @@ func main() { } configManager.RegisterObserver(mountManager) - dataMonitor := datatransfer.NewMonitor(wl, configManager) - wl.RegisterObserver(dataMonitor) + dataMonitor := datatransfer.NewMonitor(workloadManager, configManager) + workloadManager.RegisterObserver(dataMonitor) configManager.RegisterObserver(dataMonitor) dataMonitor.Start() - if err != nil { - log.Fatalf("cannot start metrics store. DeviceID: %s; err: %v", deviceId, err) - } - - reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, wl) + reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, workloadManager) if err != nil { log.Fatalf("cannot start registration process: DeviceID: %s; err: %v", deviceId, err) } - hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, wl, &hw, dataMonitor, deviceOs, reg) + hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, workloadManager, &hw, dataMonitor, deviceOs, reg) configManager.RegisterObserver(hbs) reg.DeregisterLater( - wl, + workloadManager, configManager, hbs, dataMonitor, @@ -239,16 +246,19 @@ func main() { setupSignalHandler(metricsStore, ansibleManager) - go listenStartGracefulRebootChannel(wl, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager, + go listenStartGracefulRebootChannel(workloadManager, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager, gracefulRebootChannel, deviceOs) + // All observers have been registered at this point. Ready to start listening to workload service events generated by changes to the systemd dbus + go workloadManager.ListenServiceEvents() + if err := s.Serve(l); err != nil { log.Fatalf("cannot start worker server, err: %v", err) } } -func listenStartGracefulRebootChannel(wl *workload2.WorkloadManager, dataMonitor *datatransfer.Monitor, +func listenStartGracefulRebootChannel(wl *workload.WorkloadManager, dataMonitor *datatransfer.Monitor, systemMetricsWatcher *metrics.SystemMetrics, metricsStore *metrics.TSDB, hbs *heartbeat2.Heartbeat, ansibleManager *ansible.Manager, gracefulRebootChannel chan struct{}, deviceOs *os2.OS) { // listen to the channel for getting StartGracefulReboot signal diff --git a/go.mod b/go.mod index 9d4b16ef..a77da14a 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/digitalocean/godo v1.80.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/envoyproxy/protoc-gen-validate v0.6.7 // indirect + github.com/fsnotify/fsnotify v1.5.4 github.com/go-kit/log v0.2.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/runtime v0.23.1 // indirect diff --git a/internal/ansible/mapping/mock_mapping.go b/internal/ansible/mapping/mock_mapping.go index fd3ead81..6a05f38b 100644 --- a/internal/ansible/mapping/mock_mapping.go +++ b/internal/ansible/mapping/mock_mapping.go @@ -105,11 +105,12 @@ func (mr *MockMappingRepositoryMockRecorder) GetModTime(arg0 interface{}) *gomoc } // GetSha256 mocks base method. -func (m *MockMappingRepository) GetSha256(arg0 []byte) string { +func (m *MockMappingRepository) GetSha256(arg0 []byte) (string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetSha256", arg0) ret0, _ := ret[0].(string) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // GetSha256 indicates an expected call of GetSha256. diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go index 2149113a..d3b40603 100644 --- a/internal/configuration/configuration.go +++ b/internal/configuration/configuration.go @@ -147,6 +147,7 @@ func (m *Manager) Update(message models.DeviceConfigurationMessage) error { } log.Infof("updating configuration. New config: %s\nOld config: %s", newJson, oldJson) + log.Debugf("[ConfigManager] observers :%v+", m.observers) for _, observer := range m.observers { err := observer.Update(message) if err != nil { diff --git a/internal/metrics/system.go b/internal/metrics/system.go index 36ef75cf..6a308c53 100644 --- a/internal/metrics/system.go +++ b/internal/metrics/system.go @@ -26,7 +26,7 @@ type SystemMetrics struct { } func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) { - nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus, nil) + nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus) if err != nil { return nil, err } diff --git a/internal/metrics/workload.go b/internal/metrics/workload.go index 3d7ca822..a65d9960 100644 --- a/internal/metrics/workload.go +++ b/internal/metrics/workload.go @@ -53,6 +53,7 @@ func (wrkM *WorkloadMetrics) WorkloadRemoved(workloadName string) { } func (wrkM *WorkloadMetrics) WorkloadStarted(workloadName string, report []*podman.PodReport) { + log.Infof("Starting target metrics for workload '%s'", workloadName) for _, workload := range report { cfg := wrkM.getWorkload(workloadName) if cfg == nil { diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index a9b37c8c..e933bbda 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -1,20 +1,24 @@ package service import ( + "errors" "fmt" + "os" + "path/filepath" + "strings" "git.sr.ht/~spc/go-log" "github.com/coreos/go-systemd/v22/dbus" + "github.com/fsnotify/fsnotify" ) const ( - // servicePostfixLength is the length of the string ".service". This value is used to extract the workload name from the service - // coming from systemd - servicePostfixLength = 8 - EventStarted EventType = "started" EventStopped EventType = "stopped" + systemdUserServicesDir = ".config/systemd/user" + defaultTargetWantsRelativePath = "default.target.wants/" + // To avoid confusion, we match the action verb tense coming from systemd sub state, even though it is not consistent // The only addition to the list is the "removed" case, which we use when a service is removed // from the system and we receive an empty unitStatus object. @@ -34,6 +38,11 @@ const ( removed unitSubState = "removed" ) +var ( + SystemdUserServicesFullPath = filepath.Join(os.Getenv("HOME"), systemdUserServicesDir) + enabledServicesFullPath = filepath.Join(SystemdUserServicesFullPath, defaultTargetWantsRelativePath) +) + type unitSubState string type EventType string @@ -43,41 +52,92 @@ type Event struct { Type EventType } -type EventListener struct { +type DBusEventListener struct { observerCh chan *Event set *dbus.SubscriptionSet dbusCh <-chan map[string]*dbus.UnitStatus dbusErrCh <-chan error + fsWatcher *fsnotify.Watcher } -func NewEventListener() *EventListener { - return &EventListener{} +func NewDBusEventListener() *DBusEventListener { + return &DBusEventListener{} } -func (e *EventListener) Connect() (chan *Event, error) { +func (e *DBusEventListener) Connect() (<-chan *Event, error) { conn, err := newDbusConnection(UserBus) if err != nil { return nil, err } e.set = conn.NewSubscriptionSet() - e.observerCh = make(chan *Event) + if err := e.initializeSubscriptionSet(); err != nil { + return nil, err + } + e.observerCh = make(chan *Event, 1000) return e.observerCh, nil } -func (e *EventListener) Listen() chan *Event { +func (e *DBusEventListener) initializeSubscriptionSet() error { + _, err := os.Stat(enabledServicesFullPath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return e.watchServiceDirectory() + } + return err + } + files, err := os.ReadDir(enabledServicesFullPath) + if err != nil { + return err + } + log.Infof("List of services to be monitored in %s:%s", enabledServicesFullPath, files) + for _, fd := range files { + f := filepath.Join(enabledServicesFullPath, fd.Name()) + log.Debugf("Detected service file %s", f) + if !fileExist(f) { + log.Errorf("Hard link %s does not exist or broken link", fd.Name()) + continue + } + e.Add(getServiceFileName(fd.Name())) + } + return e.watchServiceDirectory() +} + +func (e *DBusEventListener) watchServiceDirectory() error { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + e.fsWatcher = watcher + + _, err = os.Stat(SystemdUserServicesFullPath) + if err != nil { + return fmt.Errorf("systemd user directory not found: %s", err) + } + + go e.watchFSEvents() + return e.fsWatcher.Add(SystemdUserServicesFullPath) +} + +func (e *DBusEventListener) Listen() { e.dbusCh, e.dbusErrCh = e.set.Subscribe() for { select { case msg := <-e.dbusCh: for name, unit := range msg { - log.Debugf("Captured event for %s: %v+", name, unit) - n := extractWorkloadName(name) + log.Debugf("Captured DBus event for %s: %v+", name, unit) + n, err := extractWorkloadName(name) + if err != nil { + log.Error(err) + continue + } state := translateUnitSubStatus(unit) log.Debugf("Systemd service for workload %s transitioned to sub state %s\n", n, state) switch state { case running: + log.Debugf("Sending start event to observer channel for workload %s", n) e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} case removed, stop, dead, failed, start: + log.Debugf("Sending stop event to observer channel for workload %s", n) e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} default: log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState) @@ -90,6 +150,36 @@ func (e *EventListener) Listen() chan *Event { } +func (e *DBusEventListener) watchFSEvents() { + for { + select { + case event, ok := <-e.fsWatcher.Events: + if !ok { + log.Errorf("Error while watching for file system events at %s:%s", SystemdUserServicesFullPath, event) + continue + } + log.Debugf("Captured file system event:%s", event) + if !isAnEnabledService(event.Name) { + continue + } + svcName := getServiceFileName(event.Name) + switch { + case event.Op&fsnotify.Create == fsnotify.Create: + log.Infof("New service configuration detected at %s", event.Name) + e.Add(svcName) + case event.Op&fsnotify.Remove == fsnotify.Remove: + log.Infof("Service configuration removed at %s", event.Name) + e.Remove(svcName) + } + case err, ok := <-e.fsWatcher.Errors: + if !ok { + log.Errorf("Error detected on file system watcher: %s", err) + return + } + } + } +} + func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { if unit == nil { return removed @@ -97,24 +187,32 @@ func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { return unitSubState(unit.SubState) } -func (e *EventListener) Add(workloadName string) { - name := formatServiceName(workloadName) - log.Debugf("Adding service for events %s", name) - if !e.set.Contains(name) { - e.set.Add(name) - } +func (e *DBusEventListener) Add(serviceName string) { + log.Debugf("Adding service for event listener %s", serviceName) + e.set.Add(serviceName) } -func (e *EventListener) Remove(workloadName string) { - name := formatServiceName(workloadName) - log.Debugf("Removing service for events %s", name) - e.set.Remove(name) +func (e *DBusEventListener) Remove(serviceName string) { + log.Debugf("Removing service from event listener %s", serviceName) + e.set.Remove(serviceName) } -func extractWorkloadName(serviceName string) string { - return serviceName[:len(serviceName)-servicePostfixLength] +func extractWorkloadName(serviceName string) (string, error) { + if filepath.Ext(serviceName) != ServiceSuffix { + return "", fmt.Errorf("invalid file name or not a service %s", serviceName) + } + return strings.TrimSuffix(filepath.Base(serviceName), filepath.Ext(serviceName)), nil +} + +func fileExist(path string) bool { + _, err := os.Stat(path) + return err == nil } -func formatServiceName(workloadName string) string { - return fmt.Sprintf("%s.service", workloadName) +func isAnEnabledService(fullPath string) bool { + return filepath.Ext(fullPath) == ServiceSuffix && filepath.Dir(fullPath) == enabledServicesFullPath +} +func getServiceFileName(fullPath string) string { + _, filename := filepath.Split(fullPath) + return filename } diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 73a157e6..844bf80e 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -46,12 +46,10 @@ type Service interface { type systemd struct { Name string `json:"name"` - RestartSec int `json:"restartSec"` Units []string `json:"units"` UnitsContent map[string]string `json:"-"` dbusConnection *dbus.Conn `json:"-"` BusType BusType `json:"busType"` - eventCh chan Event `json:"-"` } //go:generate mockgen -package=service -destination=mock_systemd_manager.go . SystemdManager @@ -63,13 +61,12 @@ type SystemdManager interface { } type systemdManager struct { - svcFilePath string - lock sync.RWMutex - services map[string]Service - eventListener *EventListener + svcFilePath string + lock sync.RWMutex + services map[string]Service } -func NewSystemdManager(configDir string, listener *EventListener) (SystemdManager, error) { +func NewSystemdManager(configDir string) (SystemdManager, error) { services := make(map[string]*systemd) servicePath := path.Join(configDir, "services.json") servicesJson, err := ioutil.ReadFile(servicePath) //#nosec @@ -84,10 +81,8 @@ func NewSystemdManager(configDir string, listener *EventListener) (SystemdManage for k, v := range services { systemdSVC[k] = v } - for name := range services { - listener.Add(name) - } - return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}, eventListener: listener}, nil + + return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}}, nil } func (mgr *systemdManager) RemoveServicesFile() error { @@ -113,7 +108,6 @@ func (mgr *systemdManager) Add(svc Service) error { if err != nil { return err } - mgr.eventListener.Add(svc.GetName()) return nil } @@ -132,7 +126,6 @@ func (mgr *systemdManager) Remove(svc Service) error { return err } delete(mgr.services, svc.GetName()) - mgr.eventListener.Remove(svc.GetName()) return nil } @@ -172,7 +165,7 @@ func newDbusConnection(busType BusType) (*dbus.Conn, error) { } } -func NewSystemd(name string, units map[string]string, busType BusType, eventCh chan Event) (Service, error) { +func NewSystemd(name string, units map[string]string, busType BusType) (Service, error) { var err error var conn *dbus.Conn @@ -188,12 +181,10 @@ func NewSystemd(name string, units map[string]string, busType BusType, eventCh c return &systemd{ Name: name, - RestartSec: DefaultRestartTimeout, dbusConnection: conn, Units: unitNames, BusType: busType, UnitsContent: units, - eventCh: eventCh, }, nil } @@ -220,7 +211,6 @@ func (s *systemd) Remove() error { return err } } - s.eventCh <- Event{WorkloadName: s.Name, Type: EventStopped} return s.reload() } @@ -238,6 +228,7 @@ func (s *systemd) reload() error { } func (s *systemd) Start() error { + log.Debugf("Starting service %s", s.Name) conn, err := newDbusConnection(s.BusType) if err != nil { return err @@ -278,6 +269,7 @@ func (s *systemd) Stop() error { } func (s *systemd) Enable() error { + log.Debugf("Enabling service %s", s.Name) conn, err := newDbusConnection(s.BusType) if err != nil { return err @@ -289,6 +281,7 @@ func (s *systemd) Enable() error { } func (s *systemd) Disable() error { + log.Debugf("Disabling service %s", s.Name) conn, err := newDbusConnection(s.BusType) if err != nil { return err diff --git a/internal/workload/manager.go b/internal/workload/manager.go index 489876fd..12f6a47c 100644 --- a/internal/workload/manager.go +++ b/internal/workload/manager.go @@ -40,8 +40,8 @@ type WorkloadManager struct { deviceId string } -func NewWorkloadManager(dataDir string, deviceId string) (*WorkloadManager, error) { - wrapper, err := newWorkloadInstance(dataDir, defaultWorkloadsMonitoringInterval) +func NewWorkloadManager(dataDir string, deviceId string, systemdEventCh <-chan *service.Event) (*WorkloadManager, error) { + wrapper, err := newWorkloadInstance(dataDir, defaultWorkloadsMonitoringInterval, systemdEventCh) if err != nil { return nil, err } @@ -415,6 +415,10 @@ func (w *WorkloadManager) deleteVolumeDir() error { return deleteDir(w.volumesDir) } +func (w *WorkloadManager) ListenServiceEvents() { + w.workloads.ListenServiceEvents() +} + func deleteDir(path string) error { err := os.RemoveAll(path) if err != nil { diff --git a/internal/workload/mock_wrapper.go b/internal/workload/mock_wrapper.go index 49ed1f81..b3a163ed 100644 --- a/internal/workload/mock_wrapper.go +++ b/internal/workload/mock_wrapper.go @@ -95,6 +95,18 @@ func (mr *MockWorkloadWrapperMockRecorder) ListSecrets() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSecrets", reflect.TypeOf((*MockWorkloadWrapper)(nil).ListSecrets)) } +// ListenServiceEvents mocks base method. +func (m *MockWorkloadWrapper) ListenServiceEvents() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ListenServiceEvents") +} + +// ListenServiceEvents indicates an expected call of ListenServiceEvents. +func (mr *MockWorkloadWrapperMockRecorder) ListenServiceEvents() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListenServiceEvents", reflect.TypeOf((*MockWorkloadWrapper)(nil).ListenServiceEvents)) +} + // Logs mocks base method. func (m *MockWorkloadWrapper) Logs(arg0 string, arg1 io.Writer) (context.CancelFunc, error) { m.ctrl.T.Helper() diff --git a/internal/workload/podman/podman.go b/internal/workload/podman/podman.go index c2991e96..58a7d2ef 100644 --- a/internal/workload/podman/podman.go +++ b/internal/workload/podman/podman.go @@ -109,8 +109,6 @@ const DefaultTimeoutForStoppingInSeconds int = 5 type podman struct { podmanConnection context.Context timeoutForStopping int - eventCh chan service.Event - cancel chan bool } func NewPodman() (*podman, error) { @@ -121,8 +119,6 @@ func NewPodman() (*podman, error) { p := &podman{ podmanConnection: podmanConnection, timeoutForStopping: DefaultTimeoutForStoppingInSeconds, - eventCh: make(chan service.Event, 1000), - cancel: make(chan bool), } err = p.MinVersion() if err != nil { @@ -341,7 +337,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } - svc, err = service.NewSystemd(podName, report.Units, service.UserBus, p.eventCh) + svc, err = service.NewSystemd(podName, report.Units, service.UserBus) if err != nil { return nil, err } @@ -358,7 +354,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } units := map[string]string{podName: unit.String()} - svc, err = service.NewSystemd(podName, units, service.UserBus, p.eventCh) + svc, err = service.NewSystemd(podName, units, service.UserBus) if err != nil { return nil, err } diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 038c74f4..7102e5bc 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -44,31 +44,33 @@ type WorkloadWrapper interface { RemoveSecret(string) error CreateSecret(string, string) error UpdateSecret(string, string) error + ListenServiceEvents() } // Workload manages the workload and its configuration on the device type Workload struct { - workloads podman.Podman + podManager podman.Podman netfilter network.Netfilter mappingRepository mapping.MappingRepository observers []Observer serviceManager service.SystemdManager monitoringInterval uint - events chan *podman.PodmanEvent lock sync.RWMutex + systemdEventCh <-chan *service.Event } -func NewWorkload(p podman.Podman, n network.Netfilter, m mapping.MappingRepository, s service.SystemdManager, monitoringInterval uint) *Workload { +func NewWorkload(p podman.Podman, n network.Netfilter, m mapping.MappingRepository, s service.SystemdManager, monitoringInterval uint, systemdEventCh <-chan *service.Event) *Workload { return &Workload{ - workloads: p, + podManager: p, netfilter: n, mappingRepository: m, serviceManager: s, monitoringInterval: monitoringInterval, + systemdEventCh: systemdEventCh, } } -func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, error) { +func newWorkloadInstance(configDir string, monitoringInterval uint, systemdEventCh <-chan *service.Event) (*Workload, error) { newPodman, err := podman.NewPodman() if err != nil { return nil, fmt.Errorf("workload cannot initialize podman manager: %w", err) @@ -82,54 +84,20 @@ func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, return nil, fmt.Errorf("workload cannot initialize mapping repository: %w", err) } - listener := service.NewEventListener() - eventCh, err := listener.Connect() - if err != nil { - return nil, err - } - go listener.Listen() - - serviceManager, err := service.NewSystemdManager(configDir, listener) + serviceManager, err := service.NewSystemdManager(configDir) if err != nil { return nil, fmt.Errorf("workload cannot initialize systemd manager: %w", err) } ww := &Workload{ - workloads: newPodman, + podManager: newPodman, netfilter: netfilter, mappingRepository: mappingRepository, serviceManager: serviceManager, monitoringInterval: monitoringInterval, - events: make(chan *podman.PodmanEvent), - } - - go func() { - for { - event := <-eventCh - log.Debugf("Event received %s", string(event.Type)) - ww.lock.Lock() - observers := ww.observers - ww.lock.Unlock() - switch event.Type { - case service.EventStarted: - log.Infof("Service for workload %s started", event.WorkloadName) - report, err := newPodman.GetPodReportForPodName(event.WorkloadName) - if err != nil { - log.Errorf("unable to get pod report for workload %s:%v", event.WorkloadName, err) - } - for _, observer := range observers { - observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report}) - } - case service.EventStopped: - log.Infof("Service for workload %s stopped", event.WorkloadName) - for _, observer := range observers { - observer.WorkloadRemoved(event.WorkloadName) - } - default: - log.Errorf("unknown event %s", event.Type) - } - } - }() + systemdEventCh: systemdEventCh, + } + return ww, nil } @@ -141,7 +109,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemd("podman-auto-update", nil, service.UserBus, nil) + svc, err := service.NewSystemd("podman-auto-update", nil, service.UserBus) if err != nil { return err } @@ -154,7 +122,7 @@ func (ww *Workload) Init() error { } func (ww *Workload) List() ([]api.WorkloadInfo, error) { - infos, err := ww.workloads.List() + infos, err := ww.podManager.List() if err != nil { return nil, err } @@ -168,7 +136,7 @@ func (ww *Workload) List() ([]api.WorkloadInfo, error) { } func (ww *Workload) Logs(podID string, res io.Writer) (context.CancelFunc, error) { - return ww.workloads.Logs(podID, res) + return ww.podManager.Logs(podID, res) } func (ww *Workload) Remove(workloadName string) error { @@ -182,7 +150,7 @@ func (ww *Workload) Remove(workloadName string) error { return err } - if err := ww.workloads.Remove(id); err != nil { + if err := ww.podManager.Remove(id); err != nil { return err } if err := ww.netfilter.DeleteChain(nfTableName, workloadName); err != nil { @@ -199,7 +167,7 @@ func (ww *Workload) Stop(workloadName string) error { if id == "" { id = workloadName } - err := ww.workloads.Stop(id) + err := ww.podManager.Stop(id) return err } @@ -234,7 +202,7 @@ func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath stri if err := ww.applyNetworkConfiguration(workload); err != nil { return err } - podIds, err := ww.workloads.Run(manifestPath, authFilePath, podmanAnnotations) + podIds, err := ww.podManager.Run(manifestPath, authFilePath, podmanAnnotations) if err != nil { return err } @@ -245,7 +213,7 @@ func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath stri } // Create the system service to manage the pod: - svc, err := ww.workloads.GenerateSystemdService(workload, manifestPath, ww.monitoringInterval) + svc, err := ww.podManager.GenerateSystemdService(workload, manifestPath, ww.monitoringInterval) if err != nil { return fmt.Errorf("error while generating systemd service: %v", err) } @@ -255,7 +223,7 @@ func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath stri if err != nil { return fmt.Errorf("error while starting service: %v", err) } - log.Infof("Adding service for listener for %s", workload.Name) + log.Infof("Registering service %s", workload.Name) err = ww.serviceManager.Add(svc) if err != nil { return fmt.Errorf("error while updating service manager: %v", err) @@ -270,7 +238,10 @@ func (ww *Workload) removeService(workloadName string) error { } // Ignore stop failure: - _ = svc.Stop() + err := svc.Stop() + if err != nil { + return fmt.Errorf("unable to stop service %s:%s", workloadName, err) + } // Disable the service from the system: if err := svc.Disable(); err != nil { @@ -282,7 +253,7 @@ func (ww *Workload) removeService(workloadName string) error { return fmt.Errorf("Cannot remove systemd service for '%s': %s", workloadName, err) } - err := ww.serviceManager.Remove(svc) + err = ww.serviceManager.Remove(svc) if err != nil { log.Errorf("Unable to remove service from serviceManager %s:%s", workloadName, err) } @@ -338,7 +309,7 @@ func (ww *Workload) Start(workload *v1.Pod) error { } podId := ww.mappingRepository.GetId(workload.Name) - if err := ww.workloads.Start(podId); err != nil { + if err := ww.podManager.Start(podId); err != nil { return err } return nil @@ -349,19 +320,19 @@ func (ww *Workload) PersistConfiguration() error { } func (ww *Workload) ListSecrets() (map[string]struct{}, error) { - return ww.workloads.ListSecrets() + return ww.podManager.ListSecrets() } func (ww *Workload) RemoveSecret(name string) error { - return ww.workloads.RemoveSecret(name) + return ww.podManager.RemoveSecret(name) } func (ww *Workload) CreateSecret(name, data string) error { - return ww.workloads.CreateSecret(name, data) + return ww.podManager.CreateSecret(name, data) } func (ww *Workload) UpdateSecret(name, data string) error { - return ww.workloads.UpdateSecret(name, data) + return ww.podManager.UpdateSecret(name, data) } func getHostPorts(workload *v1.Pod) ([]int32, error) { @@ -377,3 +348,35 @@ func getHostPorts(workload *v1.Pod) ([]int32, error) { } return hostPorts, nil } + +func (w *Workload) ListenServiceEvents() { + log.Debug("Starting routine to listen for service events") + for { + event := <-w.systemdEventCh + log.Debugf("Event received: %s", string(event.Type)) + w.lock.Lock() + observers := w.observers + w.lock.Unlock() + log.Debugf("Number of observers %d: %+v", len(observers), observers) + switch event.Type { + case service.EventStarted: + log.Infof("Service for workload %s started", event.WorkloadName) + report, err := w.podManager.GetPodReportForPodName(event.WorkloadName) + if err != nil { + log.Errorf("unable to get pod report for workload %s:%v", event.WorkloadName, err) + } + for _, observer := range observers { + log.Debugf("Triggering WorkloadStarted in observer '%s' for workload %s", observer, event.WorkloadName) + observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report}) + } + case service.EventStopped: + log.Infof("Service for workload %s stopped", event.WorkloadName) + for _, observer := range observers { + log.Debugf("Triggering WorkloadRemoved in observer '%s' for workload %s", observer, event.WorkloadName) + observer.WorkloadRemoved(event.WorkloadName) + } + default: + log.Errorf("Unknown event %s", event.Type) + } + } +} diff --git a/internal/workload/wrapper_test.go b/internal/workload/wrapper_test.go index 2db237f0..1cfa60e8 100644 --- a/internal/workload/wrapper_test.go +++ b/internal/workload/wrapper_test.go @@ -40,7 +40,7 @@ var _ = Describe("Workload management", func() { serviceManager = service.NewMockSystemdManager(mockCtrl) svc = service.NewMockService(mockCtrl) - wk = workload.NewWorkload(newPodman, netfilter, mappingRepository, serviceManager, 15) + wk = workload.NewWorkload(newPodman, netfilter, mappingRepository, serviceManager, 15, nil) }) AfterEach(func() { diff --git a/vendor/modules.txt b/vendor/modules.txt index 3a743d42..f59f968a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -393,6 +393,7 @@ github.com/docker/go-units # github.com/evanphx/json-patch v4.12.0+incompatible github.com/evanphx/json-patch # github.com/fsnotify/fsnotify v1.5.4 +## explicit github.com/fsnotify/fsnotify # github.com/gabriel-vasile/mimetype v1.4.0 github.com/gabriel-vasile/mimetype From 0ff95dc82349e3d9592571f69ab9b26273b22bb3 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Wed, 3 Aug 2022 16:36:28 -0400 Subject: [PATCH 08/13] Fix svc not being deleted from services.json --- internal/service/systemd.go | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 844bf80e..7a8d68a0 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -104,11 +104,7 @@ func (mgr *systemdManager) Add(svc Service) error { defer mgr.lock.Unlock() mgr.services[svc.GetName()] = svc - err := mgr.write() - if err != nil { - return err - } - return nil + return mgr.write() } func (mgr *systemdManager) Get(name string) Service { @@ -121,12 +117,9 @@ func (mgr *systemdManager) Get(name string) Service { func (mgr *systemdManager) Remove(svc Service) error { mgr.lock.Lock() defer mgr.lock.Unlock() - err := mgr.write() - if err != nil { - return err - } + delete(mgr.services, svc.GetName()) - return nil + return mgr.write() } func (mgr *systemdManager) write() error { From d4b9052f9c2d382555e8a53d5c0c42ad26f38986 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Wed, 3 Aug 2022 16:50:27 -0400 Subject: [PATCH 09/13] Added Stringer() interface to observers so that they list correctly in debug --- internal/configuration/configuration.go | 1 + internal/configuration/configuration_mock.go | 14 ++++++++++++++ internal/logs/workload.go | 4 ++++ internal/metrics/remote_write.go | 4 ++++ internal/metrics/workload.go | 4 ++++ internal/mount/mount.go | 4 ++++ internal/os/os.go | 4 ++++ 7 files changed, 35 insertions(+) diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go index d3b40603..3964d4fd 100644 --- a/internal/configuration/configuration.go +++ b/internal/configuration/configuration.go @@ -35,6 +35,7 @@ var ( type Observer interface { Init(configuration models.DeviceConfigurationMessage) error Update(configuration models.DeviceConfigurationMessage) error + String() string } type Manager struct { diff --git a/internal/configuration/configuration_mock.go b/internal/configuration/configuration_mock.go index 6bb849e9..987e371e 100644 --- a/internal/configuration/configuration_mock.go +++ b/internal/configuration/configuration_mock.go @@ -48,6 +48,20 @@ func (mr *MockObserverMockRecorder) Init(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockObserver)(nil).Init), arg0) } +// String mocks base method. +func (m *MockObserver) String() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "String") + ret0, _ := ret[0].(string) + return ret0 +} + +// String indicates an expected call of String. +func (mr *MockObserverMockRecorder) String() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "String", reflect.TypeOf((*MockObserver)(nil).String)) +} + // Update mocks base method. func (m *MockObserver) Update(arg0 models.DeviceConfigurationMessage) error { m.ctrl.T.Helper() diff --git a/internal/logs/workload.go b/internal/logs/workload.go index 7e4670d7..8e3b542a 100644 --- a/internal/logs/workload.go +++ b/internal/logs/workload.go @@ -205,3 +205,7 @@ func (w *WorkloadsLogsTarget) WorkloadStarted(workloadName string, report []*pod log.Error("Cannot start workload logs", err) } } + +func (w *WorkloadsLogsTarget) String() string { + return "workload logs" +} diff --git a/internal/metrics/remote_write.go b/internal/metrics/remote_write.go index fbffc9e5..877a0f97 100644 --- a/internal/metrics/remote_write.go +++ b/internal/metrics/remote_write.go @@ -533,3 +533,7 @@ func toTimeSeries(series []Series) (timeSeries []prompb.TimeSeries, lowest time. highest = fromDbTime(maxt) return } + +func (r *RemoteWrite) String() string { + return "remote write" +} diff --git a/internal/metrics/workload.go b/internal/metrics/workload.go index a65d9960..92af6506 100644 --- a/internal/metrics/workload.go +++ b/internal/metrics/workload.go @@ -84,6 +84,10 @@ func (wrkM *WorkloadMetrics) WorkloadStarted(workloadName string, report []*podm } } +func (wrKM *WorkloadMetrics) String() string { + return "workload metrics" +} + func getWorkloadUrls(report *podman.PodReport, config *models.Workload) []string { res := []string{} metricsPath := config.Metrics.Path diff --git a/internal/mount/mount.go b/internal/mount/mount.go index ec156b47..6bf05b6e 100644 --- a/internal/mount/mount.go +++ b/internal/mount/mount.go @@ -170,3 +170,7 @@ func getDefaultMountOptions() string { } return fmt.Sprintf("gid=%s,uid=%s", group.Gid, usr.Uid) } + +func (m *Manager) String() string { + return "mount" +} diff --git a/internal/os/os.go b/internal/os/os.go index e19dbe7f..d87d3241 100644 --- a/internal/os/os.go +++ b/internal/os/os.go @@ -207,3 +207,7 @@ func (o *OS) updateGreenbootScripts() error { return nil } + +func (o *OS) String() string { + return "rpm-ostree" +} From 2420bdce0e22493f9b63217913ce08fcb54b23a4 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Wed, 3 Aug 2022 21:49:58 -0400 Subject: [PATCH 10/13] Process the error from stopping the service --- internal/workload/wrapper.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 7102e5bc..a1225b5d 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -245,12 +245,12 @@ func (ww *Workload) removeService(workloadName string) error { // Disable the service from the system: if err := svc.Disable(); err != nil { - return fmt.Errorf("Cannot disable systemd service for '%s': %s", workloadName, err) + return fmt.Errorf("unable to disable systemd service for '%s': %s", workloadName, err) } // Remove the service from the system: if err := svc.Remove(); err != nil { - return fmt.Errorf("Cannot remove systemd service for '%s': %s", workloadName, err) + return fmt.Errorf("unable to remove systemd service for '%s': %s", workloadName, err) } err = ww.serviceManager.Remove(svc) From 99b0d60697be3b55f4e15a16e02fb338cc83a17f Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Thu, 4 Aug 2022 08:47:10 -0400 Subject: [PATCH 11/13] Remove service from systemd watch filter when dbus remove event is received to avoid potential race condition between file system event and dbus event --- internal/service/event_listener.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index e933bbda..bcee211d 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -136,7 +136,12 @@ func (e *DBusEventListener) Listen() { case running: log.Debugf("Sending start event to observer channel for workload %s", n) e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} - case removed, stop, dead, failed, start: + case removed: + // We remove the service from the filter here to avoid a potential race condition where the fs notify routine removes it before the systemd event is received. + log.Infof("Service %s disabled or removed", name) + e.Remove(name) + fallthrough + case stop, dead, failed, start: log.Debugf("Sending stop event to observer channel for workload %s", n) e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} default: @@ -165,11 +170,10 @@ func (e *DBusEventListener) watchFSEvents() { svcName := getServiceFileName(event.Name) switch { case event.Op&fsnotify.Create == fsnotify.Create: - log.Infof("New service configuration detected at %s", event.Name) + log.Infof("New systemd service unit file %s detected", event.Name) e.Add(svcName) case event.Op&fsnotify.Remove == fsnotify.Remove: - log.Infof("Service configuration removed at %s", event.Name) - e.Remove(svcName) + log.Debugf("Systemd service unit file %s has been removed. Waiting for the systemd dbus remove event to elimitate it from the service watch filter", event.Name) } case err, ok := <-e.fsWatcher.Errors: if !ok { From 79dcaf05645c81709acf2cb98077fe4f411f70f6 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Mon, 8 Aug 2022 15:16:52 -0400 Subject: [PATCH 12/13] Use workload from device config as source of truth for dbus service event filtering --- cmd/device-worker/main.go | 26 ++-- internal/configuration/configuration.go | 2 +- internal/service/event_listener.go | 172 +++++++++--------------- 3 files changed, 75 insertions(+), 125 deletions(-) diff --git a/cmd/device-worker/main.go b/cmd/device-worker/main.go index 086137c0..1387dfa1 100644 --- a/cmd/device-worker/main.go +++ b/cmd/device-worker/main.go @@ -34,17 +34,21 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var yggdDispatchSocketAddr string -var gracefulRebootChannel chan struct{} - const ( - defaultDataDir = "/var/local/yggdrasil" + defaultDataDir = "/var/local/yggdrasil" + systemdUserServicesDir = ".config/systemd/user" +) + +var ( + yggdDispatchSocketAddr string + gracefulRebootChannel chan struct{} + systemdUserServicesFullPath = filepath.Join(os.Getenv("HOME"), systemdUserServicesDir) ) func initSystemdDirectory() error { // init the flotta user systemd units directory - err := os.MkdirAll(service.SystemdUserServicesFullPath, 0750) + err := os.MkdirAll(systemdUserServicesFullPath, 0750) if err != nil { return err } @@ -142,15 +146,11 @@ func main() { } configManager := configuration2.NewConfigurationManager(dataDir) - // Systemd event listener - listener := service.NewDBusEventListener() - eventCh, err := listener.Connect() - if err != nil { - log.Fatal(err) - } + // Systemd event dbusEventListener + dbusEventListener := service.NewDBusEventListener() // Start listening to systemd bus events. These events generated from here can start being processed once all observers have been initialized, otherwise // we risk in missing observers whilst processing the events. - go listener.Listen() + configManager.RegisterObserver(dbusEventListener) // --- Client metrics configuration --- metricsStore, err := metrics.NewTSDB(dataDir) @@ -173,7 +173,7 @@ func main() { dataTransferWatcher := metrics.NewDataTransferMetrics(metricsDaemon) configManager.RegisterObserver(dataTransferWatcher) - workloadManager, err := workload.NewWorkloadManager(dataDir, deviceId, eventCh) + workloadManager, err := workload.NewWorkloadManager(dataDir, deviceId, dbusEventListener.GetEventChannel()) if err != nil { log.Fatalf("cannot start Workload Manager. DeviceID: %s; err: %v", deviceId, err) } diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go index 3964d4fd..b4a6056a 100644 --- a/internal/configuration/configuration.go +++ b/internal/configuration/configuration.go @@ -50,10 +50,10 @@ type Manager struct { func NewConfigurationManager(dataDir string) *Manager { deviceConfigFile := path.Join(dataDir, "device-config.json") log.Infof("device config file: %s", deviceConfigFile) - file, err := ioutil.ReadFile(deviceConfigFile) //#nosec var deviceConfiguration models.DeviceConfigurationMessage initialConfig := atomic.Value{} initialConfig.Store(false) + file, err := ioutil.ReadFile(deviceConfigFile) //#nosec if err != nil { log.Error(err) deviceConfiguration = defaultDeviceConfigurationMessage diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index bcee211d..fc155653 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -1,24 +1,22 @@ package service import ( - "errors" + "context" "fmt" - "os" "path/filepath" + "strconv" "strings" + "sync" "git.sr.ht/~spc/go-log" "github.com/coreos/go-systemd/v22/dbus" - "github.com/fsnotify/fsnotify" + "github.com/project-flotta/flotta-operator/models" ) const ( EventStarted EventType = "started" EventStopped EventType = "stopped" - systemdUserServicesDir = ".config/systemd/user" - defaultTargetWantsRelativePath = "default.target.wants/" - // To avoid confusion, we match the action verb tense coming from systemd sub state, even though it is not consistent // The only addition to the list is the "removed" case, which we use when a service is removed // from the system and we receive an empty unitStatus object. @@ -38,11 +36,6 @@ const ( removed unitSubState = "removed" ) -var ( - SystemdUserServicesFullPath = filepath.Join(os.Getenv("HOME"), systemdUserServicesDir) - enabledServicesFullPath = filepath.Join(SystemdUserServicesFullPath, defaultTargetWantsRelativePath) -) - type unitSubState string type EventType string @@ -53,69 +46,59 @@ type Event struct { } type DBusEventListener struct { - observerCh chan *Event - set *dbus.SubscriptionSet - dbusCh <-chan map[string]*dbus.UnitStatus - dbusErrCh <-chan error - fsWatcher *fsnotify.Watcher + eventCh chan *Event + set *dbus.SubscriptionSet + dbusCh <-chan map[string]*dbus.UnitStatus + dbusErrCh <-chan error + lock sync.Mutex } func NewDBusEventListener() *DBusEventListener { - return &DBusEventListener{} + return &DBusEventListener{lock: sync.Mutex{}, eventCh: make(chan *Event, 1000)} } -func (e *DBusEventListener) Connect() (<-chan *Event, error) { +func (e *DBusEventListener) Init(configuration models.DeviceConfigurationMessage) error { + log.Infof("Starting DBus event listener") conn, err := newDbusConnection(UserBus) if err != nil { - return nil, err + return err } e.set = conn.NewSubscriptionSet() - if err := e.initializeSubscriptionSet(); err != nil { - return nil, err - } - e.observerCh = make(chan *Event, 1000) - return e.observerCh, nil -} - -func (e *DBusEventListener) initializeSubscriptionSet() error { - _, err := os.Stat(enabledServicesFullPath) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return e.watchServiceDirectory() + for _, w := range configuration.Workloads { + s, err := conn.GetUnitPropertyContext(context.Background(), DefaultServiceName(w.Name), "UnitFileState") + if err != nil { + return err } - return err - } - files, err := os.ReadDir(enabledServicesFullPath) - if err != nil { - return err - } - log.Infof("List of services to be monitored in %s:%s", enabledServicesFullPath, files) - for _, fd := range files { - f := filepath.Join(enabledServicesFullPath, fd.Name()) - log.Debugf("Detected service file %s", f) - if !fileExist(f) { - log.Errorf("Hard link %s does not exist or broken link", fd.Name()) - continue + log.Debugf("Unit UnitFileState property for workload %s:%s", w.Name, s.Value.String()) + v, err := strconv.Unquote(s.Value.String()) + if err != nil { + return err + } + if v == "disabled" { + log.Warnf("Service for workload %s is disabled", w.Name) } - e.Add(getServiceFileName(fd.Name())) + e.add(DefaultServiceName(w.Name)) } - return e.watchServiceDirectory() + go e.Listen() + return nil } -func (e *DBusEventListener) watchServiceDirectory() error { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return err +func (e *DBusEventListener) Update(configuration models.DeviceConfigurationMessage) error { + for _, wl := range configuration.Workloads { + svcName := DefaultServiceName(wl.Name) + if !e.contains(svcName) { + e.add(svcName) + } } - e.fsWatcher = watcher + return nil +} - _, err = os.Stat(SystemdUserServicesFullPath) - if err != nil { - return fmt.Errorf("systemd user directory not found: %s", err) - } +func (e *DBusEventListener) String() string { + return "DBus event listener" +} - go e.watchFSEvents() - return e.fsWatcher.Add(SystemdUserServicesFullPath) +func (e *DBusEventListener) GetEventChannel() <-chan *Event { + return e.eventCh } func (e *DBusEventListener) Listen() { @@ -135,15 +118,14 @@ func (e *DBusEventListener) Listen() { switch state { case running: log.Debugf("Sending start event to observer channel for workload %s", n) - e.observerCh <- &Event{WorkloadName: n, Type: EventStarted} + e.eventCh <- &Event{WorkloadName: n, Type: EventStarted} case removed: - // We remove the service from the filter here to avoid a potential race condition where the fs notify routine removes it before the systemd event is received. - log.Infof("Service %s disabled or removed", name) - e.Remove(name) + log.Debugf("Service %s has been removed", name) + e.remove(name) fallthrough case stop, dead, failed, start: log.Debugf("Sending stop event to observer channel for workload %s", n) - e.observerCh <- &Event{WorkloadName: n, Type: EventStopped} + e.eventCh <- &Event{WorkloadName: n, Type: EventStopped} default: log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState) } @@ -155,52 +137,26 @@ func (e *DBusEventListener) Listen() { } -func (e *DBusEventListener) watchFSEvents() { - for { - select { - case event, ok := <-e.fsWatcher.Events: - if !ok { - log.Errorf("Error while watching for file system events at %s:%s", SystemdUserServicesFullPath, event) - continue - } - log.Debugf("Captured file system event:%s", event) - if !isAnEnabledService(event.Name) { - continue - } - svcName := getServiceFileName(event.Name) - switch { - case event.Op&fsnotify.Create == fsnotify.Create: - log.Infof("New systemd service unit file %s detected", event.Name) - e.Add(svcName) - case event.Op&fsnotify.Remove == fsnotify.Remove: - log.Debugf("Systemd service unit file %s has been removed. Waiting for the systemd dbus remove event to elimitate it from the service watch filter", event.Name) - } - case err, ok := <-e.fsWatcher.Errors: - if !ok { - log.Errorf("Error detected on file system watcher: %s", err) - return - } - } - } -} - -func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { - if unit == nil { - return removed - } - return unitSubState(unit.SubState) -} - -func (e *DBusEventListener) Add(serviceName string) { +func (e *DBusEventListener) add(serviceName string) { + e.lock.Lock() + defer e.lock.Unlock() log.Debugf("Adding service for event listener %s", serviceName) e.set.Add(serviceName) } -func (e *DBusEventListener) Remove(serviceName string) { +func (e *DBusEventListener) remove(serviceName string) { + e.lock.Lock() + defer e.lock.Unlock() log.Debugf("Removing service from event listener %s", serviceName) e.set.Remove(serviceName) } +func (e *DBusEventListener) contains(serviceName string) bool { + e.lock.Lock() + defer e.lock.Unlock() + return e.set.Contains(serviceName) +} + func extractWorkloadName(serviceName string) (string, error) { if filepath.Ext(serviceName) != ServiceSuffix { return "", fmt.Errorf("invalid file name or not a service %s", serviceName) @@ -208,15 +164,9 @@ func extractWorkloadName(serviceName string) (string, error) { return strings.TrimSuffix(filepath.Base(serviceName), filepath.Ext(serviceName)), nil } -func fileExist(path string) bool { - _, err := os.Stat(path) - return err == nil -} - -func isAnEnabledService(fullPath string) bool { - return filepath.Ext(fullPath) == ServiceSuffix && filepath.Dir(fullPath) == enabledServicesFullPath -} -func getServiceFileName(fullPath string) string { - _, filename := filepath.Split(fullPath) - return filename +func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { + if unit == nil { + return removed + } + return unitSubState(unit.SubState) } From 84d1280896068de2269c411c969d923c0e8218e6 Mon Sep 17 00:00:00 2001 From: Jordi Gil Date: Tue, 9 Aug 2022 11:19:13 -0400 Subject: [PATCH 13/13] Removed logic to handle services.json and workload-mapping.json as mappings for running workloads * services.json: this file was used to list related services to the main service. Each related service manages a container in the pod spec. With the change we now extract this information from the BoundBy property in the main service. * workload-mapping.json: this file contains the mapping from the pod name and the pod ID. It was used to retrieve the pod ID based on the workload name and retrieve the pod report from podman. However, as it happens, the pod name equals to the workload name, and podman allows retrieval of the pod report with the pod ID or the pod name as argument, making this mapping no longer necessary. * Added missing error validation for starting and stopping services, including the validation whether the service exists or not before performing the action to avoid errors for "service not found". --- internal/datatransfer/monitor_test.go | 1 - internal/heartbeat/heartbeat_test.go | 1 - internal/metrics/system.go | 5 +- internal/service/event_listener.go | 22 +-- internal/service/mock_systemd.go | 15 ++ internal/service/mock_systemd_manager.go | 14 -- internal/service/systemd.go | 175 +++++++++-------------- internal/workload/manager.go | 33 ----- internal/workload/manager_test.go | 6 +- internal/workload/mock_wrapper.go | 56 -------- internal/workload/podman/podman.go | 55 +++---- internal/workload/wrapper.go | 150 +++++-------------- internal/workload/wrapper_test.go | 26 +--- 13 files changed, 157 insertions(+), 402 deletions(-) diff --git a/internal/datatransfer/monitor_test.go b/internal/datatransfer/monitor_test.go index 2243302b..314936d6 100644 --- a/internal/datatransfer/monitor_test.go +++ b/internal/datatransfer/monitor_test.go @@ -37,7 +37,6 @@ var _ = Describe("Datatransfer", func() { wkwMock = workload.NewMockWorkloadWrapper(mockCtrl) wkwMock.EXPECT().Init().Return(nil).AnyTimes() - wkwMock.EXPECT().PersistConfiguration().AnyTimes() wkManager, err = workload.NewWorkloadManagerWithParams(datadir, wkwMock, "device-id-123") Expect(err).NotTo(HaveOccurred(), "Cannot start the Workload Manager") diff --git a/internal/heartbeat/heartbeat_test.go b/internal/heartbeat/heartbeat_test.go index d07da33d..70da1415 100644 --- a/internal/heartbeat/heartbeat_test.go +++ b/internal/heartbeat/heartbeat_test.go @@ -55,7 +55,6 @@ var _ = Describe("Heartbeat", func() { mockCtrl = gomock.NewController(GinkgoT()) wkwMock = workload.NewMockWorkloadWrapper(mockCtrl) wkwMock.EXPECT().Init().Return(nil).AnyTimes() - wkwMock.EXPECT().PersistConfiguration().AnyTimes() regMock := registration.NewMockRegistrationWrapper(mockCtrl) wkManager, err = workload.NewWorkloadManagerWithParams(datadir, wkwMock, "device-id-123") diff --git a/internal/metrics/system.go b/internal/metrics/system.go index 6a308c53..adb986ff 100644 --- a/internal/metrics/system.go +++ b/internal/metrics/system.go @@ -26,10 +26,7 @@ type SystemMetrics struct { } func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) { - nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus) - if err != nil { - return nil, err - } + nodeExporter := service.NewSystemd("node_exporter", nil, service.SystemBus) return NewSystemMetricsWithNodeExporter(daemon, nodeExporter), nil } diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go index fc155653..d5abe7e0 100644 --- a/internal/service/event_listener.go +++ b/internal/service/event_listener.go @@ -18,8 +18,8 @@ const ( EventStopped EventType = "stopped" // To avoid confusion, we match the action verb tense coming from systemd sub state, even though it is not consistent - // The only addition to the list is the "removed" case, which we use when a service is removed - // from the system and we receive an empty unitStatus object. + // The only addition to the list is the "removed" case, which we use when a service is disabled by removing the softlink + // from the file system. In this case we receive an empty unitStatus object. //The service is running running unitSubState = "running" @@ -32,7 +32,8 @@ const ( dead unitSubState = "dead" //The service failed to start failed unitSubState = "failed" - //The service has been removed, no unit state is returned from systemd + //The service has been disabled, probably because the soft link in the `default.target.wants` directory has been removed. + // No unit state is returned from systemd and thus we stop watching for this service. removed unitSubState = "removed" ) @@ -107,7 +108,7 @@ func (e *DBusEventListener) Listen() { select { case msg := <-e.dbusCh: for name, unit := range msg { - log.Debugf("Captured DBus event for %s: %v+", name, unit) + log.Tracef("Captured DBus event for %s: %v+", name, unit) n, err := extractWorkloadName(name) if err != nil { log.Error(err) @@ -117,17 +118,16 @@ func (e *DBusEventListener) Listen() { log.Debugf("Systemd service for workload %s transitioned to sub state %s\n", n, state) switch state { case running: - log.Debugf("Sending start event to observer channel for workload %s", n) + log.Tracef("Sending start event to observer channel for workload %s", n) e.eventCh <- &Event{WorkloadName: n, Type: EventStarted} case removed: - log.Debugf("Service %s has been removed", name) + log.Tracef("Service %s has been disabled.", name) e.remove(name) - fallthrough case stop, dead, failed, start: - log.Debugf("Sending stop event to observer channel for workload %s", n) + log.Tracef("Sending stop event to observer channel for workload %s", n) e.eventCh <- &Event{WorkloadName: n, Type: EventStopped} default: - log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState) + log.Tracef("Ignoring unit sub state for service %s: %s", name, state) } } case msg := <-e.dbusErrCh: @@ -140,14 +140,14 @@ func (e *DBusEventListener) Listen() { func (e *DBusEventListener) add(serviceName string) { e.lock.Lock() defer e.lock.Unlock() - log.Debugf("Adding service for event listener %s", serviceName) + log.Tracef("Adding service for event listener %s", serviceName) e.set.Add(serviceName) } func (e *DBusEventListener) remove(serviceName string) { e.lock.Lock() defer e.lock.Unlock() - log.Debugf("Removing service from event listener %s", serviceName) + log.Tracef("Removing service from event listener %s", serviceName) e.set.Remove(serviceName) } diff --git a/internal/service/mock_systemd.go b/internal/service/mock_systemd.go index 32e87772..850abd10 100644 --- a/internal/service/mock_systemd.go +++ b/internal/service/mock_systemd.go @@ -103,6 +103,21 @@ func (mr *MockServiceMockRecorder) Remove() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockService)(nil).Remove)) } +// ServiceExists mocks base method. +func (m *MockService) ServiceExists() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ServiceExists") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ServiceExists indicates an expected call of ServiceExists. +func (mr *MockServiceMockRecorder) ServiceExists() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ServiceExists", reflect.TypeOf((*MockService)(nil).ServiceExists)) +} + // Start mocks base method. func (m *MockService) Start() error { m.ctrl.T.Helper() diff --git a/internal/service/mock_systemd_manager.go b/internal/service/mock_systemd_manager.go index 04bafb52..786e022c 100644 --- a/internal/service/mock_systemd_manager.go +++ b/internal/service/mock_systemd_manager.go @@ -74,17 +74,3 @@ func (mr *MockSystemdManagerMockRecorder) Remove(arg0 interface{}) *gomock.Call mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockSystemdManager)(nil).Remove), arg0) } - -// RemoveServicesFile mocks base method. -func (m *MockSystemdManager) RemoveServicesFile() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemoveServicesFile") - ret0, _ := ret[0].(error) - return ret0 -} - -// RemoveServicesFile indicates an expected call of RemoveServicesFile. -func (mr *MockSystemdManagerMockRecorder) RemoveServicesFile() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveServicesFile", reflect.TypeOf((*MockSystemdManager)(nil).RemoveServicesFile)) -} diff --git a/internal/service/systemd.go b/internal/service/systemd.go index 7a8d68a0..7cb64ebe 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -2,13 +2,10 @@ package service import ( "context" - "encoding/json" "fmt" - "io/ioutil" "os" "path" "path/filepath" - "sync" "git.sr.ht/~spc/go-log" "github.com/coreos/go-systemd/v22/dbus" @@ -42,14 +39,14 @@ type Service interface { Stop() error Enable() error Disable() error + ServiceExists() (bool, error) } type systemd struct { - Name string `json:"name"` - Units []string `json:"units"` - UnitsContent map[string]string `json:"-"` - dbusConnection *dbus.Conn `json:"-"` - BusType BusType `json:"busType"` + name string + Units []string + UnitsContent map[string]string + BusType BusType } //go:generate mockgen -package=service -destination=mock_systemd_manager.go . SystemdManager @@ -57,77 +54,6 @@ type SystemdManager interface { Add(svc Service) error Get(name string) Service Remove(svc Service) error - RemoveServicesFile() error -} - -type systemdManager struct { - svcFilePath string - lock sync.RWMutex - services map[string]Service -} - -func NewSystemdManager(configDir string) (SystemdManager, error) { - services := make(map[string]*systemd) - servicePath := path.Join(configDir, "services.json") - servicesJson, err := ioutil.ReadFile(servicePath) //#nosec - if err == nil { - err := json.Unmarshal(servicesJson, &services) - if err != nil { - return nil, fmt.Errorf("cannot unmarshal %v: %w", servicePath, err) - } - } - - systemdSVC := make(map[string]Service) - for k, v := range services { - systemdSVC[k] = v - } - - return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}}, nil -} - -func (mgr *systemdManager) RemoveServicesFile() error { - mgr.lock.Lock() - defer mgr.lock.Unlock() - - log.Infof("deleting %s file", mgr.svcFilePath) - err := os.RemoveAll(mgr.svcFilePath) - if err != nil { - log.Errorf("failed to delete %s: %v", mgr.svcFilePath, err) - return err - } - - return nil -} - -func (mgr *systemdManager) Add(svc Service) error { - mgr.lock.Lock() - defer mgr.lock.Unlock() - - mgr.services[svc.GetName()] = svc - return mgr.write() -} - -func (mgr *systemdManager) Get(name string) Service { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - - return mgr.services[name] -} - -func (mgr *systemdManager) Remove(svc Service) error { - mgr.lock.Lock() - defer mgr.lock.Unlock() - - delete(mgr.services, svc.GetName()) - return mgr.write() -} - -func (mgr *systemdManager) write() error { - svcJson, err := json.Marshal(mgr.services) - if err != nil { - return err - } - return ioutil.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec } func newDbusConnection(busType BusType) (*dbus.Conn, error) { @@ -158,32 +84,25 @@ func newDbusConnection(busType BusType) (*dbus.Conn, error) { } } -func NewSystemd(name string, units map[string]string, busType BusType) (Service, error) { - var err error - var conn *dbus.Conn - - conn, err = newDbusConnection(busType) - if err != nil { - return nil, err - } - +func NewSystemd(name string, units map[string]string, busType BusType) Service { var unitNames []string for unit := range units { unitNames = append(unitNames, unit) } return &systemd{ - Name: name, - dbusConnection: conn, - Units: unitNames, - BusType: busType, - UnitsContent: units, - }, nil + name: name, + + Units: unitNames, + BusType: busType, + UnitsContent: units, + } + } func (s *systemd) Add() error { if len(s.UnitsContent) == 0 { - log.Infof("calling systemd add service for '%s' with no units available", s.Name) + log.Infof("calling systemd add service for '%s' with no units available", s.GetName()) } for unit, content := range s.UnitsContent { @@ -198,17 +117,39 @@ func (s *systemd) Add() error { } func (s *systemd) Remove() error { - for _, unit := range s.Units { - err := os.Remove(path.Join(DefaultUnitsPath, DefaultServiceName(unit))) + + conn, err := newDbusConnection(s.BusType) + if err != nil { + return err + } + // Retrieve dependency/bound services to the workload. It will return a slice with the services that are bound to this one + p, err := conn.GetUnitPropertyContext(context.Background(), DefaultServiceName(s.GetName()), "BoundBy") + if err != nil { + return err + } + log.Debugf("List of dependent services to %s: %s", s.GetName(), p) + v, ok := p.Value.Value().([]string) + if !ok { + return fmt.Errorf("invalid value %s for property BoundBy in service %s", p.Value.Value(), s.GetName()) + } + // Delete the files that are bound to this service + for _, unit := range v { + log.Tracef("Deleting service unit configuration at %s", path.Join(DefaultUnitsPath, unit)) + err := os.Remove(path.Join(DefaultUnitsPath, unit)) if err != nil { return err } } + // Finally, remove the service file + err = os.Remove(path.Join(DefaultUnitsPath, DefaultServiceName(s.GetName()))) + if err != nil { + return err + } return s.reload() } func (s *systemd) GetName() string { - return s.Name + return s.name } func (s *systemd) reload() error { @@ -221,14 +162,14 @@ func (s *systemd) reload() error { } func (s *systemd) Start() error { - log.Debugf("Starting service %s", s.Name) + log.Debugf("Starting service %s", s.GetName()) conn, err := newDbusConnection(s.BusType) if err != nil { return err } defer conn.Close() startChan := make(chan string) - if _, err := conn.StartUnitContext(context.Background(), DefaultServiceName(s.Name), "replace", startChan); err != nil { + if _, err := conn.StartUnitContext(context.Background(), DefaultServiceName(s.GetName()), "replace", startChan); err != nil { return err } @@ -237,18 +178,19 @@ func (s *systemd) Start() error { case "done": return nil default: - return errors.Errorf("Failed[%s] to start systemd service %s", result, DefaultServiceName(s.Name)) + return errors.Errorf("Failed[%s] to start systemd service %s", result, DefaultServiceName(s.GetName())) } } func (s *systemd) Stop() error { + conn, err := newDbusConnection(s.BusType) if err != nil { return err } defer conn.Close() stopChan := make(chan string) - if _, err := conn.StopUnitContext(context.Background(), DefaultServiceName(s.Name), "replace", stopChan); err != nil { + if _, err := conn.StopUnitContext(context.Background(), DefaultServiceName(s.GetName()), "replace", stopChan); err != nil { return err } @@ -257,33 +199,52 @@ func (s *systemd) Stop() error { case "done": return nil default: - return errors.Errorf("Failed[%s] to stop systemd service %s", result, DefaultServiceName(s.Name)) + return errors.Errorf("Failed[%s] to stop systemd service %s", result, DefaultServiceName(s.GetName())) } } func (s *systemd) Enable() error { - log.Debugf("Enabling service %s", s.Name) + log.Debugf("Enabling service %s", s.GetName()) conn, err := newDbusConnection(s.BusType) if err != nil { return err } defer conn.Close() - _, _, err = conn.EnableUnitFilesContext(context.Background(), []string{DefaultServiceName(s.Name)}, false, true) + _, _, err = conn.EnableUnitFilesContext(context.Background(), []string{DefaultServiceName(s.GetName())}, false, true) return err } func (s *systemd) Disable() error { - log.Debugf("Disabling service %s", s.Name) + log.Debugf("Disabling service %s", s.GetName()) conn, err := newDbusConnection(s.BusType) if err != nil { return err } defer conn.Close() - _, err = conn.DisableUnitFilesContext(context.Background(), []string{DefaultServiceName(s.Name)}, false) + _, err = conn.DisableUnitFilesContext(context.Background(), []string{DefaultServiceName(s.GetName())}, false) return err } func DefaultServiceName(serviceName string) string { return serviceName + ServiceSuffix } + +func (s *systemd) ServiceExists() (bool, error) { + conn, err := newDbusConnection(s.BusType) + if err != nil { + return false, err + } + defer conn.Close() + + units, err := conn.ListUnitsByNamesContext(context.Background(), []string{DefaultServiceName(s.GetName())}) + if err != nil { + return false, err + } + log.Tracef("Number of matching units %d:%+v", len(units), units) + exists := len(units) == 1 && units[0].LoadState != "not-found" + if !exists { + log.Tracef("Service %s not found ", s.GetName()) + } + return exists, nil +} diff --git a/internal/workload/manager.go b/internal/workload/manager.go index 12f6a47c..6766f81d 100644 --- a/internal/workload/manager.go +++ b/internal/workload/manager.go @@ -353,18 +353,6 @@ func (w *WorkloadManager) Deregister() error { log.Errorf("failed to delete volumes directory. DeviceID: %s; err: %v", w.deviceId, err) } - err = w.removeMappingFile() - if err != nil { - errors = multierror.Append(errors, fmt.Errorf("failed to remove mapping file: %v", err)) - log.Errorf("failed to remove mapping file. DeviceID: %s; err: %v", w.deviceId, err) - } - - err = w.removeServicesFile() - if err != nil { - errors = multierror.Append(errors, fmt.Errorf("failed to remove services file: %v", err)) - log.Errorf("failed to remove services file. DeviceID: %s; err: %v", w.deviceId, err) - } - w.deregistered = true return errors } @@ -449,27 +437,6 @@ func (w *WorkloadManager) deleteTable() error { return nil } -func (w *WorkloadManager) removeServicesFile() error { - log.Infof("deleting services file. DeviceID: %s;", w.deviceId) - err := w.workloads.RemoveServicesFile() - if err != nil { - return err - } - - return nil -} - -func (w *WorkloadManager) removeMappingFile() error { - log.Infof("deleting mapping file. DeviceID: %s;", w.deviceId) - err := w.workloads.RemoveMappingFile() - if err != nil { - log.Error(err) - return err - } - - return nil -} - func (w *WorkloadManager) toPod(workload *models.Workload) (*v1.Pod, error) { podSpec := v1.PodSpec{} err := yaml.Unmarshal([]byte(workload.Specification), &podSpec) diff --git a/internal/workload/manager_test.go b/internal/workload/manager_test.go index 221a612f..4a0ad300 100644 --- a/internal/workload/manager_test.go +++ b/internal/workload/manager_test.go @@ -92,16 +92,14 @@ var _ = Describe("Events", func() { wkwMock.EXPECT().List().Return([]api.WorkloadInfo{ {Id: "stale", Name: "stale", Status: "created"}, }, nil).AnyTimes() - wkwMock.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("Failed to start container")) - wkwMock.EXPECT().PersistConfiguration().AnyTimes() - wkwMock.EXPECT().Start(gomock.Any()).Return(fmt.Errorf("failed to start container")).AnyTimes() + wkwMock.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("Failed to start container")) // when err := wkManager.Update(cfg) // then Expect(err).To(HaveOccurred()) - + Expect(err.Error()).To(ContainSubstring("Failed to start container")) // Check no events are generated: time.Sleep(5 * time.Second) events := wkManager.PopEvents() diff --git a/internal/workload/mock_wrapper.go b/internal/workload/mock_wrapper.go index b3a163ed..7ea6cf48 100644 --- a/internal/workload/mock_wrapper.go +++ b/internal/workload/mock_wrapper.go @@ -122,20 +122,6 @@ func (mr *MockWorkloadWrapperMockRecorder) Logs(arg0, arg1 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Logs", reflect.TypeOf((*MockWorkloadWrapper)(nil).Logs), arg0, arg1) } -// PersistConfiguration mocks base method. -func (m *MockWorkloadWrapper) PersistConfiguration() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PersistConfiguration") - ret0, _ := ret[0].(error) - return ret0 -} - -// PersistConfiguration indicates an expected call of PersistConfiguration. -func (mr *MockWorkloadWrapperMockRecorder) PersistConfiguration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PersistConfiguration", reflect.TypeOf((*MockWorkloadWrapper)(nil).PersistConfiguration)) -} - // RegisterObserver mocks base method. func (m *MockWorkloadWrapper) RegisterObserver(arg0 Observer) { m.ctrl.T.Helper() @@ -162,20 +148,6 @@ func (mr *MockWorkloadWrapperMockRecorder) Remove(arg0 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockWorkloadWrapper)(nil).Remove), arg0) } -// RemoveMappingFile mocks base method. -func (m *MockWorkloadWrapper) RemoveMappingFile() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemoveMappingFile") - ret0, _ := ret[0].(error) - return ret0 -} - -// RemoveMappingFile indicates an expected call of RemoveMappingFile. -func (mr *MockWorkloadWrapperMockRecorder) RemoveMappingFile() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveMappingFile", reflect.TypeOf((*MockWorkloadWrapper)(nil).RemoveMappingFile)) -} - // RemoveSecret mocks base method. func (m *MockWorkloadWrapper) RemoveSecret(arg0 string) error { m.ctrl.T.Helper() @@ -190,20 +162,6 @@ func (mr *MockWorkloadWrapperMockRecorder) RemoveSecret(arg0 interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveSecret", reflect.TypeOf((*MockWorkloadWrapper)(nil).RemoveSecret), arg0) } -// RemoveServicesFile mocks base method. -func (m *MockWorkloadWrapper) RemoveServicesFile() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemoveServicesFile") - ret0, _ := ret[0].(error) - return ret0 -} - -// RemoveServicesFile indicates an expected call of RemoveServicesFile. -func (mr *MockWorkloadWrapperMockRecorder) RemoveServicesFile() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveServicesFile", reflect.TypeOf((*MockWorkloadWrapper)(nil).RemoveServicesFile)) -} - // RemoveTable mocks base method. func (m *MockWorkloadWrapper) RemoveTable() error { m.ctrl.T.Helper() @@ -232,20 +190,6 @@ func (mr *MockWorkloadWrapperMockRecorder) Run(arg0, arg1, arg2, arg3 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockWorkloadWrapper)(nil).Run), arg0, arg1, arg2, arg3) } -// Start mocks base method. -func (m *MockWorkloadWrapper) Start(arg0 *v1.Pod) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Start", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// Start indicates an expected call of Start. -func (mr *MockWorkloadWrapperMockRecorder) Start(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockWorkloadWrapper)(nil).Start), arg0) -} - // Stop mocks base method. func (m *MockWorkloadWrapper) Stop(arg0 string) error { m.ctrl.T.Helper() diff --git a/internal/workload/podman/podman.go b/internal/workload/podman/podman.go index 58a7d2ef..d40cda3c 100644 --- a/internal/workload/podman/podman.go +++ b/internal/workload/podman/podman.go @@ -169,22 +169,22 @@ func (p *podman) List() ([]api.WorkloadInfo, error) { return workloads, nil } -func (p *podman) Exists(workloadId string) (bool, error) { - exists, err := pods.Exists(p.podmanConnection, workloadId, nil) +func (p *podman) Exists(workloadName string) (bool, error) { + exists, err := pods.Exists(p.podmanConnection, workloadName, nil) if err != nil { return false, err } return exists, nil } -func (p *podman) Remove(workloadId string) error { - exists, err := p.Exists(workloadId) +func (p *podman) Remove(workloadName string) error { + exists, err := p.Exists(workloadName) if err != nil { return err } if exists { force := true - _, err := pods.Remove(p.podmanConnection, workloadId, &pods.RemoveOptions{Force: &force}) + _, err := pods.Remove(p.podmanConnection, workloadName, &pods.RemoveOptions{Force: &force}) if err != nil { return err } @@ -272,12 +272,9 @@ func (p *podman) Run(manifestPath, authFilePath string, annotations map[string]s return podIds, nil } -func (p *podman) Start(workloadId string) error { - _, err := pods.Start(p.podmanConnection, workloadId, nil) - if err != nil { - return err - } - return nil +func (p *podman) Start(workloadName string) error { + _, err := pods.Start(p.podmanConnection, workloadName, nil) + return err } func (p *podman) ListSecrets() (map[string]struct{}, error) { @@ -320,7 +317,6 @@ func hasAutoUpdateEnabled(labels map[string]string) bool { } func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, monitoringInterval uint) (service.Service, error) { - var svc service.Service podName := workload.Name // Since podman don't support generation of the systemd services that re-creates the pod, instead of restarting it, @@ -336,31 +332,22 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m if err != nil { return nil, err } + return service.NewSystemd(podName, report.Units, service.UserBus), nil + } - svc, err = service.NewSystemd(podName, report.Units, service.UserBus) - if err != nil { - return nil, err - } - } else { - var unit bytes.Buffer + var unit bytes.Buffer - tmp := template.New("unit") - t, err := tmp.Parse(autoUpdateServiceUnitTemplate) - if err != nil { - return nil, err - } - err = t.Execute(&unit, AutoUpdateUnit{ManifestPath: manifestPath, PodmanBinary: podmanBinary, PodName: podName}) - if err != nil { - return nil, err - } - units := map[string]string{podName: unit.String()} - svc, err = service.NewSystemd(podName, units, service.UserBus) - if err != nil { - return nil, err - } + tmp := template.New("unit") + t, err := tmp.Parse(autoUpdateServiceUnitTemplate) + if err != nil { + return nil, err } - - return svc, nil + err = t.Execute(&unit, AutoUpdateUnit{ManifestPath: manifestPath, PodmanBinary: podmanBinary, PodName: podName}) + if err != nil { + return nil, err + } + units := map[string]string{podName: unit.String()} + return service.NewSystemd(podName, units, service.UserBus), nil } // Retrieve all pods logs and send that to the given io.Writer diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index a1225b5d..c4c0f913 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -11,7 +11,6 @@ import ( "git.sr.ht/~spc/go-log" "github.com/project-flotta/flotta-device-worker/internal/workload/api" - "github.com/project-flotta/flotta-device-worker/internal/workload/mapping" "github.com/project-flotta/flotta-device-worker/internal/workload/network" "github.com/project-flotta/flotta-device-worker/internal/workload/podman" v1 "k8s.io/api/core/v1" @@ -35,11 +34,7 @@ type WorkloadWrapper interface { Remove(string) error Stop(string) error Run(*v1.Pod, string, string, map[string]string) error - Start(*v1.Pod) error - PersistConfiguration() error RemoveTable() error - RemoveMappingFile() error - RemoveServicesFile() error ListSecrets() (map[string]struct{}, error) RemoveSecret(string) error CreateSecret(string, string) error @@ -51,20 +46,16 @@ type WorkloadWrapper interface { type Workload struct { podManager podman.Podman netfilter network.Netfilter - mappingRepository mapping.MappingRepository observers []Observer - serviceManager service.SystemdManager monitoringInterval uint lock sync.RWMutex systemdEventCh <-chan *service.Event } -func NewWorkload(p podman.Podman, n network.Netfilter, m mapping.MappingRepository, s service.SystemdManager, monitoringInterval uint, systemdEventCh <-chan *service.Event) *Workload { +func NewWorkload(p podman.Podman, n network.Netfilter, monitoringInterval uint, systemdEventCh <-chan *service.Event) *Workload { return &Workload{ podManager: p, netfilter: n, - mappingRepository: m, - serviceManager: s, monitoringInterval: monitoringInterval, systemdEventCh: systemdEventCh, } @@ -79,21 +70,10 @@ func newWorkloadInstance(configDir string, monitoringInterval uint, systemdEvent if err != nil { return nil, fmt.Errorf("workload cannot initialize netfilter manager: %w", err) } - mappingRepository, err := mapping.NewMappingRepository(configDir) - if err != nil { - return nil, fmt.Errorf("workload cannot initialize mapping repository: %w", err) - } - - serviceManager, err := service.NewSystemdManager(configDir) - if err != nil { - return nil, fmt.Errorf("workload cannot initialize systemd manager: %w", err) - } ww := &Workload{ podManager: newPodman, netfilter: netfilter, - mappingRepository: mappingRepository, - serviceManager: serviceManager, monitoringInterval: monitoringInterval, systemdEventCh: systemdEventCh, } @@ -109,10 +89,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemd("podman-auto-update", nil, service.UserBus) - if err != nil { - return err - } + svc := service.NewSystemd("podman-auto-update", nil, service.UserBus) if err := svc.Start(); err != nil { return err } @@ -122,17 +99,7 @@ func (ww *Workload) Init() error { } func (ww *Workload) List() ([]api.WorkloadInfo, error) { - infos, err := ww.podManager.List() - if err != nil { - return nil, err - } - for i := range infos { - mappedName := ww.mappingRepository.GetName(infos[i].Id) - if mappedName != "" { - infos[i].Name = mappedName - } - } - return infos, err + return ww.podManager.List() } func (ww *Workload) Logs(podID string, res io.Writer) (context.CancelFunc, error) { @@ -140,33 +107,25 @@ func (ww *Workload) Logs(podID string, res io.Writer) (context.CancelFunc, error } func (ww *Workload) Remove(workloadName string) error { - id := ww.mappingRepository.GetId(workloadName) - if id == "" { - id = workloadName - } // Remove the service configuration from the system: if err := ww.removeService(workloadName); err != nil { return err } - if err := ww.podManager.Remove(id); err != nil { + if err := ww.podManager.Remove(workloadName); err != nil { return err } if err := ww.netfilter.DeleteChain(nfTableName, workloadName); err != nil { log.Errorf("failed to delete chain '%[1]s' from table '%[2]s' for workload '%[1]s': %[3]v", workloadName, nfTableName, err) } - if err := ww.mappingRepository.Remove(workloadName); err != nil { - return err - } + return nil } func (ww *Workload) Stop(workloadName string) error { - id := ww.mappingRepository.GetId(workloadName) - if id == "" { - id = workloadName - } + id := workloadName + err := ww.podManager.Stop(id) return err } @@ -180,72 +139,55 @@ func (ww *Workload) RemoveTable() error { return nil } -func (ww *Workload) RemoveServicesFile() error { - log.Infof("deleting services file") - if err := ww.serviceManager.RemoveServicesFile(); err != nil { - log.Errorf("failed to remove services file: %v", err) - return err - } - return nil -} - -func (ww *Workload) RemoveMappingFile() error { - log.Infof("deleting mapping file") - if err := ww.mappingRepository.RemoveMappingFile(); err != nil { - log.Errorf("failed to remove mapping file: %v", err) - return err - } - return nil -} - func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath string, podmanAnnotations map[string]string) error { if err := ww.applyNetworkConfiguration(workload); err != nil { return err } - podIds, err := ww.podManager.Run(manifestPath, authFilePath, podmanAnnotations) + _, err := ww.podManager.Run(manifestPath, authFilePath, podmanAnnotations) if err != nil { return err } - // Must be called before GenerateSystemdService: - if err = ww.mappingRepository.Add(workload.Name, podIds[0].Id); err != nil { - return err - } - // Create the system service to manage the pod: svc, err := ww.podManager.GenerateSystemdService(workload, manifestPath, ww.monitoringInterval) if err != nil { return fmt.Errorf("error while generating systemd service: %v", err) } - log.Infof("Creating service for %s", workload.Name) - err = ww.createService(svc) - if err != nil { - return fmt.Errorf("error while starting service: %v", err) + log.Infof("Starting service for %s", workload.Name) + if err := svc.Add(); err != nil { + return fmt.Errorf("cannot add systemd service '%s': %v", svc.GetName(), err) } - log.Infof("Registering service %s", workload.Name) - err = ww.serviceManager.Add(svc) + if err := svc.Enable(); err != nil { + return fmt.Errorf("cannot enable systemd service '%s': %v", svc.GetName(), err) + } + + err = svc.Start() if err != nil { - return fmt.Errorf("error while updating service manager: %v", err) + log.Errorf("cannot start systemd service '%s': %v", svc.GetName(), err) + return err } + return nil } func (ww *Workload) removeService(workloadName string) error { - svc := ww.serviceManager.Get(workloadName) - if svc == nil { + svc := service.NewSystemd(workloadName, nil, service.UserBus) + exists, err := svc.ServiceExists() + if err != nil { + return err + } + if !exists { return nil } - - // Ignore stop failure: - err := svc.Stop() + err = svc.Stop() if err != nil { return fmt.Errorf("unable to stop service %s:%s", workloadName, err) } // Disable the service from the system: if err := svc.Disable(); err != nil { - return fmt.Errorf("unable to disable systemd service for '%s': %s", workloadName, err) + return fmt.Errorf("unable to disable systemd service for '%s': %+v", workloadName, err) } // Remove the service from the system: @@ -253,28 +195,6 @@ func (ww *Workload) removeService(workloadName string) error { return fmt.Errorf("unable to remove systemd service for '%s': %s", workloadName, err) } - err = ww.serviceManager.Remove(svc) - if err != nil { - log.Errorf("Unable to remove service from serviceManager %s:%s", workloadName, err) - } - return nil -} - -func (ww *Workload) createService(svc service.Service) error { - if err := svc.Add(); err != nil { - return fmt.Errorf("cannot add systemd service '%s': %v", svc.GetName(), err) - } - if err := svc.Enable(); err != nil { - return fmt.Errorf("cannot enable systemd service '%s': %v", svc.GetName(), err) - } - - err := svc.Start() - if err != nil { - // A service maybe is already in place, and started, so no returning an - // error here. - log.Errorf("cannot start systemd service '%s': %v", svc.GetName(), err) - } - return nil } @@ -303,20 +223,14 @@ func (ww *Workload) applyNetworkConfiguration(workload *v1.Pod) error { } func (ww *Workload) Start(workload *v1.Pod) error { - _ = ww.netfilter.DeleteChain(nfTableName, workload.Name) - if err := ww.applyNetworkConfiguration(workload); err != nil { - return err + err := ww.netfilter.DeleteChain(nfTableName, workload.Name) + if err != nil { + log.Errorf("Error detected while deleting chain for workload %s:%s", workload.Name, err) } - - podId := ww.mappingRepository.GetId(workload.Name) - if err := ww.podManager.Start(podId); err != nil { + if err = ww.applyNetworkConfiguration(workload); err != nil { return err } - return nil -} - -func (ww *Workload) PersistConfiguration() error { - return ww.mappingRepository.Persist() + return ww.podManager.Start(workload.Name) } func (ww *Workload) ListSecrets() (map[string]struct{}, error) { diff --git a/internal/workload/wrapper_test.go b/internal/workload/wrapper_test.go index 1cfa60e8..e5f63d06 100644 --- a/internal/workload/wrapper_test.go +++ b/internal/workload/wrapper_test.go @@ -8,7 +8,6 @@ import ( . "github.com/onsi/gomega" "github.com/project-flotta/flotta-device-worker/internal/service" "github.com/project-flotta/flotta-device-worker/internal/workload" - "github.com/project-flotta/flotta-device-worker/internal/workload/mapping" "github.com/project-flotta/flotta-device-worker/internal/workload/network" "github.com/project-flotta/flotta-device-worker/internal/workload/podman" v1 "k8s.io/api/core/v1" @@ -23,24 +22,21 @@ const ( var _ = Describe("Workload management", func() { var ( - mockCtrl *gomock.Controller - wk *workload.Workload - newPodman *podman.MockPodman - netfilter *network.MockNetfilter - mappingRepository *mapping.MockMappingRepository - serviceManager *service.MockSystemdManager - svc *service.MockService + mockCtrl *gomock.Controller + wk *workload.Workload + newPodman *podman.MockPodman + netfilter *network.MockNetfilter + + svc *service.MockService ) BeforeEach(func() { mockCtrl = gomock.NewController(GinkgoT()) newPodman = podman.NewMockPodman(mockCtrl) netfilter = network.NewMockNetfilter(mockCtrl) - mappingRepository = mapping.NewMockMappingRepository(mockCtrl) - serviceManager = service.NewMockSystemdManager(mockCtrl) svc = service.NewMockService(mockCtrl) - wk = workload.NewWorkload(newPodman, netfilter, mappingRepository, serviceManager, 15, nil) + wk = workload.NewWorkload(newPodman, netfilter, 15, nil) }) AfterEach(func() { @@ -61,10 +57,6 @@ var _ = Describe("Workload management", func() { svc.EXPECT().Enable().Return(nil) svc.EXPECT().Start().Return(nil) - serviceManager.EXPECT().Add(svc).Return(nil) - - mappingRepository.EXPECT().Add("pod1", "id1") - // when err := wk.Run(pod, manifestPath, authFilePath, nil) @@ -77,8 +69,6 @@ var _ = Describe("Workload management", func() { // given pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}} - mappingRepository.EXPECT().Add("pod1", "id1") - newPodman.EXPECT().Run(manifestPath, authFilePath, nil).Return([]*podman.PodReport{{Id: "id1"}}, nil) newPodman.EXPECT().GenerateSystemdService(pod, gomock.Any(), gomock.Any()).Return(svc, nil) @@ -97,8 +87,6 @@ var _ = Describe("Workload management", func() { // given pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}} - mappingRepository.EXPECT().Add("pod1", "id1") - newPodman.EXPECT().Run(manifestPath, authFilePath, nil).Return([]*podman.PodReport{{Id: "id1"}}, nil) newPodman.EXPECT().GenerateSystemdService(pod, gomock.Any(), gomock.Any()).Return(svc, nil)