diff --git a/cmd/device-worker/main.go b/cmd/device-worker/main.go index 1bce48e8..4332b637 100644 --- a/cmd/device-worker/main.go +++ b/cmd/device-worker/main.go @@ -20,7 +20,8 @@ import ( os2 "github.com/project-flotta/flotta-device-worker/internal/os" registration2 "github.com/project-flotta/flotta-device-worker/internal/registration" "github.com/project-flotta/flotta-device-worker/internal/server" - workload2 "github.com/project-flotta/flotta-device-worker/internal/workload" + "github.com/project-flotta/flotta-device-worker/internal/service" + "github.com/project-flotta/flotta-device-worker/internal/workload" "net" "os" @@ -33,17 +34,21 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var yggdDispatchSocketAddr string -var gracefulRebootChannel chan struct{} - const ( - defaultDataDir = "/var/local/yggdrasil" + defaultDataDir = "/var/local/yggdrasil" + systemdUserServicesDir = ".config/systemd/user" +) + +var ( + yggdDispatchSocketAddr string + gracefulRebootChannel chan struct{} + systemdUserServicesFullPath = filepath.Join(os.Getenv("HOME"), systemdUserServicesDir) ) func initSystemdDirectory() error { - systemddir := filepath.Join(os.Getenv("HOME"), ".config/systemd/user/") + // init the flotta user systemd units directory - err := os.MkdirAll(systemddir, 0750) + err := os.MkdirAll(systemdUserServicesFullPath, 0750) if err != nil { return err } @@ -149,6 +154,12 @@ func main() { } configManager := configuration2.NewConfigurationManager(dataDir) + // Systemd event dbusEventListener + dbusEventListener := service.NewDBusEventListener() + // Start listening to systemd bus events. These events generated from here can start being processed once all observers have been initialized, otherwise + // we risk in missing observers whilst processing the events. + configManager.RegisterObserver(dbusEventListener) + // --- Client metrics configuration --- metricsStore, err := metrics.NewTSDB(dataDir) if err != nil { @@ -170,16 +181,16 @@ func main() { dataTransferWatcher := metrics.NewDataTransferMetrics(metricsDaemon) configManager.RegisterObserver(dataTransferWatcher) - wl, err := workload2.NewWorkloadManager(dataDir, deviceId) + workloadManager, err := workload.NewWorkloadManager(dataDir, deviceId, dbusEventListener.GetEventChannel()) if err != nil { log.Fatalf("cannot start Workload Manager. DeviceID: %s; err: %v", deviceId, err) } - logsWrapper := logs.NewWorkloadsLogsTarget(wl) + logsWrapper := logs.NewWorkloadsLogsTarget(workloadManager) configManager.RegisterObserver(logsWrapper) - wl.RegisterObserver(logsWrapper) - configManager.RegisterObserver(wl) - wl.RegisterObserver(workloadMetricWatcher) + workloadManager.RegisterObserver(logsWrapper) + configManager.RegisterObserver(workloadManager) + workloadManager.RegisterObserver(workloadMetricWatcher) remoteWrite := metrics.NewRemoteWrite(dataDir, deviceId, metricsStore) configManager.RegisterObserver(remoteWrite) @@ -197,25 +208,21 @@ func main() { } configManager.RegisterObserver(mountManager) - dataMonitor := datatransfer.NewMonitor(wl, configManager) - wl.RegisterObserver(dataMonitor) + dataMonitor := datatransfer.NewMonitor(workloadManager, configManager) + workloadManager.RegisterObserver(dataMonitor) configManager.RegisterObserver(dataMonitor) dataMonitor.Start() - if err != nil { - log.Fatalf("cannot start metrics store. DeviceID: %s; err: %v", deviceId, err) - } - - reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, wl) + reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, workloadManager) if err != nil { log.Fatalf("cannot start registration process: DeviceID: %s; err: %v", deviceId, err) } - hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, wl, &hw, dataMonitor, deviceOs, reg) + hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, workloadManager, &hw, dataMonitor, deviceOs, reg) configManager.RegisterObserver(hbs) reg.DeregisterLater( - wl, + workloadManager, configManager, hbs, dataMonitor, @@ -247,16 +254,19 @@ func main() { setupSignalHandler(metricsStore, ansibleManager) - go listenStartGracefulRebootChannel(wl, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager, + go listenStartGracefulRebootChannel(workloadManager, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager, gracefulRebootChannel, deviceOs) + // All observers have been registered at this point. Ready to start listening to workload service events generated by changes to the systemd dbus + go workloadManager.ListenServiceEvents() + if err := s.Serve(l); err != nil { log.Fatalf("cannot start worker server, err: %v", err) } } -func listenStartGracefulRebootChannel(wl *workload2.WorkloadManager, dataMonitor *datatransfer.Monitor, +func listenStartGracefulRebootChannel(wl *workload.WorkloadManager, dataMonitor *datatransfer.Monitor, systemMetricsWatcher *metrics.SystemMetrics, metricsStore *metrics.TSDB, hbs *heartbeat2.Heartbeat, ansibleManager *ansible.Manager, gracefulRebootChannel chan struct{}, deviceOs *os2.OS) { // listen to the channel for getting StartGracefulReboot signal diff --git a/go.mod b/go.mod index 7090ca11..1e091642 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/digitalocean/godo v1.80.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/envoyproxy/protoc-gen-validate v0.6.7 // indirect + github.com/fsnotify/fsnotify v1.5.4 github.com/go-kit/log v0.2.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/runtime v0.23.1 // indirect diff --git a/internal/ansible/ansible_manager_test.go b/internal/ansible/ansible_manager_test.go index 2f6077db..9a6fd60a 100644 --- a/internal/ansible/ansible_manager_test.go +++ b/internal/ansible/ansible_manager_test.go @@ -164,8 +164,10 @@ var _ = Describe("Ansible Runner", func() { modTime1 := time.Now().Add(-3 * time.Hour) modTime2 := time.Now().Add(-2 * time.Hour) - p1Sha := ansibleManager.MappingRepository.GetSha256(p1) - p2Sha := ansibleManager.MappingRepository.GetSha256(p2) + p1Sha, err := ansibleManager.MappingRepository.GetSha256(p1) + Expect(err).ToNot(HaveOccurred()) + p2Sha, err := ansibleManager.MappingRepository.GetSha256(p2) + Expect(err).ToNot(HaveOccurred()) p1Path := path.Join(configDir, p1Sha) p2Path := path.Join(configDir, p2Sha) diff --git a/internal/ansible/mapping/mapping.go b/internal/ansible/mapping/mapping.go index d242042b..c3ebdf3f 100644 --- a/internal/ansible/mapping/mapping.go +++ b/internal/ansible/mapping/mapping.go @@ -20,7 +20,7 @@ type mapping struct { //go:generate mockgen -package=mapping -destination=mock_mapping.go . MappingRepository type MappingRepository interface { - GetSha256(fileContent []byte) string + GetSha256(fileContent []byte) (string, error) Add(fileContent []byte, modTime time.Time) error Remove(fileContent []byte) error RemoveMappingFile() error @@ -85,17 +85,24 @@ func (m *mappingRepository) GetAll() map[int]string { return all } -func (m *mappingRepository) GetSha256(fileContent []byte) string { +func (m *mappingRepository) GetSha256(fileContent []byte) (string, error) { h := sha256.New() - h.Write(fileContent) - return fmt.Sprintf("%x", h.Sum(nil)) + _, err := h.Write(fileContent) + if err != nil { + return "", err + } + return fmt.Sprintf("%x", h.Sum(nil)), nil } func (m *mappingRepository) Add(fileContent []byte, modTime time.Time) error { m.lock.Lock() defer m.lock.Unlock() - filePath := path.Join(m.configDir, m.GetSha256(fileContent)) - err := os.WriteFile(filePath, []byte(fileContent), 0600) + sha, err := m.GetSha256(fileContent) + if err != nil { + return err + } + filePath := path.Join(m.configDir, sha) + err = os.WriteFile(filePath, []byte(fileContent), 0600) if err != nil { return err @@ -111,7 +118,11 @@ func (m *mappingRepository) Remove(fileContent []byte) error { m.lock.Lock() defer m.lock.Unlock() - filePath := path.Join(m.configDir, m.GetSha256(fileContent)) + sha, err := m.GetSha256(fileContent) + if err != nil { + return err + } + filePath := path.Join(m.configDir, sha) modTime := m.pathToModTime[filePath] delete(m.modTimeToPath, modTime) delete(m.pathToModTime, filePath) diff --git a/internal/ansible/mapping/mapping_test.go b/internal/ansible/mapping/mapping_test.go index 37b77207..ea6e2526 100644 --- a/internal/ansible/mapping/mapping_test.go +++ b/internal/ansible/mapping/mapping_test.go @@ -23,7 +23,8 @@ var _ = Describe("Mapping", func() { repo, err = mapping.NewMappingRepository(dir) Expect(err).ToNot(HaveOccurred()) - sha256Test = repo.GetSha256([]byte("test")) + sha256Test, err = repo.GetSha256([]byte("test")) + Expect(err).ToNot(HaveOccurred()) filePathTest = path.Join(configDir, sha256Test) }) AfterEach(func() { @@ -31,8 +32,10 @@ var _ = Describe("Mapping", func() { Expect(err).ToNot(HaveOccurred()) }) It("sha256 Generation", func() { - s1 := repo.GetSha256([]byte("AAA")) - s2 := repo.GetSha256([]byte("AAA")) + s1, err := repo.GetSha256([]byte("AAA")) + Expect(err).ToNot(HaveOccurred()) + s2, err := repo.GetSha256([]byte("AAA")) + Expect(err).ToNot(HaveOccurred()) Expect(s1).To(Equal(s2)) }) It("Should be created empty", func() { @@ -86,12 +89,17 @@ var _ = Describe("Mapping", func() { It("Should persist mappings", func() { // given - filePath1 := path.Join(configDir, repo.GetSha256([]byte("test-one"))) - filePath2 := path.Join(configDir, repo.GetSha256([]byte("test-two"))) + sha, err := repo.GetSha256([]byte("test-one")) + Expect(err).ToNot(HaveOccurred()) + filePath1 := path.Join(configDir, sha) + Expect(err).ToNot(HaveOccurred()) + sha, err = repo.GetSha256([]byte("test-two")) + filePath2 := path.Join(configDir, sha) + Expect(err).ToNot(HaveOccurred()) modTime1 := time.Now() modTime2 := modTime1.Add(1 * time.Minute) - err := repo.Add([]byte("test-one"), modTime1) + err = repo.Add([]byte("test-one"), modTime1) Expect(err).ToNot(HaveOccurred()) Expect(repo.GetModTime(filePath1)).To(Equal(modTime1.UnixNano())) Expect(repo.GetFilePath(modTime1)).To(Equal(filePath1)) diff --git a/internal/ansible/mapping/mock_mapping.go b/internal/ansible/mapping/mock_mapping.go index fd3ead81..6a05f38b 100644 --- a/internal/ansible/mapping/mock_mapping.go +++ b/internal/ansible/mapping/mock_mapping.go @@ -105,11 +105,12 @@ func (mr *MockMappingRepositoryMockRecorder) GetModTime(arg0 interface{}) *gomoc } // GetSha256 mocks base method. -func (m *MockMappingRepository) GetSha256(arg0 []byte) string { +func (m *MockMappingRepository) GetSha256(arg0 []byte) (string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetSha256", arg0) ret0, _ := ret[0].(string) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // GetSha256 indicates an expected call of GetSha256. diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go index ca35792a..747e8bd2 100644 --- a/internal/configuration/configuration.go +++ b/internal/configuration/configuration.go @@ -34,6 +34,7 @@ var ( type Observer interface { Init(configuration models.DeviceConfigurationMessage) error Update(configuration models.DeviceConfigurationMessage) error + String() string } type Manager struct { @@ -48,10 +49,10 @@ type Manager struct { func NewConfigurationManager(dataDir string) *Manager { deviceConfigFile := path.Join(dataDir, "device-config.json") log.Infof("device config file: %s", deviceConfigFile) - file, err := os.ReadFile(deviceConfigFile) //#nosec var deviceConfiguration models.DeviceConfigurationMessage initialConfig := atomic.Value{} initialConfig.Store(false) + file, err := os.ReadFile(deviceConfigFile) //#nosec if err != nil { log.Error(err) deviceConfiguration = defaultDeviceConfigurationMessage @@ -146,6 +147,7 @@ func (m *Manager) Update(message models.DeviceConfigurationMessage) error { } log.Infof("updating configuration. New config: %s\nOld config: %s", newJson, oldJson) + log.Debugf("[ConfigManager] observers :%v+", m.observers) for _, observer := range m.observers { err := observer.Update(message) if err != nil { diff --git a/internal/configuration/configuration_mock.go b/internal/configuration/configuration_mock.go index 6bb849e9..987e371e 100644 --- a/internal/configuration/configuration_mock.go +++ b/internal/configuration/configuration_mock.go @@ -48,6 +48,20 @@ func (mr *MockObserverMockRecorder) Init(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockObserver)(nil).Init), arg0) } +// String mocks base method. +func (m *MockObserver) String() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "String") + ret0, _ := ret[0].(string) + return ret0 +} + +// String indicates an expected call of String. +func (mr *MockObserverMockRecorder) String() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "String", reflect.TypeOf((*MockObserver)(nil).String)) +} + // Update mocks base method. func (m *MockObserver) Update(arg0 models.DeviceConfigurationMessage) error { m.ctrl.T.Helper() diff --git a/internal/heartbeat/heartbeat_test.go b/internal/heartbeat/heartbeat_test.go index 0464c730..1b3163cf 100644 --- a/internal/heartbeat/heartbeat_test.go +++ b/internal/heartbeat/heartbeat_test.go @@ -6,13 +6,11 @@ import ( "context" "encoding/json" "fmt" - "net" "net/http" "reflect" "sync" "time" - "github.com/openshift/assisted-installer-agent/src/util" "github.com/project-flotta/flotta-device-worker/internal/ansible" os2 "github.com/project-flotta/flotta-device-worker/internal/os" "github.com/project-flotta/flotta-device-worker/internal/registration" @@ -759,44 +757,6 @@ func (d *DispatcherFailing) GetConfig(ctx context.Context, in *pb.Empty, opts .. return nil, nil } -func NewFilledInterfaceMock(mtu int, name string, macAddr string, flags net.Flags, addrs []string, isPhysical bool, isBonding bool, isVlan bool, speedMbps int64) *util.MockInterface { - hwAddr, _ := net.ParseMAC(macAddr) - ret := util.MockInterface{} - ret.On("IsPhysical").Return(isPhysical) - if isPhysical || isBonding || isVlan { - ret.On("Name").Return(name) - ret.On("MTU").Return(mtu) - ret.On("HardwareAddr").Return(hwAddr) - ret.On("Flags").Return(flags) - ret.On("Addrs").Return(toAddresses(addrs), nil) - ret.On("SpeedMbps").Return(speedMbps) - } - if !isPhysical { - ret.On("IsBonding").Return(isBonding) - } - if !(isPhysical || isBonding) { - ret.On("IsVlan").Return(isVlan) - } - - return &ret -} - -func toAddresses(addrs []string) []net.Addr { - ret := make([]net.Addr, 0) - for _, a := range addrs { - ret = append(ret, str2Addr(a)) - } - return ret -} - -func str2Addr(addrStr string) net.Addr { - ip, ipnet, err := net.ParseCIDR(addrStr) - if err != nil { - return &net.IPNet{} - } - return &net.IPNet{IP: ip, Mask: ipnet.Mask} -} - func initHwMock(hwMock *hardware.MockHardware, configManager *configuration.Manager, hostname string, ipv4 []string) (*gomock.Call, *gomock.Call, *gomock.Call) { var m models.HardwareInfo configManager.GetDeviceConfiguration().Heartbeat.HardwareProfile.Scope = heartbeat.ScopeDelta diff --git a/internal/logs/workload.go b/internal/logs/workload.go index f343877b..101e01ed 100644 --- a/internal/logs/workload.go +++ b/internal/logs/workload.go @@ -205,3 +205,7 @@ func (w *WorkloadsLogsTarget) WorkloadStarted(workloadName string, report []*pod log.Error("Cannot start workload logs", err) } } + +func (w *WorkloadsLogsTarget) String() string { + return "workload logs" +} diff --git a/internal/metrics/remote_write.go b/internal/metrics/remote_write.go index bf088118..885d0357 100644 --- a/internal/metrics/remote_write.go +++ b/internal/metrics/remote_write.go @@ -532,3 +532,7 @@ func toTimeSeries(series []Series) (timeSeries []prompb.TimeSeries, lowest time. highest = fromDbTime(maxt) return } + +func (r *RemoteWrite) String() string { + return "remote write" +} diff --git a/internal/metrics/system.go b/internal/metrics/system.go index 5223db1e..e3d2e46d 100644 --- a/internal/metrics/system.go +++ b/internal/metrics/system.go @@ -26,7 +26,7 @@ type SystemMetrics struct { } func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) { - nodeExporter, err := service.NewSystemdRootless("node_exporter", nil, false) + nodeExporter, err := service.NewSystemd("node_exporter", nil, service.SystemBus) if err != nil { return nil, err } diff --git a/internal/metrics/workload.go b/internal/metrics/workload.go index fc213b8d..d1bb67df 100644 --- a/internal/metrics/workload.go +++ b/internal/metrics/workload.go @@ -48,11 +48,12 @@ func (wrkM *WorkloadMetrics) Update(config models.DeviceConfigurationMessage) er } func (wrkM *WorkloadMetrics) WorkloadRemoved(workloadName string) { - log.Infof("removing target metrics for workload '%v'", workloadName) + log.Infof("Removing target metrics for workload '%v'", workloadName) wrkM.daemon.DeleteTarget(workloadName) } func (wrkM *WorkloadMetrics) WorkloadStarted(workloadName string, report []*podman.PodReport) { + log.Infof("Starting target metrics for workload '%s'", workloadName) for _, workload := range report { cfg := wrkM.getWorkload(workloadName) if cfg == nil { @@ -83,6 +84,10 @@ func (wrkM *WorkloadMetrics) WorkloadStarted(workloadName string, report []*podm } } +func (wrKM *WorkloadMetrics) String() string { + return "workload metrics" +} + func getWorkloadUrls(report *podman.PodReport, config *models.Workload) []string { res := []string{} metricsPath := config.Metrics.Path diff --git a/internal/mount/mount.go b/internal/mount/mount.go index 6366141e..6bc11ac2 100644 --- a/internal/mount/mount.go +++ b/internal/mount/mount.go @@ -170,3 +170,7 @@ func getDefaultMountOptions() string { } return fmt.Sprintf("gid=%s,uid=%s", group.Gid, usr.Uid) } + +func (m *Manager) String() string { + return "mount" +} diff --git a/internal/os/os.go b/internal/os/os.go index a36373ae..7009d319 100644 --- a/internal/os/os.go +++ b/internal/os/os.go @@ -207,3 +207,7 @@ func (o *OS) updateGreenbootScripts() error { return nil } + +func (o *OS) String() string { + return "rpm-ostree" +} diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go new file mode 100644 index 00000000..92467833 --- /dev/null +++ b/internal/service/event_listener.go @@ -0,0 +1,174 @@ +package service + +import ( + "context" + "fmt" + "path/filepath" + "strconv" + "strings" + "sync" + + "github.com/coreos/go-systemd/v22/dbus" + "github.com/project-flotta/flotta-operator/models" + log "github.com/sirupsen/logrus" +) + +const ( + EventStarted EventType = "started" + EventStopped EventType = "stopped" + + // To avoid confusion, we match the action verb tense coming from systemd sub state, even though it is not consistent + // The only addition to the list is the "removed" case, which we use when a service is removed + // from the system and we receive an empty unitStatus object. + + //The service is running + running unitSubState = "running" + //The service is starting, probably due to a restart. + //In this case we have to notify the observers that the service is not yet up and is loading. A subsequent 'running' event will proceed. + start unitSubState = "start" + //The service has been stopped, due to the container being killed. Systemd will restart the service. + stop unitSubState = "stop" + //The service is dead, due to the container being stopped for instance. + dead unitSubState = "dead" + //The service failed to start + failed unitSubState = "failed" + //The service has been removed, no unit state is returned from systemd + removed unitSubState = "removed" +) + +type unitSubState string + +type EventType string + +type Event struct { + WorkloadName string + Type EventType +} + +type DBusEventListener struct { + eventCh chan *Event + set *dbus.SubscriptionSet + dbusCh <-chan map[string]*dbus.UnitStatus + dbusErrCh <-chan error + lock sync.Mutex +} + +func NewDBusEventListener() *DBusEventListener { + return &DBusEventListener{lock: sync.Mutex{}, eventCh: make(chan *Event, 1000)} +} + +func (e *DBusEventListener) Init(configuration models.DeviceConfigurationMessage) error { + log.Infof("Starting DBus event listener") + conn, err := newDbusConnection(UserBus) + if err != nil { + return fmt.Errorf("error while starting event listener: %v", err) + } + e.set = conn.NewSubscriptionSet() + for _, w := range configuration.Workloads { + s, err := conn.GetUnitPropertyContext(context.Background(), DefaultServiceName(w.Name), "UnitFileState") + if err != nil { + return fmt.Errorf("error while retrieving workload '%s' state: %v", w.Name, err) + } + log.Debugf("Unit UnitFileState property for workload '%s':%s", w.Name, s.Value.String()) + v, err := strconv.Unquote(s.Value.String()) + if err != nil { + return err + } + if v == "disabled" { + log.Warnf("Service for workload '%s' is disabled", w.Name) + } + e.add(DefaultServiceName(w.Name)) + } + go e.Listen() + return nil +} + +func (e *DBusEventListener) Update(configuration models.DeviceConfigurationMessage) error { + for _, wl := range configuration.Workloads { + svcName := DefaultServiceName(wl.Name) + if !e.contains(svcName) { + e.add(svcName) + } else { + return fmt.Errorf("unable to add systemd service '%s': there is service unit already being monitored with the same name", svcName) + } + } + return nil +} + +func (e *DBusEventListener) String() string { + return "DBus event listener" +} + +func (e *DBusEventListener) GetEventChannel() <-chan *Event { + return e.eventCh +} + +func (e *DBusEventListener) Listen() { + e.dbusCh, e.dbusErrCh = e.set.Subscribe() + for { + select { + case msg := <-e.dbusCh: + for name, unit := range msg { + log.Debugf("Captured DBus event for %s: %v+", name, unit) + n, err := extractWorkloadName(name) + if err != nil { + log.Error(err) + continue + } + state := translateUnitSubStatus(unit) + log.Debugf("Systemd service for workload %s transitioned to sub state %s\n", n, state) + switch state { + case running: + log.Debugf("Sending start event to observer channel for workload %s", n) + e.eventCh <- &Event{WorkloadName: n, Type: EventStarted} + case removed: + log.Debugf("Service %s has been removed", name) + e.remove(name) + fallthrough + case stop, dead, failed, start: + log.Debugf("Sending stop event to observer channel for workload %s", n) + e.eventCh <- &Event{WorkloadName: n, Type: EventStopped} + default: + log.Debugf("Ignoring unit sub state for service %s: %s", name, unit.SubState) + } + } + case msg := <-e.dbusErrCh: + log.Errorf("Error while parsing dbus event: %v", msg) + } + } + +} + +func (e *DBusEventListener) add(serviceName string) { + e.lock.Lock() + defer e.lock.Unlock() + log.Debugf("Adding service for event listener %s", serviceName) + e.set.Add(serviceName) +} + +func (e *DBusEventListener) remove(serviceName string) { + e.lock.Lock() + defer e.lock.Unlock() + log.Debugf("Removing service from event listener %s", serviceName) + e.set.Remove(serviceName) +} + +func (e *DBusEventListener) contains(serviceName string) bool { + e.lock.Lock() + defer e.lock.Unlock() + return e.set.Contains(serviceName) +} + +func extractWorkloadName(serviceName string) (string, error) { + if filepath.Ext(serviceName) != ServiceSuffix { + return "", fmt.Errorf("invalid file name or not a service %s", serviceName) + } + return strings.TrimSuffix(filepath.Base(serviceName), filepath.Ext(serviceName)), nil +} + +func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { + if unit == nil { + return removed + } + return unitSubState(unit.SubState) +} diff --git a/internal/service/systemd.go b/internal/service/systemd.go index e73195f3..683df623 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -25,6 +25,13 @@ var ( DefaultUnitsPath = path.Join(os.Getenv("HOME"), ".config/systemd/user/") ) +type BusType string + +const ( + UserBus BusType = "user" + SystemBus BusType = "system" +) + //go:generate mockgen -package=service -destination=mock_systemd.go . Service type Service interface { GetName() string @@ -38,11 +45,10 @@ type Service interface { type systemd struct { Name string `json:"name"` - RestartSec int `json:"restartSec"` Units []string `json:"units"` UnitsContent map[string]string `json:"-"` dbusConnection *dbus.Conn `json:"-"` - Rootless bool `json:"rootless"` + BusType BusType `json:"busType"` } //go:generate mockgen -package=service -destination=mock_systemd_manager.go . SystemdManager @@ -97,7 +103,6 @@ func (mgr *systemdManager) Add(svc Service) error { defer mgr.lock.Unlock() mgr.services[svc.GetName()] = svc - return mgr.write() } @@ -113,7 +118,6 @@ func (mgr *systemdManager) Remove(svc Service) error { defer mgr.lock.Unlock() delete(mgr.services, svc.GetName()) - return mgr.write() } @@ -122,19 +126,11 @@ func (mgr *systemdManager) write() error { if err != nil { return err } - err = os.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec - if err != nil { - return err - } - return nil + return os.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec } -func NewSystemd(name string, units map[string]string) (Service, error) { - return NewSystemdRootless(name, units, true) -} - -func newDbusConnection(rootless bool) (*dbus.Conn, error) { - if rootless { +func newDbusConnection(busType BusType) (*dbus.Conn, error) { + if busType == UserBus { return dbus.NewConnection(func() (*godbus.Conn, error) { uid := path.Base(os.Getenv("FLOTTA_XDG_RUNTIME_DIR")) path := filepath.Join(os.Getenv("FLOTTA_XDG_RUNTIME_DIR"), "systemd/private") @@ -161,11 +157,11 @@ func newDbusConnection(rootless bool) (*dbus.Conn, error) { } } -func NewSystemdRootless(name string, units map[string]string, rootless bool) (Service, error) { +func NewSystemd(name string, units map[string]string, busType BusType) (Service, error) { var err error var conn *dbus.Conn - conn, err = newDbusConnection(rootless) + conn, err = newDbusConnection(busType) if err != nil { return nil, err } @@ -177,10 +173,9 @@ func NewSystemdRootless(name string, units map[string]string, rootless bool) (Se return &systemd{ Name: name, - RestartSec: DefaultRestartTimeout, dbusConnection: conn, Units: unitNames, - Rootless: rootless, + BusType: busType, UnitsContent: units, }, nil } @@ -208,7 +203,6 @@ func (s *systemd) Remove() error { return err } } - return s.reload() } @@ -217,7 +211,7 @@ func (s *systemd) GetName() string { } func (s *systemd) reload() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -226,7 +220,8 @@ func (s *systemd) reload() error { } func (s *systemd) Start() error { - conn, err := newDbusConnection(s.Rootless) + log.Debugf("Starting systemd service %s", s.Name) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -246,7 +241,7 @@ func (s *systemd) Start() error { } func (s *systemd) Stop() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -266,7 +261,8 @@ func (s *systemd) Stop() error { } func (s *systemd) Enable() error { - conn, err := newDbusConnection(s.Rootless) + log.Debugf("Enabling systemd service %s", s.Name) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -277,7 +273,8 @@ func (s *systemd) Enable() error { } func (s *systemd) Disable() error { - conn, err := newDbusConnection(s.Rootless) + log.Debugf("Disabling systemd service %s", s.Name) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } diff --git a/internal/workload/manager.go b/internal/workload/manager.go index 2ccd81af..796f4305 100644 --- a/internal/workload/manager.go +++ b/internal/workload/manager.go @@ -39,8 +39,8 @@ type WorkloadManager struct { deviceId string } -func NewWorkloadManager(dataDir string, deviceId string) (*WorkloadManager, error) { - wrapper, err := newWorkloadInstance(dataDir, defaultWorkloadsMonitoringInterval) +func NewWorkloadManager(dataDir string, deviceId string, systemdEventCh <-chan *service.Event) (*WorkloadManager, error) { + wrapper, err := newWorkloadInstance(dataDir, defaultWorkloadsMonitoringInterval, systemdEventCh) if err != nil { return nil, err } @@ -146,7 +146,7 @@ func (w *WorkloadManager) Update(configuration models.DeviceConfigurationMessage if PodShouldWaitForMount(pod, configuration.Configuration) { errors = multierror.Append(errors, fmt.Errorf( - "Pod '%s' needs to mount blockdevice but it's not in there yet", workload.Name)) + "pod '%s' needs to mount blockdevice but it's not in there yet", workload.Name)) continue } @@ -414,6 +414,10 @@ func (w *WorkloadManager) deleteVolumeDir() error { return deleteDir(w.volumesDir) } +func (w *WorkloadManager) ListenServiceEvents() { + w.workloads.ListenServiceEvents() +} + func deleteDir(path string) error { err := os.RemoveAll(path) if err != nil { diff --git a/internal/workload/mock_wrapper.go b/internal/workload/mock_wrapper.go index 49ed1f81..b3a163ed 100644 --- a/internal/workload/mock_wrapper.go +++ b/internal/workload/mock_wrapper.go @@ -95,6 +95,18 @@ func (mr *MockWorkloadWrapperMockRecorder) ListSecrets() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSecrets", reflect.TypeOf((*MockWorkloadWrapper)(nil).ListSecrets)) } +// ListenServiceEvents mocks base method. +func (m *MockWorkloadWrapper) ListenServiceEvents() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ListenServiceEvents") +} + +// ListenServiceEvents indicates an expected call of ListenServiceEvents. +func (mr *MockWorkloadWrapperMockRecorder) ListenServiceEvents() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListenServiceEvents", reflect.TypeOf((*MockWorkloadWrapper)(nil).ListenServiceEvents)) +} + // Logs mocks base method. func (m *MockWorkloadWrapper) Logs(arg0 string, arg1 io.Writer) (context.CancelFunc, error) { m.ctrl.T.Helper() diff --git a/internal/workload/podman/mock_podman.go b/internal/workload/podman/mock_podman.go index 692072ce..70621aa9 100644 --- a/internal/workload/podman/mock_podman.go +++ b/internal/workload/podman/mock_podman.go @@ -82,19 +82,19 @@ func (mr *MockPodmanMockRecorder) GenerateSystemdService(arg0, arg1, arg2 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateSystemdService", reflect.TypeOf((*MockPodman)(nil).GenerateSystemdService), arg0, arg1, arg2) } -// GetPodReportForId mocks base method. -func (m *MockPodman) GetPodReportForId(arg0 string) (*PodReport, error) { +// GetPodReportForPodName mocks base method. +func (m *MockPodman) GetPodReportForPodName(arg0 string) (*PodReport, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPodReportForId", arg0) + ret := m.ctrl.Call(m, "GetPodReportForPodName", arg0) ret0, _ := ret[0].(*PodReport) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetPodReportForId indicates an expected call of GetPodReportForId. -func (mr *MockPodmanMockRecorder) GetPodReportForId(arg0 interface{}) *gomock.Call { +// GetPodReportForPodName indicates an expected call of GetPodReportForPodName. +func (mr *MockPodmanMockRecorder) GetPodReportForPodName(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodReportForId", reflect.TypeOf((*MockPodman)(nil).GetPodReportForId), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodReportForPodName", reflect.TypeOf((*MockPodman)(nil).GetPodReportForPodName), arg0) } // List mocks base method. diff --git a/internal/workload/podman/podman.go b/internal/workload/podman/podman.go index 0639fe93..99d93392 100644 --- a/internal/workload/podman/podman.go +++ b/internal/workload/podman/podman.go @@ -15,9 +15,8 @@ import ( "github.com/blang/semver" "github.com/go-openapi/swag" "github.com/project-flotta/flotta-device-worker/internal/service" + "github.com/project-flotta/flotta-device-worker/internal/workload/api" - log "github.com/sirupsen/logrus" - podmanEvents "github.com/containers/podman/v4/libpod/events" "github.com/containers/podman/v4/pkg/bindings" "github.com/containers/podman/v4/pkg/bindings/containers" "github.com/containers/podman/v4/pkg/bindings/generate" @@ -26,7 +25,7 @@ import ( "github.com/containers/podman/v4/pkg/bindings/secrets" "github.com/containers/podman/v4/pkg/bindings/system" "github.com/containers/podman/v4/pkg/domain/entities" - api2 "github.com/project-flotta/flotta-device-worker/internal/workload/api" + log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" ) @@ -36,10 +35,6 @@ const ( DefaultNetworkName = "podman" - podmanStart = string(podmanEvents.Start) - podmanRemove = string(podmanEvents.Remove) - podmanStop = string(podmanEvents.Stop) - podmanBinary = "/usr/bin/podman" autoUpdateServiceUnitTemplate = `[Unit] Description=Podman {{ .PodName }}.service @@ -72,7 +67,7 @@ type AutoUpdateUnit struct { //go:generate mockgen -package=podman -destination=mock_podman.go . Podman type Podman interface { - List() ([]api2.WorkloadInfo, error) + List() ([]api.WorkloadInfo, error) Remove(workloadId string) error Run(manifestPath, authFilePath string, annotations map[string]string) ([]*PodReport, error) Start(workloadId string) error @@ -84,7 +79,7 @@ type Podman interface { Exists(workloadId string) (bool, error) GenerateSystemdService(workload *v1.Pod, manifestPath string, monitoringInterval uint) (service.Service, error) Logs(podID string, res io.Writer) (context.CancelFunc, error) - GetPodReportForId(podID string) (*PodReport, error) + GetPodReportForPodName(podName string) (*PodReport, error) } type PodmanEvent struct { @@ -157,14 +152,14 @@ func (p *podman) MinVersion() error { return fmt.Errorf("podman version '%s' is not supported, needs >= v4.2", version.Version.Version) } -func (p *podman) List() ([]api2.WorkloadInfo, error) { +func (p *podman) List() ([]api.WorkloadInfo, error) { podList, err := pods.List(p.podmanConnection, nil) if err != nil { return nil, err } - var workloads []api2.WorkloadInfo + var workloads []api.WorkloadInfo for _, pod := range podList { - wi := api2.WorkloadInfo{ + wi := api.WorkloadInfo{ Id: pod.Id, Name: pod.Name, Status: pod.Status, @@ -216,73 +211,7 @@ func (p *podman) getContainerDetails(containerId string) (*ContainerReport, erro }, nil } -func (p *podman) Events(events chan *PodmanEvent) { - evchan := make(chan entities.Event, 1000) - cancel := make(chan bool) - booltrue := true - - workloads, err := p.List() - if err != nil { - log.Errorf("Cannot get the list of running pods: %v", err) - } - - for _, wrk := range workloads { - report, err := p.GetPodReportForId(wrk.Id) - if err != nil { - log.Errorf("Cannot get pod report for pod '%v', err: %v ", wrk.Name, err) - continue - } - event := &PodmanEvent{ - WorkloadName: wrk.Name, - Event: StartedContainer, - Report: report, - } - go func() { events <- event }() - } - - // subroutine that reads the event chan and sending proper messages to the - // wrapper - go func() { - for { - msg := <-evchan - event := &PodmanEvent{ - WorkloadName: msg.Actor.Attributes["name"], - } - switch msg.Action { - // create event is avoided because containers are not yet created and - // the flow is created->started - case podmanStart: - event.Event = StartedContainer - report, err := p.GetPodReportForId(msg.ID) - if err != nil { - log.Error("cannot get current pod information on event: ", err) - continue - } - event.Report = report - case podmanRemove, podmanStop: - event.Event = StoppedContainer - default: - continue - } - events <- event - } - }() - - // subroutine to track events - go func() { - err := system.Events(p.podmanConnection, evchan, cancel, &system.EventsOptions{ - Filters: map[string][]string{ - "type": {"pod"}, - }, - Stream: &booltrue, - }) - if err != nil { - log.Error("Cannot get podman events, err: ", err) - } - }() -} - -func (p *podman) GetPodReportForId(podID string) (*PodReport, error) { +func (p *podman) GetPodReportForPodName(podID string) (*PodReport, error) { podInfo, err := pods.Inspect(p.podmanConnection, podID, nil) if err != nil { return nil, err @@ -408,7 +337,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } - svc, err = service.NewSystemd(podName, report.Units) + svc, err = service.NewSystemd(podName, report.Units, service.UserBus) if err != nil { return nil, err } @@ -425,7 +354,7 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m return nil, err } units := map[string]string{podName: unit.String()} - svc, err = service.NewSystemd(podName, units) + svc, err = service.NewSystemd(podName, units, service.UserBus) if err != nil { return nil, err } diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index b194098f..b4bd2357 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -9,12 +9,11 @@ import ( "github.com/project-flotta/flotta-device-worker/internal/service" - log "github.com/sirupsen/logrus" "github.com/project-flotta/flotta-device-worker/internal/workload/api" - api2 "github.com/project-flotta/flotta-device-worker/internal/workload/api" "github.com/project-flotta/flotta-device-worker/internal/workload/mapping" "github.com/project-flotta/flotta-device-worker/internal/workload/network" "github.com/project-flotta/flotta-device-worker/internal/workload/podman" + log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" _ "github.com/golang/mock/mockgen/model" @@ -45,31 +44,33 @@ type WorkloadWrapper interface { RemoveSecret(string) error CreateSecret(string, string) error UpdateSecret(string, string) error + ListenServiceEvents() } // Workload manages the workload and its configuration on the device type Workload struct { - workloads podman.Podman + podManager podman.Podman netfilter network.Netfilter mappingRepository mapping.MappingRepository observers []Observer serviceManager service.SystemdManager monitoringInterval uint - events chan *podman.PodmanEvent lock sync.RWMutex + systemdEventCh <-chan *service.Event } -func NewWorkload(p podman.Podman, n network.Netfilter, m mapping.MappingRepository, s service.SystemdManager, monitoringInterval uint) *Workload { +func NewWorkload(p podman.Podman, n network.Netfilter, m mapping.MappingRepository, s service.SystemdManager, monitoringInterval uint, systemdEventCh <-chan *service.Event) *Workload { return &Workload{ - workloads: p, + podManager: p, netfilter: n, mappingRepository: m, serviceManager: s, monitoringInterval: monitoringInterval, + systemdEventCh: systemdEventCh, } } -func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, error) { +func newWorkloadInstance(configDir string, monitoringInterval uint, systemdEventCh <-chan *service.Event) (*Workload, error) { newPodman, err := podman.NewPodman() if err != nil { return nil, fmt.Errorf("workload cannot initialize podman manager: %w", err) @@ -89,37 +90,14 @@ func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, } ww := &Workload{ - workloads: newPodman, + podManager: newPodman, netfilter: netfilter, mappingRepository: mappingRepository, serviceManager: serviceManager, monitoringInterval: monitoringInterval, - events: make(chan *podman.PodmanEvent), - } - - newPodman.Events(ww.events) - - go func() { - for { - msg := <-ww.events - switch msg.Event { - case podman.StartedContainer: - ww.lock.Lock() - observers := ww.observers - ww.lock.Unlock() - for _, observer := range observers { - observer.WorkloadStarted(msg.WorkloadName, []*podman.PodReport{msg.Report}) - } - case podman.StoppedContainer: - ww.lock.Lock() - observers := ww.observers - ww.lock.Unlock() - for _, observer := range observers { - observer.WorkloadRemoved(msg.WorkloadName) - } - } - } - }() + systemdEventCh: systemdEventCh, + } + return ww, nil } @@ -131,7 +109,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemdRootless("podman-auto-update", nil, false) + svc, err := service.NewSystemd("podman-auto-update", nil, service.UserBus) if err != nil { return err } @@ -143,8 +121,8 @@ func (ww *Workload) Init() error { return ww.netfilter.AddTable(nfTableName) } -func (ww *Workload) List() ([]api2.WorkloadInfo, error) { - infos, err := ww.workloads.List() +func (ww *Workload) List() ([]api.WorkloadInfo, error) { + infos, err := ww.podManager.List() if err != nil { return nil, err } @@ -158,7 +136,7 @@ func (ww *Workload) List() ([]api2.WorkloadInfo, error) { } func (ww *Workload) Logs(podID string, res io.Writer) (context.CancelFunc, error) { - return ww.workloads.Logs(podID, res) + return ww.podManager.Logs(podID, res) } func (ww *Workload) Remove(workloadName string) error { @@ -172,7 +150,7 @@ func (ww *Workload) Remove(workloadName string) error { return err } - if err := ww.workloads.Remove(id); err != nil { + if err := ww.podManager.Remove(id); err != nil { return err } if err := ww.netfilter.DeleteChain(nfTableName, workloadName); err != nil { @@ -189,7 +167,7 @@ func (ww *Workload) Stop(workloadName string) error { if id == "" { id = workloadName } - err := ww.workloads.Stop(id) + err := ww.podManager.Stop(id) return err } @@ -224,7 +202,7 @@ func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath stri if err := ww.applyNetworkConfiguration(workload); err != nil { return err } - podIds, err := ww.workloads.Run(manifestPath, authFilePath, podmanAnnotations) + podIds, err := ww.podManager.Run(manifestPath, authFilePath, podmanAnnotations) if err != nil { return err } @@ -235,34 +213,21 @@ func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath stri } // Create the system service to manage the pod: - svc, err := ww.workloads.GenerateSystemdService(workload, manifestPath, ww.monitoringInterval) + svc, err := ww.podManager.GenerateSystemdService(workload, manifestPath, ww.monitoringInterval) if err != nil { return fmt.Errorf("error while generating systemd service: %v", err) } + log.Infof("Creating service for %s", workload.Name) err = ww.createService(svc) if err != nil { return fmt.Errorf("error while starting service: %v", err) } + log.Infof("Registering service %s", workload.Name) err = ww.serviceManager.Add(svc) if err != nil { return fmt.Errorf("error while updating service manager: %v", err) } - - podReport, err := ww.workloads.GetPodReportForId(workload.Name) - if err != nil { - return fmt.Errorf("error while sending started events: %v", err) - } - - // When pod started by systemd the event is not sent to the channel, so we send - // it manually here to notify workloadStarted observer. - go func() { - ww.events <- &podman.PodmanEvent{ - Event: podman.StartedContainer, - WorkloadName: workload.Name, - Report: podReport, - } - }() return nil } @@ -273,23 +238,25 @@ func (ww *Workload) removeService(workloadName string) error { } // Ignore stop failure: - _ = svc.Stop() + err := svc.Stop() + if err != nil { + return fmt.Errorf("unable to stop service %s:%s", workloadName, err) + } // Disable the service from the system: if err := svc.Disable(); err != nil { - return fmt.Errorf("Cannot disable systemd service for '%s': %s", workloadName, err) + return fmt.Errorf("unable to disable systemd service for '%s': %s", workloadName, err) } // Remove the service from the system: if err := svc.Remove(); err != nil { - return fmt.Errorf("Cannot remove systemd service for '%s': %s", workloadName, err) + return fmt.Errorf("unable to remove systemd service for '%s': %s", workloadName, err) } - err := ww.serviceManager.Remove(svc) + err = ww.serviceManager.Remove(svc) if err != nil { - return nil + log.Errorf("Unable to remove service from serviceManager %s:%s", workloadName, err) } - return nil } @@ -342,7 +309,7 @@ func (ww *Workload) Start(workload *v1.Pod) error { } podId := ww.mappingRepository.GetId(workload.Name) - if err := ww.workloads.Start(podId); err != nil { + if err := ww.podManager.Start(podId); err != nil { return err } return nil @@ -353,19 +320,19 @@ func (ww *Workload) PersistConfiguration() error { } func (ww *Workload) ListSecrets() (map[string]struct{}, error) { - return ww.workloads.ListSecrets() + return ww.podManager.ListSecrets() } func (ww *Workload) RemoveSecret(name string) error { - return ww.workloads.RemoveSecret(name) + return ww.podManager.RemoveSecret(name) } func (ww *Workload) CreateSecret(name, data string) error { - return ww.workloads.CreateSecret(name, data) + return ww.podManager.CreateSecret(name, data) } func (ww *Workload) UpdateSecret(name, data string) error { - return ww.workloads.UpdateSecret(name, data) + return ww.podManager.UpdateSecret(name, data) } func getHostPorts(workload *v1.Pod) ([]int32, error) { @@ -381,3 +348,35 @@ func getHostPorts(workload *v1.Pod) ([]int32, error) { } return hostPorts, nil } + +func (w *Workload) ListenServiceEvents() { + log.Debug("Starting routine to listen for service events") + for { + event := <-w.systemdEventCh + log.Debugf("Event received: %s", string(event.Type)) + w.lock.Lock() + observers := w.observers + w.lock.Unlock() + log.Debugf("Number of observers %d: %+v", len(observers), observers) + switch event.Type { + case service.EventStarted: + log.Infof("Service for workload %s started", event.WorkloadName) + report, err := w.podManager.GetPodReportForPodName(event.WorkloadName) + if err != nil { + log.Errorf("unable to get pod report for workload '%s':%v", event.WorkloadName, err) + } + for _, observer := range observers { + log.Debugf("Triggering WorkloadStarted in observer '%s' for workload '%s'", observer, event.WorkloadName) + observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report}) + } + case service.EventStopped: + log.Infof("Service for workload '%s' stopped", event.WorkloadName) + for _, observer := range observers { + log.Debugf("Triggering WorkloadRemoved in observer '%s' for workload '%s'", observer, event.WorkloadName) + observer.WorkloadRemoved(event.WorkloadName) + } + default: + log.Errorf("Unknown event %s", event.Type) + } + } +} diff --git a/internal/workload/wrapper_test.go b/internal/workload/wrapper_test.go index d22a896f..1cfa60e8 100644 --- a/internal/workload/wrapper_test.go +++ b/internal/workload/wrapper_test.go @@ -40,7 +40,7 @@ var _ = Describe("Workload management", func() { serviceManager = service.NewMockSystemdManager(mockCtrl) svc = service.NewMockService(mockCtrl) - wk = workload.NewWorkload(newPodman, netfilter, mappingRepository, serviceManager, 15) + wk = workload.NewWorkload(newPodman, netfilter, mappingRepository, serviceManager, 15, nil) }) AfterEach(func() { @@ -56,7 +56,6 @@ var _ = Describe("Workload management", func() { newPodman.EXPECT().Run(manifestPath, authFilePath, nil).Return([]*podman.PodReport{{Id: "id1"}}, nil) newPodman.EXPECT().GenerateSystemdService(pod, gomock.Any(), gomock.Any()).Return(svc, nil) - newPodman.EXPECT().GetPodReportForId("pod1").Return(nil, nil) svc.EXPECT().Add().Return(nil) svc.EXPECT().Enable().Return(nil) diff --git a/vendor/modules.txt b/vendor/modules.txt index 4c37c6b3..b7fd2703 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -390,6 +390,7 @@ github.com/docker/go-units # github.com/evanphx/json-patch v4.12.0+incompatible github.com/evanphx/json-patch # github.com/fsnotify/fsnotify v1.5.4 +## explicit github.com/fsnotify/fsnotify # github.com/gabriel-vasile/mimetype v1.4.0 github.com/gabriel-vasile/mimetype