diff --git a/cmd/device-worker/main.go b/cmd/device-worker/main.go index 94341533..1387dfa1 100644 --- a/cmd/device-worker/main.go +++ b/cmd/device-worker/main.go @@ -20,7 +20,8 @@ import ( os2 "github.com/project-flotta/flotta-device-worker/internal/os" registration2 "github.com/project-flotta/flotta-device-worker/internal/registration" "github.com/project-flotta/flotta-device-worker/internal/server" - workload2 "github.com/project-flotta/flotta-device-worker/internal/workload" + "github.com/project-flotta/flotta-device-worker/internal/service" + workload "github.com/project-flotta/flotta-device-worker/internal/workload" "net" "os" @@ -33,17 +34,21 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var yggdDispatchSocketAddr string -var gracefulRebootChannel chan struct{} - const ( - defaultDataDir = "/var/local/yggdrasil" + defaultDataDir = "/var/local/yggdrasil" + systemdUserServicesDir = ".config/systemd/user" +) + +var ( + yggdDispatchSocketAddr string + gracefulRebootChannel chan struct{} + systemdUserServicesFullPath = filepath.Join(os.Getenv("HOME"), systemdUserServicesDir) ) func initSystemdDirectory() error { - systemddir := filepath.Join(os.Getenv("HOME"), ".config/systemd/user/") + // init the flotta user systemd units directory - err := os.MkdirAll(systemddir, 0750) + err := os.MkdirAll(systemdUserServicesFullPath, 0750) if err != nil { return err } @@ -141,6 +146,12 @@ func main() { } configManager := configuration2.NewConfigurationManager(dataDir) + // Systemd event dbusEventListener + dbusEventListener := service.NewDBusEventListener() + // Start listening to systemd bus events. These events generated from here can start being processed once all observers have been initialized, otherwise + // we risk in missing observers whilst processing the events. + configManager.RegisterObserver(dbusEventListener) + // --- Client metrics configuration --- metricsStore, err := metrics.NewTSDB(dataDir) if err != nil { @@ -162,16 +173,16 @@ func main() { dataTransferWatcher := metrics.NewDataTransferMetrics(metricsDaemon) configManager.RegisterObserver(dataTransferWatcher) - wl, err := workload2.NewWorkloadManager(dataDir, deviceId) + workloadManager, err := workload.NewWorkloadManager(dataDir, deviceId, dbusEventListener.GetEventChannel()) if err != nil { log.Fatalf("cannot start Workload Manager. DeviceID: %s; err: %v", deviceId, err) } - logsWrapper := logs.NewWorkloadsLogsTarget(wl) + logsWrapper := logs.NewWorkloadsLogsTarget(workloadManager) configManager.RegisterObserver(logsWrapper) - wl.RegisterObserver(logsWrapper) - configManager.RegisterObserver(wl) - wl.RegisterObserver(workloadMetricWatcher) + workloadManager.RegisterObserver(logsWrapper) + configManager.RegisterObserver(workloadManager) + workloadManager.RegisterObserver(workloadMetricWatcher) remoteWrite := metrics.NewRemoteWrite(dataDir, deviceId, metricsStore) configManager.RegisterObserver(remoteWrite) @@ -189,25 +200,21 @@ func main() { } configManager.RegisterObserver(mountManager) - dataMonitor := datatransfer.NewMonitor(wl, configManager) - wl.RegisterObserver(dataMonitor) + dataMonitor := datatransfer.NewMonitor(workloadManager, configManager) + workloadManager.RegisterObserver(dataMonitor) configManager.RegisterObserver(dataMonitor) dataMonitor.Start() - if err != nil { - log.Fatalf("cannot start metrics store. DeviceID: %s; err: %v", deviceId, err) - } - - reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, wl) + reg, err := registration2.NewRegistration(deviceId, &hw, dispatcherClient, configManager, workloadManager) if err != nil { log.Fatalf("cannot start registration process: DeviceID: %s; err: %v", deviceId, err) } - hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, wl, &hw, dataMonitor, deviceOs, reg) + hbs := heartbeat2.NewHeartbeatService(dispatcherClient, configManager, workloadManager, &hw, dataMonitor, deviceOs, reg) configManager.RegisterObserver(hbs) reg.DeregisterLater( - wl, + workloadManager, configManager, hbs, dataMonitor, @@ -239,16 +246,19 @@ func main() { setupSignalHandler(metricsStore, ansibleManager) - go listenStartGracefulRebootChannel(wl, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager, + go listenStartGracefulRebootChannel(workloadManager, dataMonitor, systemMetricsWatcher, metricsStore, hbs, ansibleManager, gracefulRebootChannel, deviceOs) + // All observers have been registered at this point. Ready to start listening to workload service events generated by changes to the systemd dbus + go workloadManager.ListenServiceEvents() + if err := s.Serve(l); err != nil { log.Fatalf("cannot start worker server, err: %v", err) } } -func listenStartGracefulRebootChannel(wl *workload2.WorkloadManager, dataMonitor *datatransfer.Monitor, +func listenStartGracefulRebootChannel(wl *workload.WorkloadManager, dataMonitor *datatransfer.Monitor, systemMetricsWatcher *metrics.SystemMetrics, metricsStore *metrics.TSDB, hbs *heartbeat2.Heartbeat, ansibleManager *ansible.Manager, gracefulRebootChannel chan struct{}, deviceOs *os2.OS) { // listen to the channel for getting StartGracefulReboot signal diff --git a/go.mod b/go.mod index 9d4b16ef..a77da14a 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/digitalocean/godo v1.80.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/envoyproxy/protoc-gen-validate v0.6.7 // indirect + github.com/fsnotify/fsnotify v1.5.4 github.com/go-kit/log v0.2.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/runtime v0.23.1 // indirect diff --git a/internal/ansible/ansible_manager_test.go b/internal/ansible/ansible_manager_test.go index 2f6077db..9a6fd60a 100644 --- a/internal/ansible/ansible_manager_test.go +++ b/internal/ansible/ansible_manager_test.go @@ -164,8 +164,10 @@ var _ = Describe("Ansible Runner", func() { modTime1 := time.Now().Add(-3 * time.Hour) modTime2 := time.Now().Add(-2 * time.Hour) - p1Sha := ansibleManager.MappingRepository.GetSha256(p1) - p2Sha := ansibleManager.MappingRepository.GetSha256(p2) + p1Sha, err := ansibleManager.MappingRepository.GetSha256(p1) + Expect(err).ToNot(HaveOccurred()) + p2Sha, err := ansibleManager.MappingRepository.GetSha256(p2) + Expect(err).ToNot(HaveOccurred()) p1Path := path.Join(configDir, p1Sha) p2Path := path.Join(configDir, p2Sha) diff --git a/internal/ansible/mapping/mapping.go b/internal/ansible/mapping/mapping.go index 06901b59..d240527a 100644 --- a/internal/ansible/mapping/mapping.go +++ b/internal/ansible/mapping/mapping.go @@ -21,7 +21,7 @@ type mapping struct { //go:generate mockgen -package=mapping -destination=mock_mapping.go . MappingRepository type MappingRepository interface { - GetSha256(fileContent []byte) string + GetSha256(fileContent []byte) (string, error) Add(fileContent []byte, modTime time.Time) error Remove(fileContent []byte) error RemoveMappingFile() error @@ -86,17 +86,24 @@ func (m *mappingRepository) GetAll() map[int]string { return all } -func (m *mappingRepository) GetSha256(fileContent []byte) string { +func (m *mappingRepository) GetSha256(fileContent []byte) (string, error) { h := sha256.New() - h.Write(fileContent) - return fmt.Sprintf("%x", h.Sum(nil)) + _, err := h.Write(fileContent) + if err != nil { + return "", err + } + return fmt.Sprintf("%x", h.Sum(nil)), nil } func (m *mappingRepository) Add(fileContent []byte, modTime time.Time) error { m.lock.Lock() defer m.lock.Unlock() - filePath := path.Join(m.configDir, m.GetSha256(fileContent)) - err := os.WriteFile(filePath, []byte(fileContent), 0600) + sha, err := m.GetSha256(fileContent) + if err != nil { + return err + } + filePath := path.Join(m.configDir, sha) + err = os.WriteFile(filePath, []byte(fileContent), 0600) if err != nil { return err @@ -112,7 +119,11 @@ func (m *mappingRepository) Remove(fileContent []byte) error { m.lock.Lock() defer m.lock.Unlock() - filePath := path.Join(m.configDir, m.GetSha256(fileContent)) + sha, err := m.GetSha256(fileContent) + if err != nil { + return err + } + filePath := path.Join(m.configDir, sha) modTime := m.pathToModTime[filePath] delete(m.modTimeToPath, modTime) delete(m.pathToModTime, filePath) diff --git a/internal/ansible/mapping/mapping_test.go b/internal/ansible/mapping/mapping_test.go index ce1129a0..a95d7368 100644 --- a/internal/ansible/mapping/mapping_test.go +++ b/internal/ansible/mapping/mapping_test.go @@ -24,7 +24,8 @@ var _ = Describe("Mapping", func() { repo, err = mapping.NewMappingRepository(dir) Expect(err).ToNot(HaveOccurred()) - sha256Test = repo.GetSha256([]byte("test")) + sha256Test, err = repo.GetSha256([]byte("test")) + Expect(err).ToNot(HaveOccurred()) filePathTest = path.Join(configDir, sha256Test) }) AfterEach(func() { @@ -32,8 +33,10 @@ var _ = Describe("Mapping", func() { Expect(err).ToNot(HaveOccurred()) }) It("sha256 Generation", func() { - s1 := repo.GetSha256([]byte("AAA")) - s2 := repo.GetSha256([]byte("AAA")) + s1, err := repo.GetSha256([]byte("AAA")) + Expect(err).ToNot(HaveOccurred()) + s2, err := repo.GetSha256([]byte("AAA")) + Expect(err).ToNot(HaveOccurred()) Expect(s1).To(Equal(s2)) }) It("Should be created empty", func() { @@ -87,12 +90,17 @@ var _ = Describe("Mapping", func() { It("Should persist mappings", func() { // given - filePath1 := path.Join(configDir, repo.GetSha256([]byte("test-one"))) - filePath2 := path.Join(configDir, repo.GetSha256([]byte("test-two"))) + sha, err := repo.GetSha256([]byte("test-one")) + Expect(err).ToNot(HaveOccurred()) + filePath1 := path.Join(configDir, sha) + Expect(err).ToNot(HaveOccurred()) + sha, err = repo.GetSha256([]byte("test-two")) + filePath2 := path.Join(configDir, sha) + Expect(err).ToNot(HaveOccurred()) modTime1 := time.Now() modTime2 := modTime1.Add(1 * time.Minute) - err := repo.Add([]byte("test-one"), modTime1) + err = repo.Add([]byte("test-one"), modTime1) Expect(err).ToNot(HaveOccurred()) Expect(repo.GetModTime(filePath1)).To(Equal(modTime1.UnixNano())) Expect(repo.GetFilePath(modTime1)).To(Equal(filePath1)) diff --git a/internal/ansible/mapping/mock_mapping.go b/internal/ansible/mapping/mock_mapping.go index fd3ead81..6a05f38b 100644 --- a/internal/ansible/mapping/mock_mapping.go +++ b/internal/ansible/mapping/mock_mapping.go @@ -105,11 +105,12 @@ func (mr *MockMappingRepositoryMockRecorder) GetModTime(arg0 interface{}) *gomoc } // GetSha256 mocks base method. -func (m *MockMappingRepository) GetSha256(arg0 []byte) string { +func (m *MockMappingRepository) GetSha256(arg0 []byte) (string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetSha256", arg0) ret0, _ := ret[0].(string) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // GetSha256 indicates an expected call of GetSha256. diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go index 2149113a..b4a6056a 100644 --- a/internal/configuration/configuration.go +++ b/internal/configuration/configuration.go @@ -35,6 +35,7 @@ var ( type Observer interface { Init(configuration models.DeviceConfigurationMessage) error Update(configuration models.DeviceConfigurationMessage) error + String() string } type Manager struct { @@ -49,10 +50,10 @@ type Manager struct { func NewConfigurationManager(dataDir string) *Manager { deviceConfigFile := path.Join(dataDir, "device-config.json") log.Infof("device config file: %s", deviceConfigFile) - file, err := ioutil.ReadFile(deviceConfigFile) //#nosec var deviceConfiguration models.DeviceConfigurationMessage initialConfig := atomic.Value{} initialConfig.Store(false) + file, err := ioutil.ReadFile(deviceConfigFile) //#nosec if err != nil { log.Error(err) deviceConfiguration = defaultDeviceConfigurationMessage @@ -147,6 +148,7 @@ func (m *Manager) Update(message models.DeviceConfigurationMessage) error { } log.Infof("updating configuration. New config: %s\nOld config: %s", newJson, oldJson) + log.Debugf("[ConfigManager] observers :%v+", m.observers) for _, observer := range m.observers { err := observer.Update(message) if err != nil { diff --git a/internal/configuration/configuration_mock.go b/internal/configuration/configuration_mock.go index 6bb849e9..987e371e 100644 --- a/internal/configuration/configuration_mock.go +++ b/internal/configuration/configuration_mock.go @@ -48,6 +48,20 @@ func (mr *MockObserverMockRecorder) Init(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockObserver)(nil).Init), arg0) } +// String mocks base method. +func (m *MockObserver) String() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "String") + ret0, _ := ret[0].(string) + return ret0 +} + +// String indicates an expected call of String. +func (mr *MockObserverMockRecorder) String() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "String", reflect.TypeOf((*MockObserver)(nil).String)) +} + // Update mocks base method. func (m *MockObserver) Update(arg0 models.DeviceConfigurationMessage) error { m.ctrl.T.Helper() diff --git a/internal/datatransfer/monitor_test.go b/internal/datatransfer/monitor_test.go index 2243302b..314936d6 100644 --- a/internal/datatransfer/monitor_test.go +++ b/internal/datatransfer/monitor_test.go @@ -37,7 +37,6 @@ var _ = Describe("Datatransfer", func() { wkwMock = workload.NewMockWorkloadWrapper(mockCtrl) wkwMock.EXPECT().Init().Return(nil).AnyTimes() - wkwMock.EXPECT().PersistConfiguration().AnyTimes() wkManager, err = workload.NewWorkloadManagerWithParams(datadir, wkwMock, "device-id-123") Expect(err).NotTo(HaveOccurred(), "Cannot start the Workload Manager") diff --git a/internal/heartbeat/heartbeat_test.go b/internal/heartbeat/heartbeat_test.go index 4fd482f7..70da1415 100644 --- a/internal/heartbeat/heartbeat_test.go +++ b/internal/heartbeat/heartbeat_test.go @@ -6,14 +6,12 @@ import ( "context" "encoding/json" "fmt" - "net" "net/http" "reflect" "sync" "time" "git.sr.ht/~spc/go-log" - "github.com/openshift/assisted-installer-agent/src/util" "github.com/project-flotta/flotta-device-worker/internal/ansible" os2 "github.com/project-flotta/flotta-device-worker/internal/os" "github.com/project-flotta/flotta-device-worker/internal/registration" @@ -57,7 +55,6 @@ var _ = Describe("Heartbeat", func() { mockCtrl = gomock.NewController(GinkgoT()) wkwMock = workload.NewMockWorkloadWrapper(mockCtrl) wkwMock.EXPECT().Init().Return(nil).AnyTimes() - wkwMock.EXPECT().PersistConfiguration().AnyTimes() regMock := registration.NewMockRegistrationWrapper(mockCtrl) wkManager, err = workload.NewWorkloadManagerWithParams(datadir, wkwMock, "device-id-123") @@ -759,44 +756,6 @@ func (d *DispatcherFailing) GetConfig(ctx context.Context, in *pb.Empty, opts .. return nil, nil } -func NewFilledInterfaceMock(mtu int, name string, macAddr string, flags net.Flags, addrs []string, isPhysical bool, isBonding bool, isVlan bool, speedMbps int64) *util.MockInterface { - hwAddr, _ := net.ParseMAC(macAddr) - ret := util.MockInterface{} - ret.On("IsPhysical").Return(isPhysical) - if isPhysical || isBonding || isVlan { - ret.On("Name").Return(name) - ret.On("MTU").Return(mtu) - ret.On("HardwareAddr").Return(hwAddr) - ret.On("Flags").Return(flags) - ret.On("Addrs").Return(toAddresses(addrs), nil) - ret.On("SpeedMbps").Return(speedMbps) - } - if !isPhysical { - ret.On("IsBonding").Return(isBonding) - } - if !(isPhysical || isBonding) { - ret.On("IsVlan").Return(isVlan) - } - - return &ret -} - -func toAddresses(addrs []string) []net.Addr { - ret := make([]net.Addr, 0) - for _, a := range addrs { - ret = append(ret, str2Addr(a)) - } - return ret -} - -func str2Addr(addrStr string) net.Addr { - ip, ipnet, err := net.ParseCIDR(addrStr) - if err != nil { - return &net.IPNet{} - } - return &net.IPNet{IP: ip, Mask: ipnet.Mask} -} - func initHwMock(hwMock *hardware.MockHardware, configManager *configuration.Manager, hostname string, ipv4 []string) (*gomock.Call, *gomock.Call, *gomock.Call) { var m models.HardwareInfo configManager.GetDeviceConfiguration().Heartbeat.HardwareProfile.Scope = heartbeat.ScopeDelta diff --git a/internal/logs/workload.go b/internal/logs/workload.go index 7e4670d7..8e3b542a 100644 --- a/internal/logs/workload.go +++ b/internal/logs/workload.go @@ -205,3 +205,7 @@ func (w *WorkloadsLogsTarget) WorkloadStarted(workloadName string, report []*pod log.Error("Cannot start workload logs", err) } } + +func (w *WorkloadsLogsTarget) String() string { + return "workload logs" +} diff --git a/internal/metrics/remote_write.go b/internal/metrics/remote_write.go index fbffc9e5..877a0f97 100644 --- a/internal/metrics/remote_write.go +++ b/internal/metrics/remote_write.go @@ -533,3 +533,7 @@ func toTimeSeries(series []Series) (timeSeries []prompb.TimeSeries, lowest time. highest = fromDbTime(maxt) return } + +func (r *RemoteWrite) String() string { + return "remote write" +} diff --git a/internal/metrics/system.go b/internal/metrics/system.go index f86b3257..adb986ff 100644 --- a/internal/metrics/system.go +++ b/internal/metrics/system.go @@ -26,10 +26,7 @@ type SystemMetrics struct { } func NewSystemMetrics(daemon MetricsDaemon) (*SystemMetrics, error) { - nodeExporter, err := service.NewSystemdRootless("node_exporter", nil, false) - if err != nil { - return nil, err - } + nodeExporter := service.NewSystemd("node_exporter", nil, service.SystemBus) return NewSystemMetricsWithNodeExporter(daemon, nodeExporter), nil } diff --git a/internal/metrics/workload.go b/internal/metrics/workload.go index 287421f8..92af6506 100644 --- a/internal/metrics/workload.go +++ b/internal/metrics/workload.go @@ -48,11 +48,12 @@ func (wrkM *WorkloadMetrics) Update(config models.DeviceConfigurationMessage) er } func (wrkM *WorkloadMetrics) WorkloadRemoved(workloadName string) { - log.Infof("removing target metrics for workload '%v'", workloadName) + log.Infof("Removing target metrics for workload '%v'", workloadName) wrkM.daemon.DeleteTarget(workloadName) } func (wrkM *WorkloadMetrics) WorkloadStarted(workloadName string, report []*podman.PodReport) { + log.Infof("Starting target metrics for workload '%s'", workloadName) for _, workload := range report { cfg := wrkM.getWorkload(workloadName) if cfg == nil { @@ -83,6 +84,10 @@ func (wrkM *WorkloadMetrics) WorkloadStarted(workloadName string, report []*podm } } +func (wrKM *WorkloadMetrics) String() string { + return "workload metrics" +} + func getWorkloadUrls(report *podman.PodReport, config *models.Workload) []string { res := []string{} metricsPath := config.Metrics.Path diff --git a/internal/mount/mount.go b/internal/mount/mount.go index ec156b47..6bf05b6e 100644 --- a/internal/mount/mount.go +++ b/internal/mount/mount.go @@ -170,3 +170,7 @@ func getDefaultMountOptions() string { } return fmt.Sprintf("gid=%s,uid=%s", group.Gid, usr.Uid) } + +func (m *Manager) String() string { + return "mount" +} diff --git a/internal/os/os.go b/internal/os/os.go index e19dbe7f..d87d3241 100644 --- a/internal/os/os.go +++ b/internal/os/os.go @@ -207,3 +207,7 @@ func (o *OS) updateGreenbootScripts() error { return nil } + +func (o *OS) String() string { + return "rpm-ostree" +} diff --git a/internal/service/event_listener.go b/internal/service/event_listener.go new file mode 100644 index 00000000..d5abe7e0 --- /dev/null +++ b/internal/service/event_listener.go @@ -0,0 +1,172 @@ +package service + +import ( + "context" + "fmt" + "path/filepath" + "strconv" + "strings" + "sync" + + "git.sr.ht/~spc/go-log" + "github.com/coreos/go-systemd/v22/dbus" + "github.com/project-flotta/flotta-operator/models" +) + +const ( + EventStarted EventType = "started" + EventStopped EventType = "stopped" + + // To avoid confusion, we match the action verb tense coming from systemd sub state, even though it is not consistent + // The only addition to the list is the "removed" case, which we use when a service is disabled by removing the softlink + // from the file system. In this case we receive an empty unitStatus object. + + //The service is running + running unitSubState = "running" + //The service is starting, probably due to a restart. + //In this case we have to notify the observers that the service is not yet up and is loading. A subsequent 'running' event will proceed. + start unitSubState = "start" + //The service has been stopped, due to the container being killed. Systemd will restart the service. + stop unitSubState = "stop" + //The service is dead, due to the container being stopped for instance. + dead unitSubState = "dead" + //The service failed to start + failed unitSubState = "failed" + //The service has been disabled, probably because the soft link in the `default.target.wants` directory has been removed. + // No unit state is returned from systemd and thus we stop watching for this service. + removed unitSubState = "removed" +) + +type unitSubState string + +type EventType string + +type Event struct { + WorkloadName string + Type EventType +} + +type DBusEventListener struct { + eventCh chan *Event + set *dbus.SubscriptionSet + dbusCh <-chan map[string]*dbus.UnitStatus + dbusErrCh <-chan error + lock sync.Mutex +} + +func NewDBusEventListener() *DBusEventListener { + return &DBusEventListener{lock: sync.Mutex{}, eventCh: make(chan *Event, 1000)} +} + +func (e *DBusEventListener) Init(configuration models.DeviceConfigurationMessage) error { + log.Infof("Starting DBus event listener") + conn, err := newDbusConnection(UserBus) + if err != nil { + return err + } + e.set = conn.NewSubscriptionSet() + for _, w := range configuration.Workloads { + s, err := conn.GetUnitPropertyContext(context.Background(), DefaultServiceName(w.Name), "UnitFileState") + if err != nil { + return err + } + log.Debugf("Unit UnitFileState property for workload %s:%s", w.Name, s.Value.String()) + v, err := strconv.Unquote(s.Value.String()) + if err != nil { + return err + } + if v == "disabled" { + log.Warnf("Service for workload %s is disabled", w.Name) + } + e.add(DefaultServiceName(w.Name)) + } + go e.Listen() + return nil +} + +func (e *DBusEventListener) Update(configuration models.DeviceConfigurationMessage) error { + for _, wl := range configuration.Workloads { + svcName := DefaultServiceName(wl.Name) + if !e.contains(svcName) { + e.add(svcName) + } + } + return nil +} + +func (e *DBusEventListener) String() string { + return "DBus event listener" +} + +func (e *DBusEventListener) GetEventChannel() <-chan *Event { + return e.eventCh +} + +func (e *DBusEventListener) Listen() { + e.dbusCh, e.dbusErrCh = e.set.Subscribe() + for { + select { + case msg := <-e.dbusCh: + for name, unit := range msg { + log.Tracef("Captured DBus event for %s: %v+", name, unit) + n, err := extractWorkloadName(name) + if err != nil { + log.Error(err) + continue + } + state := translateUnitSubStatus(unit) + log.Debugf("Systemd service for workload %s transitioned to sub state %s\n", n, state) + switch state { + case running: + log.Tracef("Sending start event to observer channel for workload %s", n) + e.eventCh <- &Event{WorkloadName: n, Type: EventStarted} + case removed: + log.Tracef("Service %s has been disabled.", name) + e.remove(name) + case stop, dead, failed, start: + log.Tracef("Sending stop event to observer channel for workload %s", n) + e.eventCh <- &Event{WorkloadName: n, Type: EventStopped} + default: + log.Tracef("Ignoring unit sub state for service %s: %s", name, state) + } + } + case msg := <-e.dbusErrCh: + log.Errorf("Error while parsing dbus event: %v", msg) + } + } + +} + +func (e *DBusEventListener) add(serviceName string) { + e.lock.Lock() + defer e.lock.Unlock() + log.Tracef("Adding service for event listener %s", serviceName) + e.set.Add(serviceName) +} + +func (e *DBusEventListener) remove(serviceName string) { + e.lock.Lock() + defer e.lock.Unlock() + log.Tracef("Removing service from event listener %s", serviceName) + e.set.Remove(serviceName) +} + +func (e *DBusEventListener) contains(serviceName string) bool { + e.lock.Lock() + defer e.lock.Unlock() + return e.set.Contains(serviceName) +} + +func extractWorkloadName(serviceName string) (string, error) { + if filepath.Ext(serviceName) != ServiceSuffix { + return "", fmt.Errorf("invalid file name or not a service %s", serviceName) + } + return strings.TrimSuffix(filepath.Base(serviceName), filepath.Ext(serviceName)), nil +} + +func translateUnitSubStatus(unit *dbus.UnitStatus) unitSubState { + if unit == nil { + return removed + } + return unitSubState(unit.SubState) +} diff --git a/internal/service/mock_systemd.go b/internal/service/mock_systemd.go index 32e87772..850abd10 100644 --- a/internal/service/mock_systemd.go +++ b/internal/service/mock_systemd.go @@ -103,6 +103,21 @@ func (mr *MockServiceMockRecorder) Remove() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockService)(nil).Remove)) } +// ServiceExists mocks base method. +func (m *MockService) ServiceExists() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ServiceExists") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ServiceExists indicates an expected call of ServiceExists. +func (mr *MockServiceMockRecorder) ServiceExists() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ServiceExists", reflect.TypeOf((*MockService)(nil).ServiceExists)) +} + // Start mocks base method. func (m *MockService) Start() error { m.ctrl.T.Helper() diff --git a/internal/service/mock_systemd_manager.go b/internal/service/mock_systemd_manager.go index 04bafb52..786e022c 100644 --- a/internal/service/mock_systemd_manager.go +++ b/internal/service/mock_systemd_manager.go @@ -74,17 +74,3 @@ func (mr *MockSystemdManagerMockRecorder) Remove(arg0 interface{}) *gomock.Call mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockSystemdManager)(nil).Remove), arg0) } - -// RemoveServicesFile mocks base method. -func (m *MockSystemdManager) RemoveServicesFile() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemoveServicesFile") - ret0, _ := ret[0].(error) - return ret0 -} - -// RemoveServicesFile indicates an expected call of RemoveServicesFile. -func (mr *MockSystemdManagerMockRecorder) RemoveServicesFile() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveServicesFile", reflect.TypeOf((*MockSystemdManager)(nil).RemoveServicesFile)) -} diff --git a/internal/service/systemd.go b/internal/service/systemd.go index aa672f6e..7cb64ebe 100644 --- a/internal/service/systemd.go +++ b/internal/service/systemd.go @@ -2,13 +2,10 @@ package service import ( "context" - "encoding/json" "fmt" - "io/ioutil" "os" "path" "path/filepath" - "sync" "git.sr.ht/~spc/go-log" "github.com/coreos/go-systemd/v22/dbus" @@ -26,6 +23,13 @@ var ( DefaultUnitsPath = path.Join(os.Getenv("HOME"), ".config/systemd/user/") ) +type BusType string + +const ( + UserBus BusType = "user" + SystemBus BusType = "system" +) + //go:generate mockgen -package=service -destination=mock_systemd.go . Service type Service interface { GetName() string @@ -35,15 +39,14 @@ type Service interface { Stop() error Enable() error Disable() error + ServiceExists() (bool, error) } type systemd struct { - Name string `json:"name"` - RestartSec int `json:"restartSec"` - Units []string `json:"units"` - UnitsContent map[string]string `json:"-"` - dbusConnection *dbus.Conn `json:"-"` - Rootless bool `json:"rootless"` + name string + Units []string + UnitsContent map[string]string + BusType BusType } //go:generate mockgen -package=service -destination=mock_systemd_manager.go . SystemdManager @@ -51,91 +54,10 @@ type SystemdManager interface { Add(svc Service) error Get(name string) Service Remove(svc Service) error - RemoveServicesFile() error -} - -type systemdManager struct { - svcFilePath string - lock sync.RWMutex - services map[string]Service -} - -func NewSystemdManager(configDir string) (SystemdManager, error) { - services := make(map[string]*systemd) - servicePath := path.Join(configDir, "services.json") - servicesJson, err := ioutil.ReadFile(servicePath) //#nosec - if err == nil { - err := json.Unmarshal(servicesJson, &services) - if err != nil { - return nil, fmt.Errorf("cannot unmarshal %v: %w", servicePath, err) - } - } - - systemdSVC := make(map[string]Service) - for k, v := range services { - systemdSVC[k] = v - } - - return &systemdManager{svcFilePath: servicePath, services: systemdSVC, lock: sync.RWMutex{}}, nil -} - -func (mgr *systemdManager) RemoveServicesFile() error { - mgr.lock.Lock() - defer mgr.lock.Unlock() - - log.Infof("deleting %s file", mgr.svcFilePath) - err := os.RemoveAll(mgr.svcFilePath) - if err != nil { - log.Errorf("failed to delete %s: %v", mgr.svcFilePath, err) - return err - } - - return nil -} - -func (mgr *systemdManager) Add(svc Service) error { - mgr.lock.Lock() - defer mgr.lock.Unlock() - - mgr.services[svc.GetName()] = svc - - return mgr.write() -} - -func (mgr *systemdManager) Get(name string) Service { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - - return mgr.services[name] -} - -func (mgr *systemdManager) Remove(svc Service) error { - mgr.lock.Lock() - defer mgr.lock.Unlock() - - delete(mgr.services, svc.GetName()) - - return mgr.write() } -func (mgr *systemdManager) write() error { - svcJson, err := json.Marshal(mgr.services) - if err != nil { - return err - } - err = ioutil.WriteFile(mgr.svcFilePath, svcJson, 0640) //#nosec - if err != nil { - return err - } - return nil -} - -func NewSystemd(name string, units map[string]string) (Service, error) { - return NewSystemdRootless(name, units, true) -} - -func newDbusConnection(rootless bool) (*dbus.Conn, error) { - if rootless { +func newDbusConnection(busType BusType) (*dbus.Conn, error) { + if busType == UserBus { return dbus.NewConnection(func() (*godbus.Conn, error) { uid := path.Base(os.Getenv("FLOTTA_XDG_RUNTIME_DIR")) path := filepath.Join(os.Getenv("FLOTTA_XDG_RUNTIME_DIR"), "systemd/private") @@ -162,33 +84,25 @@ func newDbusConnection(rootless bool) (*dbus.Conn, error) { } } -func NewSystemdRootless(name string, units map[string]string, rootless bool) (Service, error) { - var err error - var conn *dbus.Conn - - conn, err = newDbusConnection(rootless) - if err != nil { - return nil, err - } - +func NewSystemd(name string, units map[string]string, busType BusType) Service { var unitNames []string for unit := range units { unitNames = append(unitNames, unit) } return &systemd{ - Name: name, - RestartSec: DefaultRestartTimeout, - dbusConnection: conn, - Units: unitNames, - Rootless: rootless, - UnitsContent: units, - }, nil + name: name, + + Units: unitNames, + BusType: busType, + UnitsContent: units, + } + } func (s *systemd) Add() error { if len(s.UnitsContent) == 0 { - log.Infof("calling systemd add service for '%s' with no units available", s.Name) + log.Infof("calling systemd add service for '%s' with no units available", s.GetName()) } for unit, content := range s.UnitsContent { @@ -203,22 +117,43 @@ func (s *systemd) Add() error { } func (s *systemd) Remove() error { - for _, unit := range s.Units { - err := os.Remove(path.Join(DefaultUnitsPath, DefaultServiceName(unit))) + + conn, err := newDbusConnection(s.BusType) + if err != nil { + return err + } + // Retrieve dependency/bound services to the workload. It will return a slice with the services that are bound to this one + p, err := conn.GetUnitPropertyContext(context.Background(), DefaultServiceName(s.GetName()), "BoundBy") + if err != nil { + return err + } + log.Debugf("List of dependent services to %s: %s", s.GetName(), p) + v, ok := p.Value.Value().([]string) + if !ok { + return fmt.Errorf("invalid value %s for property BoundBy in service %s", p.Value.Value(), s.GetName()) + } + // Delete the files that are bound to this service + for _, unit := range v { + log.Tracef("Deleting service unit configuration at %s", path.Join(DefaultUnitsPath, unit)) + err := os.Remove(path.Join(DefaultUnitsPath, unit)) if err != nil { return err } } - + // Finally, remove the service file + err = os.Remove(path.Join(DefaultUnitsPath, DefaultServiceName(s.GetName()))) + if err != nil { + return err + } return s.reload() } func (s *systemd) GetName() string { - return s.Name + return s.name } func (s *systemd) reload() error { - conn, err := newDbusConnection(s.Rootless) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } @@ -227,13 +162,14 @@ func (s *systemd) reload() error { } func (s *systemd) Start() error { - conn, err := newDbusConnection(s.Rootless) + log.Debugf("Starting service %s", s.GetName()) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } defer conn.Close() startChan := make(chan string) - if _, err := conn.StartUnitContext(context.Background(), DefaultServiceName(s.Name), "replace", startChan); err != nil { + if _, err := conn.StartUnitContext(context.Background(), DefaultServiceName(s.GetName()), "replace", startChan); err != nil { return err } @@ -242,18 +178,19 @@ func (s *systemd) Start() error { case "done": return nil default: - return errors.Errorf("Failed[%s] to start systemd service %s", result, DefaultServiceName(s.Name)) + return errors.Errorf("Failed[%s] to start systemd service %s", result, DefaultServiceName(s.GetName())) } } func (s *systemd) Stop() error { - conn, err := newDbusConnection(s.Rootless) + + conn, err := newDbusConnection(s.BusType) if err != nil { return err } defer conn.Close() stopChan := make(chan string) - if _, err := conn.StopUnitContext(context.Background(), DefaultServiceName(s.Name), "replace", stopChan); err != nil { + if _, err := conn.StopUnitContext(context.Background(), DefaultServiceName(s.GetName()), "replace", stopChan); err != nil { return err } @@ -262,31 +199,52 @@ func (s *systemd) Stop() error { case "done": return nil default: - return errors.Errorf("Failed[%s] to stop systemd service %s", result, DefaultServiceName(s.Name)) + return errors.Errorf("Failed[%s] to stop systemd service %s", result, DefaultServiceName(s.GetName())) } } func (s *systemd) Enable() error { - conn, err := newDbusConnection(s.Rootless) + log.Debugf("Enabling service %s", s.GetName()) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } defer conn.Close() - _, _, err = conn.EnableUnitFilesContext(context.Background(), []string{DefaultServiceName(s.Name)}, false, true) + _, _, err = conn.EnableUnitFilesContext(context.Background(), []string{DefaultServiceName(s.GetName())}, false, true) return err } func (s *systemd) Disable() error { - conn, err := newDbusConnection(s.Rootless) + log.Debugf("Disabling service %s", s.GetName()) + conn, err := newDbusConnection(s.BusType) if err != nil { return err } defer conn.Close() - _, err = conn.DisableUnitFilesContext(context.Background(), []string{DefaultServiceName(s.Name)}, false) + _, err = conn.DisableUnitFilesContext(context.Background(), []string{DefaultServiceName(s.GetName())}, false) return err } func DefaultServiceName(serviceName string) string { return serviceName + ServiceSuffix } + +func (s *systemd) ServiceExists() (bool, error) { + conn, err := newDbusConnection(s.BusType) + if err != nil { + return false, err + } + defer conn.Close() + + units, err := conn.ListUnitsByNamesContext(context.Background(), []string{DefaultServiceName(s.GetName())}) + if err != nil { + return false, err + } + log.Tracef("Number of matching units %d:%+v", len(units), units) + exists := len(units) == 1 && units[0].LoadState != "not-found" + if !exists { + log.Tracef("Service %s not found ", s.GetName()) + } + return exists, nil +} diff --git a/internal/workload/manager.go b/internal/workload/manager.go index 31e6a4bb..6766f81d 100644 --- a/internal/workload/manager.go +++ b/internal/workload/manager.go @@ -40,8 +40,8 @@ type WorkloadManager struct { deviceId string } -func NewWorkloadManager(dataDir string, deviceId string) (*WorkloadManager, error) { - wrapper, err := newWorkloadInstance(dataDir, defaultWorkloadsMonitoringInterval) +func NewWorkloadManager(dataDir string, deviceId string, systemdEventCh <-chan *service.Event) (*WorkloadManager, error) { + wrapper, err := newWorkloadInstance(dataDir, defaultWorkloadsMonitoringInterval, systemdEventCh) if err != nil { return nil, err } @@ -147,7 +147,7 @@ func (w *WorkloadManager) Update(configuration models.DeviceConfigurationMessage if PodShouldWaitForMount(pod, configuration.Configuration) { errors = multierror.Append(errors, fmt.Errorf( - "Pod '%s' needs to mount blockdevice but it's not in there yet", workload.Name)) + "pod '%s' needs to mount blockdevice but it's not in there yet", workload.Name)) continue } @@ -353,18 +353,6 @@ func (w *WorkloadManager) Deregister() error { log.Errorf("failed to delete volumes directory. DeviceID: %s; err: %v", w.deviceId, err) } - err = w.removeMappingFile() - if err != nil { - errors = multierror.Append(errors, fmt.Errorf("failed to remove mapping file: %v", err)) - log.Errorf("failed to remove mapping file. DeviceID: %s; err: %v", w.deviceId, err) - } - - err = w.removeServicesFile() - if err != nil { - errors = multierror.Append(errors, fmt.Errorf("failed to remove services file: %v", err)) - log.Errorf("failed to remove services file. DeviceID: %s; err: %v", w.deviceId, err) - } - w.deregistered = true return errors } @@ -415,6 +403,10 @@ func (w *WorkloadManager) deleteVolumeDir() error { return deleteDir(w.volumesDir) } +func (w *WorkloadManager) ListenServiceEvents() { + w.workloads.ListenServiceEvents() +} + func deleteDir(path string) error { err := os.RemoveAll(path) if err != nil { @@ -445,27 +437,6 @@ func (w *WorkloadManager) deleteTable() error { return nil } -func (w *WorkloadManager) removeServicesFile() error { - log.Infof("deleting services file. DeviceID: %s;", w.deviceId) - err := w.workloads.RemoveServicesFile() - if err != nil { - return err - } - - return nil -} - -func (w *WorkloadManager) removeMappingFile() error { - log.Infof("deleting mapping file. DeviceID: %s;", w.deviceId) - err := w.workloads.RemoveMappingFile() - if err != nil { - log.Error(err) - return err - } - - return nil -} - func (w *WorkloadManager) toPod(workload *models.Workload) (*v1.Pod, error) { podSpec := v1.PodSpec{} err := yaml.Unmarshal([]byte(workload.Specification), &podSpec) diff --git a/internal/workload/manager_test.go b/internal/workload/manager_test.go index 221a612f..4a0ad300 100644 --- a/internal/workload/manager_test.go +++ b/internal/workload/manager_test.go @@ -92,16 +92,14 @@ var _ = Describe("Events", func() { wkwMock.EXPECT().List().Return([]api.WorkloadInfo{ {Id: "stale", Name: "stale", Status: "created"}, }, nil).AnyTimes() - wkwMock.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("Failed to start container")) - wkwMock.EXPECT().PersistConfiguration().AnyTimes() - wkwMock.EXPECT().Start(gomock.Any()).Return(fmt.Errorf("failed to start container")).AnyTimes() + wkwMock.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("Failed to start container")) // when err := wkManager.Update(cfg) // then Expect(err).To(HaveOccurred()) - + Expect(err.Error()).To(ContainSubstring("Failed to start container")) // Check no events are generated: time.Sleep(5 * time.Second) events := wkManager.PopEvents() diff --git a/internal/workload/mock_wrapper.go b/internal/workload/mock_wrapper.go index 49ed1f81..7ea6cf48 100644 --- a/internal/workload/mock_wrapper.go +++ b/internal/workload/mock_wrapper.go @@ -95,6 +95,18 @@ func (mr *MockWorkloadWrapperMockRecorder) ListSecrets() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSecrets", reflect.TypeOf((*MockWorkloadWrapper)(nil).ListSecrets)) } +// ListenServiceEvents mocks base method. +func (m *MockWorkloadWrapper) ListenServiceEvents() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ListenServiceEvents") +} + +// ListenServiceEvents indicates an expected call of ListenServiceEvents. +func (mr *MockWorkloadWrapperMockRecorder) ListenServiceEvents() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListenServiceEvents", reflect.TypeOf((*MockWorkloadWrapper)(nil).ListenServiceEvents)) +} + // Logs mocks base method. func (m *MockWorkloadWrapper) Logs(arg0 string, arg1 io.Writer) (context.CancelFunc, error) { m.ctrl.T.Helper() @@ -110,20 +122,6 @@ func (mr *MockWorkloadWrapperMockRecorder) Logs(arg0, arg1 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Logs", reflect.TypeOf((*MockWorkloadWrapper)(nil).Logs), arg0, arg1) } -// PersistConfiguration mocks base method. -func (m *MockWorkloadWrapper) PersistConfiguration() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PersistConfiguration") - ret0, _ := ret[0].(error) - return ret0 -} - -// PersistConfiguration indicates an expected call of PersistConfiguration. -func (mr *MockWorkloadWrapperMockRecorder) PersistConfiguration() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PersistConfiguration", reflect.TypeOf((*MockWorkloadWrapper)(nil).PersistConfiguration)) -} - // RegisterObserver mocks base method. func (m *MockWorkloadWrapper) RegisterObserver(arg0 Observer) { m.ctrl.T.Helper() @@ -150,20 +148,6 @@ func (mr *MockWorkloadWrapperMockRecorder) Remove(arg0 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockWorkloadWrapper)(nil).Remove), arg0) } -// RemoveMappingFile mocks base method. -func (m *MockWorkloadWrapper) RemoveMappingFile() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemoveMappingFile") - ret0, _ := ret[0].(error) - return ret0 -} - -// RemoveMappingFile indicates an expected call of RemoveMappingFile. -func (mr *MockWorkloadWrapperMockRecorder) RemoveMappingFile() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveMappingFile", reflect.TypeOf((*MockWorkloadWrapper)(nil).RemoveMappingFile)) -} - // RemoveSecret mocks base method. func (m *MockWorkloadWrapper) RemoveSecret(arg0 string) error { m.ctrl.T.Helper() @@ -178,20 +162,6 @@ func (mr *MockWorkloadWrapperMockRecorder) RemoveSecret(arg0 interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveSecret", reflect.TypeOf((*MockWorkloadWrapper)(nil).RemoveSecret), arg0) } -// RemoveServicesFile mocks base method. -func (m *MockWorkloadWrapper) RemoveServicesFile() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemoveServicesFile") - ret0, _ := ret[0].(error) - return ret0 -} - -// RemoveServicesFile indicates an expected call of RemoveServicesFile. -func (mr *MockWorkloadWrapperMockRecorder) RemoveServicesFile() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveServicesFile", reflect.TypeOf((*MockWorkloadWrapper)(nil).RemoveServicesFile)) -} - // RemoveTable mocks base method. func (m *MockWorkloadWrapper) RemoveTable() error { m.ctrl.T.Helper() @@ -220,20 +190,6 @@ func (mr *MockWorkloadWrapperMockRecorder) Run(arg0, arg1, arg2, arg3 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockWorkloadWrapper)(nil).Run), arg0, arg1, arg2, arg3) } -// Start mocks base method. -func (m *MockWorkloadWrapper) Start(arg0 *v1.Pod) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Start", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// Start indicates an expected call of Start. -func (mr *MockWorkloadWrapperMockRecorder) Start(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockWorkloadWrapper)(nil).Start), arg0) -} - // Stop mocks base method. func (m *MockWorkloadWrapper) Stop(arg0 string) error { m.ctrl.T.Helper() diff --git a/internal/workload/podman/mock_podman.go b/internal/workload/podman/mock_podman.go index 692072ce..70621aa9 100644 --- a/internal/workload/podman/mock_podman.go +++ b/internal/workload/podman/mock_podman.go @@ -82,19 +82,19 @@ func (mr *MockPodmanMockRecorder) GenerateSystemdService(arg0, arg1, arg2 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateSystemdService", reflect.TypeOf((*MockPodman)(nil).GenerateSystemdService), arg0, arg1, arg2) } -// GetPodReportForId mocks base method. -func (m *MockPodman) GetPodReportForId(arg0 string) (*PodReport, error) { +// GetPodReportForPodName mocks base method. +func (m *MockPodman) GetPodReportForPodName(arg0 string) (*PodReport, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPodReportForId", arg0) + ret := m.ctrl.Call(m, "GetPodReportForPodName", arg0) ret0, _ := ret[0].(*PodReport) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetPodReportForId indicates an expected call of GetPodReportForId. -func (mr *MockPodmanMockRecorder) GetPodReportForId(arg0 interface{}) *gomock.Call { +// GetPodReportForPodName indicates an expected call of GetPodReportForPodName. +func (mr *MockPodmanMockRecorder) GetPodReportForPodName(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodReportForId", reflect.TypeOf((*MockPodman)(nil).GetPodReportForId), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodReportForPodName", reflect.TypeOf((*MockPodman)(nil).GetPodReportForPodName), arg0) } // List mocks base method. diff --git a/internal/workload/podman/podman.go b/internal/workload/podman/podman.go index bd1dc4a4..d40cda3c 100644 --- a/internal/workload/podman/podman.go +++ b/internal/workload/podman/podman.go @@ -15,9 +15,9 @@ import ( "github.com/blang/semver" "github.com/go-openapi/swag" "github.com/project-flotta/flotta-device-worker/internal/service" + api "github.com/project-flotta/flotta-device-worker/internal/workload/api" "git.sr.ht/~spc/go-log" - podmanEvents "github.com/containers/podman/v4/libpod/events" "github.com/containers/podman/v4/pkg/bindings" "github.com/containers/podman/v4/pkg/bindings/containers" "github.com/containers/podman/v4/pkg/bindings/generate" @@ -26,7 +26,6 @@ import ( "github.com/containers/podman/v4/pkg/bindings/secrets" "github.com/containers/podman/v4/pkg/bindings/system" "github.com/containers/podman/v4/pkg/domain/entities" - api2 "github.com/project-flotta/flotta-device-worker/internal/workload/api" v1 "k8s.io/api/core/v1" ) @@ -36,10 +35,6 @@ const ( DefaultNetworkName = "podman" - podmanStart = string(podmanEvents.Start) - podmanRemove = string(podmanEvents.Remove) - podmanStop = string(podmanEvents.Stop) - podmanBinary = "/usr/bin/podman" autoUpdateServiceUnitTemplate = `[Unit] Description=Podman {{ .PodName }}.service @@ -72,7 +67,7 @@ type AutoUpdateUnit struct { //go:generate mockgen -package=podman -destination=mock_podman.go . Podman type Podman interface { - List() ([]api2.WorkloadInfo, error) + List() ([]api.WorkloadInfo, error) Remove(workloadId string) error Run(manifestPath, authFilePath string, annotations map[string]string) ([]*PodReport, error) Start(workloadId string) error @@ -84,7 +79,7 @@ type Podman interface { Exists(workloadId string) (bool, error) GenerateSystemdService(workload *v1.Pod, manifestPath string, monitoringInterval uint) (service.Service, error) Logs(podID string, res io.Writer) (context.CancelFunc, error) - GetPodReportForId(podID string) (*PodReport, error) + GetPodReportForPodName(podName string) (*PodReport, error) } type PodmanEvent struct { @@ -157,14 +152,14 @@ func (p *podman) MinVersion() error { return fmt.Errorf("podman version '%s' is not supported, needs >= v4.2", version.Version.Version) } -func (p *podman) List() ([]api2.WorkloadInfo, error) { +func (p *podman) List() ([]api.WorkloadInfo, error) { podList, err := pods.List(p.podmanConnection, nil) if err != nil { return nil, err } - var workloads []api2.WorkloadInfo + var workloads []api.WorkloadInfo for _, pod := range podList { - wi := api2.WorkloadInfo{ + wi := api.WorkloadInfo{ Id: pod.Id, Name: pod.Name, Status: pod.Status, @@ -174,22 +169,22 @@ func (p *podman) List() ([]api2.WorkloadInfo, error) { return workloads, nil } -func (p *podman) Exists(workloadId string) (bool, error) { - exists, err := pods.Exists(p.podmanConnection, workloadId, nil) +func (p *podman) Exists(workloadName string) (bool, error) { + exists, err := pods.Exists(p.podmanConnection, workloadName, nil) if err != nil { return false, err } return exists, nil } -func (p *podman) Remove(workloadId string) error { - exists, err := p.Exists(workloadId) +func (p *podman) Remove(workloadName string) error { + exists, err := p.Exists(workloadName) if err != nil { return err } if exists { force := true - _, err := pods.Remove(p.podmanConnection, workloadId, &pods.RemoveOptions{Force: &force}) + _, err := pods.Remove(p.podmanConnection, workloadName, &pods.RemoveOptions{Force: &force}) if err != nil { return err } @@ -216,73 +211,7 @@ func (p *podman) getContainerDetails(containerId string) (*ContainerReport, erro }, nil } -func (p *podman) Events(events chan *PodmanEvent) { - evchan := make(chan entities.Event, 1000) - cancel := make(chan bool) - booltrue := true - - workloads, err := p.List() - if err != nil { - log.Errorf("Cannot get the list of running pods: %v", err) - } - - for _, wrk := range workloads { - report, err := p.GetPodReportForId(wrk.Id) - if err != nil { - log.Errorf("Cannot get pod report for pod '%v', err: %v ", wrk.Name, err) - continue - } - event := &PodmanEvent{ - WorkloadName: wrk.Name, - Event: StartedContainer, - Report: report, - } - go func() { events <- event }() - } - - // subroutine that reads the event chan and sending proper messages to the - // wrapper - go func() { - for { - msg := <-evchan - event := &PodmanEvent{ - WorkloadName: msg.Actor.Attributes["name"], - } - switch msg.Action { - // create event is avoided because containers are not yet created and - // the flow is created->started - case podmanStart: - event.Event = StartedContainer - report, err := p.GetPodReportForId(msg.ID) - if err != nil { - log.Error("cannot get current pod information on event: ", err) - continue - } - event.Report = report - case podmanRemove, podmanStop: - event.Event = StoppedContainer - default: - continue - } - events <- event - } - }() - - // subroutine to track events - go func() { - err := system.Events(p.podmanConnection, evchan, cancel, &system.EventsOptions{ - Filters: map[string][]string{ - "type": {"pod"}, - }, - Stream: &booltrue, - }) - if err != nil { - log.Error("Cannot get podman events, err: ", err) - } - }() -} - -func (p *podman) GetPodReportForId(podID string) (*PodReport, error) { +func (p *podman) GetPodReportForPodName(podID string) (*PodReport, error) { podInfo, err := pods.Inspect(p.podmanConnection, podID, nil) if err != nil { return nil, err @@ -343,12 +272,9 @@ func (p *podman) Run(manifestPath, authFilePath string, annotations map[string]s return podIds, nil } -func (p *podman) Start(workloadId string) error { - _, err := pods.Start(p.podmanConnection, workloadId, nil) - if err != nil { - return err - } - return nil +func (p *podman) Start(workloadName string) error { + _, err := pods.Start(p.podmanConnection, workloadName, nil) + return err } func (p *podman) ListSecrets() (map[string]struct{}, error) { @@ -391,7 +317,6 @@ func hasAutoUpdateEnabled(labels map[string]string) bool { } func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, monitoringInterval uint) (service.Service, error) { - var svc service.Service podName := workload.Name // Since podman don't support generation of the systemd services that re-creates the pod, instead of restarting it, @@ -407,31 +332,22 @@ func (p *podman) GenerateSystemdService(workload *v1.Pod, manifestPath string, m if err != nil { return nil, err } + return service.NewSystemd(podName, report.Units, service.UserBus), nil + } - svc, err = service.NewSystemd(podName, report.Units) - if err != nil { - return nil, err - } - } else { - var unit bytes.Buffer + var unit bytes.Buffer - tmp := template.New("unit") - t, err := tmp.Parse(autoUpdateServiceUnitTemplate) - if err != nil { - return nil, err - } - err = t.Execute(&unit, AutoUpdateUnit{ManifestPath: manifestPath, PodmanBinary: podmanBinary, PodName: podName}) - if err != nil { - return nil, err - } - units := map[string]string{podName: unit.String()} - svc, err = service.NewSystemd(podName, units) - if err != nil { - return nil, err - } + tmp := template.New("unit") + t, err := tmp.Parse(autoUpdateServiceUnitTemplate) + if err != nil { + return nil, err } - - return svc, nil + err = t.Execute(&unit, AutoUpdateUnit{ManifestPath: manifestPath, PodmanBinary: podmanBinary, PodName: podName}) + if err != nil { + return nil, err + } + units := map[string]string{podName: unit.String()} + return service.NewSystemd(podName, units, service.UserBus), nil } // Retrieve all pods logs and send that to the given io.Writer diff --git a/internal/workload/wrapper.go b/internal/workload/wrapper.go index 40a6f6cc..c4c0f913 100644 --- a/internal/workload/wrapper.go +++ b/internal/workload/wrapper.go @@ -11,8 +11,6 @@ import ( "git.sr.ht/~spc/go-log" "github.com/project-flotta/flotta-device-worker/internal/workload/api" - api2 "github.com/project-flotta/flotta-device-worker/internal/workload/api" - "github.com/project-flotta/flotta-device-worker/internal/workload/mapping" "github.com/project-flotta/flotta-device-worker/internal/workload/network" "github.com/project-flotta/flotta-device-worker/internal/workload/podman" v1 "k8s.io/api/core/v1" @@ -36,40 +34,34 @@ type WorkloadWrapper interface { Remove(string) error Stop(string) error Run(*v1.Pod, string, string, map[string]string) error - Start(*v1.Pod) error - PersistConfiguration() error RemoveTable() error - RemoveMappingFile() error - RemoveServicesFile() error ListSecrets() (map[string]struct{}, error) RemoveSecret(string) error CreateSecret(string, string) error UpdateSecret(string, string) error + ListenServiceEvents() } // Workload manages the workload and its configuration on the device type Workload struct { - workloads podman.Podman + podManager podman.Podman netfilter network.Netfilter - mappingRepository mapping.MappingRepository observers []Observer - serviceManager service.SystemdManager monitoringInterval uint - events chan *podman.PodmanEvent lock sync.RWMutex + systemdEventCh <-chan *service.Event } -func NewWorkload(p podman.Podman, n network.Netfilter, m mapping.MappingRepository, s service.SystemdManager, monitoringInterval uint) *Workload { +func NewWorkload(p podman.Podman, n network.Netfilter, monitoringInterval uint, systemdEventCh <-chan *service.Event) *Workload { return &Workload{ - workloads: p, + podManager: p, netfilter: n, - mappingRepository: m, - serviceManager: s, monitoringInterval: monitoringInterval, + systemdEventCh: systemdEventCh, } } -func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, error) { +func newWorkloadInstance(configDir string, monitoringInterval uint, systemdEventCh <-chan *service.Event) (*Workload, error) { newPodman, err := podman.NewPodman() if err != nil { return nil, fmt.Errorf("workload cannot initialize podman manager: %w", err) @@ -78,48 +70,14 @@ func newWorkloadInstance(configDir string, monitoringInterval uint) (*Workload, if err != nil { return nil, fmt.Errorf("workload cannot initialize netfilter manager: %w", err) } - mappingRepository, err := mapping.NewMappingRepository(configDir) - if err != nil { - return nil, fmt.Errorf("workload cannot initialize mapping repository: %w", err) - } - - serviceManager, err := service.NewSystemdManager(configDir) - if err != nil { - return nil, fmt.Errorf("workload cannot initialize systemd manager: %w", err) - } ww := &Workload{ - workloads: newPodman, + podManager: newPodman, netfilter: netfilter, - mappingRepository: mappingRepository, - serviceManager: serviceManager, monitoringInterval: monitoringInterval, - events: make(chan *podman.PodmanEvent), + systemdEventCh: systemdEventCh, } - newPodman.Events(ww.events) - - go func() { - for { - msg := <-ww.events - switch msg.Event { - case podman.StartedContainer: - ww.lock.Lock() - observers := ww.observers - ww.lock.Unlock() - for _, observer := range observers { - observer.WorkloadStarted(msg.WorkloadName, []*podman.PodReport{msg.Report}) - } - case podman.StoppedContainer: - ww.lock.Lock() - observers := ww.observers - ww.lock.Unlock() - for _, observer := range observers { - observer.WorkloadRemoved(msg.WorkloadName) - } - } - } - }() return ww, nil } @@ -131,10 +89,7 @@ func (ww *Workload) RegisterObserver(observer Observer) { func (ww *Workload) Init() error { // Enable auto-update podman timer: - svc, err := service.NewSystemdRootless("podman-auto-update", nil, false) - if err != nil { - return err - } + svc := service.NewSystemd("podman-auto-update", nil, service.UserBus) if err := svc.Start(); err != nil { return err } @@ -143,53 +98,35 @@ func (ww *Workload) Init() error { return ww.netfilter.AddTable(nfTableName) } -func (ww *Workload) List() ([]api2.WorkloadInfo, error) { - infos, err := ww.workloads.List() - if err != nil { - return nil, err - } - for i := range infos { - mappedName := ww.mappingRepository.GetName(infos[i].Id) - if mappedName != "" { - infos[i].Name = mappedName - } - } - return infos, err +func (ww *Workload) List() ([]api.WorkloadInfo, error) { + return ww.podManager.List() } func (ww *Workload) Logs(podID string, res io.Writer) (context.CancelFunc, error) { - return ww.workloads.Logs(podID, res) + return ww.podManager.Logs(podID, res) } func (ww *Workload) Remove(workloadName string) error { - id := ww.mappingRepository.GetId(workloadName) - if id == "" { - id = workloadName - } // Remove the service configuration from the system: if err := ww.removeService(workloadName); err != nil { return err } - if err := ww.workloads.Remove(id); err != nil { + if err := ww.podManager.Remove(workloadName); err != nil { return err } if err := ww.netfilter.DeleteChain(nfTableName, workloadName); err != nil { log.Errorf("failed to delete chain '%[1]s' from table '%[2]s' for workload '%[1]s': %[3]v", workloadName, nfTableName, err) } - if err := ww.mappingRepository.Remove(workloadName); err != nil { - return err - } + return nil } func (ww *Workload) Stop(workloadName string) error { - id := ww.mappingRepository.GetId(workloadName) - if id == "" { - id = workloadName - } - err := ww.workloads.Stop(id) + id := workloadName + + err := ww.podManager.Stop(id) return err } @@ -202,110 +139,60 @@ func (ww *Workload) RemoveTable() error { return nil } -func (ww *Workload) RemoveServicesFile() error { - log.Infof("deleting services file") - if err := ww.serviceManager.RemoveServicesFile(); err != nil { - log.Errorf("failed to remove services file: %v", err) - return err - } - return nil -} - -func (ww *Workload) RemoveMappingFile() error { - log.Infof("deleting mapping file") - if err := ww.mappingRepository.RemoveMappingFile(); err != nil { - log.Errorf("failed to remove mapping file: %v", err) - return err - } - return nil -} - func (ww *Workload) Run(workload *v1.Pod, manifestPath string, authFilePath string, podmanAnnotations map[string]string) error { if err := ww.applyNetworkConfiguration(workload); err != nil { return err } - podIds, err := ww.workloads.Run(manifestPath, authFilePath, podmanAnnotations) + _, err := ww.podManager.Run(manifestPath, authFilePath, podmanAnnotations) if err != nil { return err } - // Must be called before GenerateSystemdService: - if err = ww.mappingRepository.Add(workload.Name, podIds[0].Id); err != nil { - return err - } - // Create the system service to manage the pod: - svc, err := ww.workloads.GenerateSystemdService(workload, manifestPath, ww.monitoringInterval) + svc, err := ww.podManager.GenerateSystemdService(workload, manifestPath, ww.monitoringInterval) if err != nil { return fmt.Errorf("error while generating systemd service: %v", err) } - err = ww.createService(svc) - if err != nil { - return fmt.Errorf("error while starting service: %v", err) + log.Infof("Starting service for %s", workload.Name) + if err := svc.Add(); err != nil { + return fmt.Errorf("cannot add systemd service '%s': %v", svc.GetName(), err) } - err = ww.serviceManager.Add(svc) - if err != nil { - return fmt.Errorf("error while updating service manager: %v", err) + if err := svc.Enable(); err != nil { + return fmt.Errorf("cannot enable systemd service '%s': %v", svc.GetName(), err) } - podReport, err := ww.workloads.GetPodReportForId(workload.Name) + err = svc.Start() if err != nil { - return fmt.Errorf("error while sending started events: %v", err) + log.Errorf("cannot start systemd service '%s': %v", svc.GetName(), err) + return err } - // When pod started by systemd the event is not sent to the channel, so we send - // it manually here to notify workloadStarted observer. - go func() { - ww.events <- &podman.PodmanEvent{ - Event: podman.StartedContainer, - WorkloadName: workload.Name, - Report: podReport, - } - }() return nil } func (ww *Workload) removeService(workloadName string) error { - svc := ww.serviceManager.Get(workloadName) - if svc == nil { + svc := service.NewSystemd(workloadName, nil, service.UserBus) + exists, err := svc.ServiceExists() + if err != nil { + return err + } + if !exists { return nil } - - // Ignore stop failure: - _ = svc.Stop() + err = svc.Stop() + if err != nil { + return fmt.Errorf("unable to stop service %s:%s", workloadName, err) + } // Disable the service from the system: if err := svc.Disable(); err != nil { - return fmt.Errorf("Cannot disable systemd service for '%s': %s", workloadName, err) + return fmt.Errorf("unable to disable systemd service for '%s': %+v", workloadName, err) } // Remove the service from the system: if err := svc.Remove(); err != nil { - return fmt.Errorf("Cannot remove systemd service for '%s': %s", workloadName, err) - } - - err := ww.serviceManager.Remove(svc) - if err != nil { - return nil - } - - return nil -} - -func (ww *Workload) createService(svc service.Service) error { - if err := svc.Add(); err != nil { - return fmt.Errorf("cannot add systemd service '%s': %v", svc.GetName(), err) - } - if err := svc.Enable(); err != nil { - return fmt.Errorf("cannot enable systemd service '%s': %v", svc.GetName(), err) - } - - err := svc.Start() - if err != nil { - // A service maybe is already in place, and started, so no returning an - // error here. - log.Errorf("cannot start systemd service '%s': %v", svc.GetName(), err) + return fmt.Errorf("unable to remove systemd service for '%s': %s", workloadName, err) } return nil @@ -336,36 +223,30 @@ func (ww *Workload) applyNetworkConfiguration(workload *v1.Pod) error { } func (ww *Workload) Start(workload *v1.Pod) error { - _ = ww.netfilter.DeleteChain(nfTableName, workload.Name) - if err := ww.applyNetworkConfiguration(workload); err != nil { - return err + err := ww.netfilter.DeleteChain(nfTableName, workload.Name) + if err != nil { + log.Errorf("Error detected while deleting chain for workload %s:%s", workload.Name, err) } - - podId := ww.mappingRepository.GetId(workload.Name) - if err := ww.workloads.Start(podId); err != nil { + if err = ww.applyNetworkConfiguration(workload); err != nil { return err } - return nil -} - -func (ww *Workload) PersistConfiguration() error { - return ww.mappingRepository.Persist() + return ww.podManager.Start(workload.Name) } func (ww *Workload) ListSecrets() (map[string]struct{}, error) { - return ww.workloads.ListSecrets() + return ww.podManager.ListSecrets() } func (ww *Workload) RemoveSecret(name string) error { - return ww.workloads.RemoveSecret(name) + return ww.podManager.RemoveSecret(name) } func (ww *Workload) CreateSecret(name, data string) error { - return ww.workloads.CreateSecret(name, data) + return ww.podManager.CreateSecret(name, data) } func (ww *Workload) UpdateSecret(name, data string) error { - return ww.workloads.UpdateSecret(name, data) + return ww.podManager.UpdateSecret(name, data) } func getHostPorts(workload *v1.Pod) ([]int32, error) { @@ -381,3 +262,35 @@ func getHostPorts(workload *v1.Pod) ([]int32, error) { } return hostPorts, nil } + +func (w *Workload) ListenServiceEvents() { + log.Debug("Starting routine to listen for service events") + for { + event := <-w.systemdEventCh + log.Debugf("Event received: %s", string(event.Type)) + w.lock.Lock() + observers := w.observers + w.lock.Unlock() + log.Debugf("Number of observers %d: %+v", len(observers), observers) + switch event.Type { + case service.EventStarted: + log.Infof("Service for workload %s started", event.WorkloadName) + report, err := w.podManager.GetPodReportForPodName(event.WorkloadName) + if err != nil { + log.Errorf("unable to get pod report for workload %s:%v", event.WorkloadName, err) + } + for _, observer := range observers { + log.Debugf("Triggering WorkloadStarted in observer '%s' for workload %s", observer, event.WorkloadName) + observer.WorkloadStarted(event.WorkloadName, []*podman.PodReport{report}) + } + case service.EventStopped: + log.Infof("Service for workload %s stopped", event.WorkloadName) + for _, observer := range observers { + log.Debugf("Triggering WorkloadRemoved in observer '%s' for workload %s", observer, event.WorkloadName) + observer.WorkloadRemoved(event.WorkloadName) + } + default: + log.Errorf("Unknown event %s", event.Type) + } + } +} diff --git a/internal/workload/wrapper_test.go b/internal/workload/wrapper_test.go index d22a896f..e5f63d06 100644 --- a/internal/workload/wrapper_test.go +++ b/internal/workload/wrapper_test.go @@ -8,7 +8,6 @@ import ( . "github.com/onsi/gomega" "github.com/project-flotta/flotta-device-worker/internal/service" "github.com/project-flotta/flotta-device-worker/internal/workload" - "github.com/project-flotta/flotta-device-worker/internal/workload/mapping" "github.com/project-flotta/flotta-device-worker/internal/workload/network" "github.com/project-flotta/flotta-device-worker/internal/workload/podman" v1 "k8s.io/api/core/v1" @@ -23,24 +22,21 @@ const ( var _ = Describe("Workload management", func() { var ( - mockCtrl *gomock.Controller - wk *workload.Workload - newPodman *podman.MockPodman - netfilter *network.MockNetfilter - mappingRepository *mapping.MockMappingRepository - serviceManager *service.MockSystemdManager - svc *service.MockService + mockCtrl *gomock.Controller + wk *workload.Workload + newPodman *podman.MockPodman + netfilter *network.MockNetfilter + + svc *service.MockService ) BeforeEach(func() { mockCtrl = gomock.NewController(GinkgoT()) newPodman = podman.NewMockPodman(mockCtrl) netfilter = network.NewMockNetfilter(mockCtrl) - mappingRepository = mapping.NewMockMappingRepository(mockCtrl) - serviceManager = service.NewMockSystemdManager(mockCtrl) svc = service.NewMockService(mockCtrl) - wk = workload.NewWorkload(newPodman, netfilter, mappingRepository, serviceManager, 15) + wk = workload.NewWorkload(newPodman, netfilter, 15, nil) }) AfterEach(func() { @@ -56,16 +52,11 @@ var _ = Describe("Workload management", func() { newPodman.EXPECT().Run(manifestPath, authFilePath, nil).Return([]*podman.PodReport{{Id: "id1"}}, nil) newPodman.EXPECT().GenerateSystemdService(pod, gomock.Any(), gomock.Any()).Return(svc, nil) - newPodman.EXPECT().GetPodReportForId("pod1").Return(nil, nil) svc.EXPECT().Add().Return(nil) svc.EXPECT().Enable().Return(nil) svc.EXPECT().Start().Return(nil) - serviceManager.EXPECT().Add(svc).Return(nil) - - mappingRepository.EXPECT().Add("pod1", "id1") - // when err := wk.Run(pod, manifestPath, authFilePath, nil) @@ -78,8 +69,6 @@ var _ = Describe("Workload management", func() { // given pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}} - mappingRepository.EXPECT().Add("pod1", "id1") - newPodman.EXPECT().Run(manifestPath, authFilePath, nil).Return([]*podman.PodReport{{Id: "id1"}}, nil) newPodman.EXPECT().GenerateSystemdService(pod, gomock.Any(), gomock.Any()).Return(svc, nil) @@ -98,8 +87,6 @@ var _ = Describe("Workload management", func() { // given pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}} - mappingRepository.EXPECT().Add("pod1", "id1") - newPodman.EXPECT().Run(manifestPath, authFilePath, nil).Return([]*podman.PodReport{{Id: "id1"}}, nil) newPodman.EXPECT().GenerateSystemdService(pod, gomock.Any(), gomock.Any()).Return(svc, nil) diff --git a/vendor/modules.txt b/vendor/modules.txt index 3a743d42..f59f968a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -393,6 +393,7 @@ github.com/docker/go-units # github.com/evanphx/json-patch v4.12.0+incompatible github.com/evanphx/json-patch # github.com/fsnotify/fsnotify v1.5.4 +## explicit github.com/fsnotify/fsnotify # github.com/gabriel-vasile/mimetype v1.4.0 github.com/gabriel-vasile/mimetype