diff --git a/dbus/set.go b/dbus/set.go index 17c5d485..c0b8fde1 100644 --- a/dbus/set.go +++ b/dbus/set.go @@ -14,28 +14,43 @@ package dbus +import ( + "sync" +) + type set struct { data map[string]bool + mu sync.Mutex } func (s *set) Add(value string) { + s.mu.Lock() + defer s.mu.Unlock() s.data[value] = true } func (s *set) Remove(value string) { + s.mu.Lock() + defer s.mu.Unlock() delete(s.data, value) } func (s *set) Contains(value string) (exists bool) { + s.mu.Lock() + defer s.mu.Unlock() _, exists = s.data[value] return } func (s *set) Length() int { + s.mu.Lock() + defer s.mu.Unlock() return len(s.data) } func (s *set) Values() (values []string) { + s.mu.Lock() + defer s.mu.Unlock() for val := range s.data { values = append(values, val) } @@ -43,5 +58,5 @@ func (s *set) Values() (values []string) { } func newSet() *set { - return &set{make(map[string]bool)} + return &set{data: make(map[string]bool)} } diff --git a/dbus/subscription.go b/dbus/subscription.go index f0f6aad9..7aff411d 100644 --- a/dbus/subscription.go +++ b/dbus/subscription.go @@ -15,6 +15,7 @@ package dbus import ( + "context" "errors" "log" "time" @@ -94,16 +95,26 @@ func (c *Conn) dispatch() { }() } -// SubscribeUnits returns two unbuffered channels which will receive all changed units every -// interval. Deleted units are sent as nil. +// Deprecated: use SubscribeUnitsContext instead. func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) { - return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil) + return c.SubscribeUnitsContext(context.Background(), interval) +} + +// SubscribeUnitsContext returns two unbuffered channels which will receive all changed units every +// interval. Deleted units are sent as nil. +func (c *Conn) SubscribeUnitsContext(ctx context.Context, interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) { + return c.SubscribeUnitsCustomContext(ctx, interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil) } -// SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer +// Deprecated: use SubscribeUnitsCustomContext instead. +func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) { + return c.SubscribeUnitsCustomContext(context.Background(), interval, buffer, isChanged, filterUnit) +} + +// SubscribeUnitsCustomContext is like SubscribeUnits but lets you specify the buffer // size of the channels, the comparison function for detecting changes and a filter // function for cutting down on the noise that your channel receives. -func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) { +func (c *Conn) SubscribeUnitsCustomContext(ctx context.Context, interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) { old := make(map[string]*UnitStatus) statusChan := make(chan map[string]*UnitStatus, buffer) errChan := make(chan error, buffer) @@ -112,7 +123,7 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange for { timerChan := time.After(interval) - units, err := c.ListUnits() + units, err := c.ListUnitsContext(ctx) if err == nil { cur := make(map[string]*UnitStatus) for i := range units { @@ -145,7 +156,14 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange errChan <- err } - <-timerChan + select { + case <-timerChan: + continue + case <-ctx.Done(): + close(statusChan) + close(errChan) + return + } } }() diff --git a/dbus/subscription_set.go b/dbus/subscription_set.go index dbe4aa88..173ca372 100644 --- a/dbus/subscription_set.go +++ b/dbus/subscription_set.go @@ -15,6 +15,7 @@ package dbus import ( + "context" "time" ) @@ -29,16 +30,21 @@ func (s *SubscriptionSet) filter(unit string) bool { return !s.Contains(unit) } -// Subscribe starts listening for dbus events for all of the units in the set. +// SubscribeContext starts listening for dbus events for all of the units in the set. // Returns channels identical to conn.SubscribeUnits. -func (s *SubscriptionSet) Subscribe() (<-chan map[string]*UnitStatus, <-chan error) { +func (s *SubscriptionSet) SubscribeContext(ctx context.Context) (<-chan map[string]*UnitStatus, <-chan error) { // TODO: Make fully evented by using systemd 209 with properties changed values - return s.conn.SubscribeUnitsCustom(time.Second, 0, + return s.conn.SubscribeUnitsCustomContext(ctx, time.Second, 0, mismatchUnitStatus, func(unit string) bool { return s.filter(unit) }, ) } +// Deprecated: use SubscribeContext instead. +func (s *SubscriptionSet) Subscribe() (<-chan map[string]*UnitStatus, <-chan error) { + return s.SubscribeContext(context.Background()) +} + // NewSubscriptionSet returns a new subscription set. func (c *Conn) NewSubscriptionSet() *SubscriptionSet { return &SubscriptionSet{newSet(), c}