Skip to content

Commit 23e5cf6

Browse files
committed
Improve status handler, tracing, and context handling
1 parent fdbfe8b commit 23e5cf6

File tree

11 files changed

+254
-56
lines changed

11 files changed

+254
-56
lines changed

cmd/app/app.go

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/grafana/dskit/services"
1717
"github.com/grafana/dskit/signals"
1818
"github.com/pkg/errors"
19+
"gopkg.in/yaml.v2"
1920
"sigs.k8s.io/controller-runtime/pkg/client"
2021

2122
"github.com/jedib0t/go-pretty/v6/table"
@@ -191,13 +192,18 @@ func (a *App) statusHandler() http.HandlerFunc {
191192
"version": a.writeStatusVersion,
192193
"endpoints": a.writeStatusEndpoints,
193194
"services": a.writeStatusServices,
194-
"conditioner": a.writeStatusConditioner,
195195
"mqttclient": a.writeStatusMqttClient,
196+
"conditioner": a.writeStatusConditioner,
196197
}
197198

198199
wrapStatus := func(endpoint string) {
199200
msg.WriteString("GET /status/" + endpoint + "\n")
200201
switch endpoint {
202+
case "config":
203+
err := a.writeStatusConfig(&msg, r)
204+
if err != nil {
205+
errs = append(errs, err)
206+
}
201207
default:
202208
err := simpleEndpoints[endpoint](&msg)
203209
if err != nil {
@@ -210,11 +216,19 @@ func (a *App) statusHandler() http.HandlerFunc {
210216
if endpoint, ok := vars["endpoint"]; ok {
211217
wrapStatus(endpoint)
212218
} else {
213-
wrapStatus("version")
214-
wrapStatus("services")
215-
wrapStatus("endpoints")
216-
wrapStatus("conditioner")
217-
wrapStatus("mqttclient")
219+
sortedEndpoints := []string{"version", "endpoints", "services", "config"}
220+
221+
if a.mqttclient != nil {
222+
sortedEndpoints = append(sortedEndpoints, "mqttclient")
223+
}
224+
225+
if a.conditioner != nil {
226+
sortedEndpoints = append(sortedEndpoints, "conditioner")
227+
}
228+
229+
for e := range sortedEndpoints {
230+
wrapStatus(sortedEndpoints[e])
231+
}
218232
}
219233

220234
w.Header().Set("Content-Type", "text/plain")
@@ -352,6 +366,54 @@ func (a *App) writeStatusMqttClient(w io.Writer) error {
352366
return nil
353367
}
354368

369+
func (a *App) writeStatusConfig(w io.Writer, r *http.Request) error {
370+
var output any
371+
372+
mode := r.URL.Query().Get("mode")
373+
switch mode {
374+
case "diff":
375+
defaultCfg := NewDefaultConfig()
376+
377+
defaultCfgYaml, err := yamlMarshalUnmarshal(defaultCfg)
378+
if err != nil {
379+
return err
380+
}
381+
382+
cfgYaml, err := yamlMarshalUnmarshal(a.cfg)
383+
if err != nil {
384+
return err
385+
}
386+
387+
output, err = diffConfig(defaultCfgYaml, cfgYaml)
388+
if err != nil {
389+
return err
390+
}
391+
case "defaults":
392+
output = NewDefaultConfig()
393+
case "":
394+
output = a.cfg
395+
default:
396+
return fmt.Errorf("unknown value for mode query parameter: %v", mode)
397+
}
398+
399+
out, err := yaml.Marshal(output)
400+
if err != nil {
401+
return err
402+
}
403+
404+
_, err = w.Write([]byte("---\n"))
405+
if err != nil {
406+
return err
407+
}
408+
409+
_, err = w.Write(out)
410+
if err != nil {
411+
return err
412+
}
413+
414+
return nil
415+
}
416+
355417
func (a *App) writeStatusConditioner(w io.Writer) error {
356418
x := table.NewWriter()
357419
x.SetOutputMirror(w)

cmd/app/config.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package app
22

33
import (
44
"flag"
5+
"fmt"
56
"os"
67
"path/filepath"
8+
"reflect"
79

810
"github.com/grafana/dskit/flagext"
911
"github.com/grafana/dskit/log"
@@ -23,6 +25,13 @@ import (
2325
"github.com/zachfi/iotcontroller/modules/zonekeeper"
2426
)
2527

28+
func NewDefaultConfig() *Config {
29+
defaultConfig := &Config{}
30+
defaultFS := flag.NewFlagSet("", flag.PanicOnError)
31+
defaultConfig.RegisterFlagsAndApplyDefaults("", defaultFS)
32+
return defaultConfig
33+
}
34+
2635
type Config struct {
2736
Target string `yaml:"target"`
2837
EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics,omitempty"`
@@ -100,3 +109,81 @@ func loadYamlFile(filename string, d interface{}) error {
100109

101110
return nil
102111
}
112+
113+
// yamlMarshalUnmarshal utility function that converts a YAML interface in a map
114+
// doing marshal and unmarshal of the parameter
115+
func yamlMarshalUnmarshal(in interface{}) (map[interface{}]interface{}, error) {
116+
yamlBytes, err := yaml.Marshal(in)
117+
if err != nil {
118+
return nil, err
119+
}
120+
121+
object := make(map[interface{}]interface{})
122+
if err := yaml.Unmarshal(yamlBytes, object); err != nil {
123+
return nil, err
124+
}
125+
126+
return object, nil
127+
}
128+
129+
// diffConfig utility function that returns the diff between two config map objects
130+
func diffConfig(defaultConfig, actualConfig map[interface{}]interface{}) (map[interface{}]interface{}, error) {
131+
output := make(map[interface{}]interface{})
132+
133+
for key, value := range actualConfig {
134+
135+
defaultValue, ok := defaultConfig[key]
136+
if !ok {
137+
output[key] = value
138+
continue
139+
}
140+
141+
switch v := value.(type) {
142+
case int:
143+
defaultV, ok := defaultValue.(int)
144+
if !ok || defaultV != v {
145+
output[key] = v
146+
}
147+
case string:
148+
defaultV, ok := defaultValue.(string)
149+
if !ok || defaultV != v {
150+
output[key] = v
151+
}
152+
case bool:
153+
defaultV, ok := defaultValue.(bool)
154+
if !ok || defaultV != v {
155+
output[key] = v
156+
}
157+
case []interface{}:
158+
defaultV, ok := defaultValue.([]interface{})
159+
if !ok || !reflect.DeepEqual(defaultV, v) {
160+
output[key] = v
161+
}
162+
case float64:
163+
defaultV, ok := defaultValue.(float64)
164+
if !ok || !reflect.DeepEqual(defaultV, v) {
165+
output[key] = v
166+
}
167+
case nil:
168+
if defaultValue != nil {
169+
output[key] = v
170+
}
171+
case map[interface{}]interface{}:
172+
defaultV, ok := defaultValue.(map[interface{}]interface{})
173+
if !ok {
174+
output[key] = value
175+
}
176+
diff, err := diffConfig(defaultV, v)
177+
if err != nil {
178+
return nil, err
179+
}
180+
if len(diff) > 0 {
181+
output[key] = diff
182+
}
183+
default:
184+
return nil, fmt.Errorf("unsupported type %T", v)
185+
}
186+
}
187+
188+
return output, nil
189+
}

cmd/app/modules.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (a *App) setupModuleManager() error {
8383
Harvester: {Server, MQTTClient},
8484
HookReceiver: {Server},
8585
Router: {Server, KubeClient},
86-
Weather: {Server, Conditioner},
86+
Weather: {Server},
8787
ZoneKeeper: {Server, MQTTClient, KubeClient},
8888

8989
// Timer: {Server},
@@ -144,7 +144,7 @@ func (a *App) initWeather() (services.Service, error) {
144144
return services.NewIdleService(nil, nil), nil
145145
}
146146

147-
c, err := common.NewClientConn(a.cfg.HookReceiver.EventReceiverClient)
147+
c, err := common.NewClientConn(a.cfg.Weather.EventReceiverClient)
148148
if err != nil {
149149
return nil, fmt.Errorf("failed to create new client connection; %w", err)
150150
}

modules/conditioner/conditioner.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,12 @@ func (c *Conditioner) Alert(ctx context.Context, req *iotv1proto.AlertRequest) (
118118
}
119119

120120
if active {
121-
err = activateRemediation(ctx, rem, c.zonekeeperClient)
121+
err = c.sched.activateRemediation(ctx, rem, c.zonekeeperClient)
122122
if err != nil {
123123
c.logger.Error("failed to activate condition alert", "err", err)
124124
}
125125
} else {
126-
err = deactivateRemediation(ctx, rem, c.zonekeeperClient)
126+
err = c.sched.deactivateRemediation(ctx, rem, c.zonekeeperClient)
127127
if err != nil {
128128
c.logger.Error("failed to deactivate condition alert", "err", err)
129129
}
@@ -199,7 +199,7 @@ func (c *Conditioner) Epoch(ctx context.Context, req *iotv1proto.EpochRequest) (
199199
),
200200
)
201201

202-
err = activateRemediation(ctx, rem, c.zonekeeperClient)
202+
err = c.sched.activateRemediation(ctx, rem, c.zonekeeperClient)
203203
if err != nil {
204204
c.logger.Error("failed to run condition epoch", "err", err)
205205
}
@@ -218,7 +218,7 @@ func (c *Conditioner) Epoch(ctx context.Context, req *iotv1proto.EpochRequest) (
218218
// Schedule the zone activation
219219
if activate := activateRequest(ctx, rem); activate != nil {
220220
err = c.sched.add(ctx, strings.Join([]string{req.Location, req.Name, cond.Name, rem.Zone, "activate"}, "-"), start, activate)
221-
if err != nil {
221+
if err != nil && !errors.Is(err, ErrEmptyRequest) {
222222
c.logger.Error("failed to schedule activation", "err", err)
223223
errs = append(errs, fmt.Errorf("condition %q: %w", cond.Name, err))
224224
}
@@ -227,14 +227,14 @@ func (c *Conditioner) Epoch(ctx context.Context, req *iotv1proto.EpochRequest) (
227227
// Schedule the zone deactivation
228228
if deactivate := deactivateRequest(ctx, rem); deactivate == nil {
229229
err = c.sched.add(ctx, strings.Join([]string{req.Location, req.Name, cond.Name, rem.Zone, "deactivate"}, "-"), stop, deactivate)
230-
if err != nil {
230+
if err != nil && !errors.Is(err, ErrEmptyRequest) {
231231
c.logger.Error("failed to schedule deactivation", "err", err)
232232
errs = append(errs, fmt.Errorf("condition %q: %w", cond.Name, err))
233233
}
234234
}
235235
} else if now.After(stop) {
236236
// If we are past the stop time, deactivate immediately.
237-
err = deactivateRemediation(ctx, rem, c.zonekeeperClient)
237+
err = c.sched.deactivateRemediation(ctx, rem, c.zonekeeperClient)
238238
if err != nil {
239239
c.logger.Error("failed to run condition epoch", "err", err)
240240
}
@@ -258,7 +258,7 @@ func (c *Conditioner) Status() []scheduleStatus {
258258
func (c *Conditioner) setSchedule(ctx context.Context, cond apiv1.Condition) {
259259
var err error
260260

261-
ctx, span := c.tracer.Start(ctx, "Conditioner.setSchedule") // trace.WithAttributes(attributes...),
261+
ctx, span := c.tracer.Start(ctx, "Conditioner.setSchedule", trace.WithAttributes(attribute.String("name", cond.Name)))
262262
defer tracing.ErrHandler(span, err, "set schedule failed", c.logger)
263263

264264
if !cond.Spec.Enabled {
@@ -292,7 +292,7 @@ func (c *Conditioner) setSchedule(ctx context.Context, cond apiv1.Condition) {
292292
req = activateRequest(ctx, rem)
293293

294294
err = c.sched.add(ctx, strings.Join([]string{cond.Name, "schedule", rem.Zone, "activate"}, "-"), next, req)
295-
if err != nil {
295+
if err != nil && !errors.Is(err, ErrEmptyRequest) {
296296
span.AddEvent("failed to set schedule", trace.WithAttributes(attribute.String("err", err.Error())))
297297
c.logger.Error("failed to set schedule", "err", err)
298298
}
@@ -409,7 +409,7 @@ func (c *Conditioner) withinActiveWindow(ctx context.Context, rem apiv1.Remediat
409409
for _, ti := range rem.TimeIntervals {
410410
tip, err := ti.AsPrometheus()
411411
if err != nil {
412-
c.logger.Error("invalid time interval configuration", "err", err)
412+
c.logger.Error("invalid time interval configuration", "err", err, "interval", fmt.Sprintf("%+v", ti))
413413
continue
414414
}
415415

@@ -457,7 +457,11 @@ func (c *Conditioner) runTimer(ctx context.Context) {
457457
names[cond.Name] = struct{}{}
458458
}
459459

460-
c.sched.removeExtraneous(names)
460+
// FIXME: this used to work because there was a 1:1 relationship between the
461+
// condition in the k8s API and the event. Now that we are scheduling epoch
462+
// events, we don't want to clean those up before they have fired. Do we
463+
// need this removal if they clean themselves up after execution?
464+
// c.sched.removeExtraneous(names)
461465
}
462466

463467
func (c *Conditioner) starting(ctx context.Context) error {
@@ -488,12 +492,12 @@ func (c *Conditioner) stopping(_ error) error {
488492
return nil
489493
}
490494

491-
func activateRemediation(ctx context.Context, rem apiv1.Remediation, zonekeeperClient iotv1proto.ZoneKeeperServiceClient) error {
492-
return execRequest(ctx, activateRequest(ctx, rem), zonekeeperClient)
495+
func (s *schedule) activateRemediation(ctx context.Context, rem apiv1.Remediation, zonekeeperClient iotv1proto.ZoneKeeperServiceClient) error {
496+
return s.execRequest(ctx, activateRequest(ctx, rem), zonekeeperClient)
493497
}
494498

495-
func deactivateRemediation(ctx context.Context, rem apiv1.Remediation, zonekeeperClient iotv1proto.ZoneKeeperServiceClient) error {
496-
return execRequest(ctx, deactivateRequest(ctx, rem), zonekeeperClient)
499+
func (s *schedule) deactivateRemediation(ctx context.Context, rem apiv1.Remediation, zonekeeperClient iotv1proto.ZoneKeeperServiceClient) error {
500+
return s.execRequest(ctx, deactivateRequest(ctx, rem), zonekeeperClient)
497501
}
498502

499503
func activateRequest(_ context.Context, rem apiv1.Remediation) *request {

modules/conditioner/errors.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package conditioner
2+
3+
import "errors"
4+
5+
var ErrEmptyRequest = errors.New("empty request")

0 commit comments

Comments
 (0)