From fd6c819a66966f3982cde16b3a0514203503a46d Mon Sep 17 00:00:00 2001 From: Morgan Hill Date: Tue, 29 Oct 2024 12:27:57 +0100 Subject: [PATCH] Introduce CNAME monitoring feature This change introduces the feature that octo-proxy can close connections when it detects a failover based on a change to a CNAME record. It is occasionally desirable to failover traffic from one target to another. A popular way of doing this failover is by changing a CNAME. The next connection that is made will use the updated DNS records to resolve so new connections will me made to the newly desired targets. Existing connections will remain intact unless otherwise disconnected; this behaviour may or may not be desirable depending on the application. --- README.md | 1 + cmd/octo/main.go | 5 ++- docs/CONFIGURATION.md | 1 + pkg/config/config.go | 4 +- pkg/config/config_test.go | 5 ++- pkg/proxy/monitor.go | 64 ++++++++++++++++++++++++++ pkg/proxy/monitor_test.go | 94 +++++++++++++++++++++++++++++++++++++++ pkg/proxy/proxy.go | 16 +++++++ pkg/proxy/proxy_test.go | 36 +++++++-------- 9 files changed, 205 insertions(+), 21 deletions(-) create mode 100644 pkg/proxy/monitor.go create mode 100644 pkg/proxy/monitor_test.go diff --git a/README.md b/README.md index 7b67ab6..2acd9be 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Octo-proxy or `octo` is simple TCP & TLS Proxy with mutual authentication and tr - Support for multiple targets, accessed in random order (load balancer) - Reload configuration or certificate without dropping connection - Expose metrics that can be consumed by prometheus +- Monitor CNAME records to force reconnection achieving DNS failover with long lived connections ### Usage #### Run octo with ad-hoc command diff --git a/cmd/octo/main.go b/cmd/octo/main.go index 291d584..427f04d 100644 --- a/cmd/octo/main.go +++ b/cmd/octo/main.go @@ -33,6 +33,8 @@ Flags: Specify target backend which traffic will be forwarded -metrics Specify address and port to run the metrics server + -monitor + Specify a CNAME record to monitor and reset connections on changes -debug Enable debug log messages -version @@ -77,6 +79,7 @@ func runMain() error { listener = flag.String("listener", "127.0.0.1:5000", "Specify listener for running octo-proxy") target = flag.String("target", "", "Specify comma-separated list of targets for running octo-proxy") metrics = flag.String("metrics", "0.0.0.0:9123", "Address and port to run the metrics server on") + monitor = flag.String("monitor", "", "Specify a CNAME record to monitor and reset connections on changes") debug = flag.Bool("debug", false, "Enable debug messages") ) @@ -93,7 +96,7 @@ func runMain() error { if *target != "" { targets := strings.Split(*target, ",") - c, err := config.GenerateConfig(*listener, targets, *metrics) + c, err := config.GenerateConfig(*listener, targets, *metrics, *monitor) if err != nil { return err } diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 7cbe174..35bb46a 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -13,6 +13,7 @@ | listener | [`Hostconfig`](#hostconfig) | Set of listener related configuration. All of the incoming request to octo-proxy will be handled by this listener. | yes | | targets | [`Hostconfig[]`](#hostconfig) | Set of target related configurations. These targets are backends which octo-proxy will forward all incoming traffic accepted by the listener. | yes | | mirror | [`Hostconfig`](#hostconfig) | Set of mirror related configuration. If this configuration is enabled, all incoming requests will also be forwarded to this mirror. Unlike the `target`, in a `mirror` setup, we implement 'fire and forget,' where every request is only forwarded, and the response is ignored. | no | +| monitor | `` | A CNAME to monitor for changes. When the CNAME changes the connections are closed forcing reconnection. When unset the monitoring feature is disabled. | no | ## Hostconfig | Field | Type | Description | Required | diff --git a/pkg/config/config.go b/pkg/config/config.go index f0a8537..02e5ae8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -33,6 +33,7 @@ type ServerConfig struct { Listener HostConfig `yaml:"listener"` Targets []HostConfig `yaml:"targets"` Mirror HostConfig `yaml:"mirror"` + Monitor string `yaml:"monitor"` } type HostConfig struct { @@ -115,7 +116,7 @@ func readConfig(r io.Reader) (*Config, error) { return config, nil } -func GenerateConfig(listener string, targets []string, metrics string) (*Config, error) { +func GenerateConfig(listener string, targets []string, metrics string, monitor string) (*Config, error) { l := strings.Split(listener, ":") if len(l) != 2 { @@ -131,6 +132,7 @@ func GenerateConfig(listener string, targets []string, metrics string) (*Config, Port: l[1], }, Targets: []HostConfig{}, + Monitor: monitor, }, }, } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 3e17ff0..17a2d11 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -114,6 +114,7 @@ func TestGenerateConfig(t *testing.T) { Listener string Targets []string Metrics string + Monitor string expectedConfig *Config expectedError string }{ @@ -122,6 +123,7 @@ func TestGenerateConfig(t *testing.T) { Listener: "127.0.0.1:8080", Targets: []string{"127.0.0.1:80"}, Metrics: "127.0.0.1:9123", + Monitor: "octo.example.com", expectedConfig: &Config{ ServerConfigs: []ServerConfig{ { @@ -142,6 +144,7 @@ func TestGenerateConfig(t *testing.T) { }, }, }, + Monitor: "octo.example.com", }, }, MetricsConfig: HostConfig{ @@ -212,7 +215,7 @@ func TestGenerateConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.Name, func(t *testing.T) { - c, err := GenerateConfig(tt.Listener, tt.Targets, tt.Metrics) + c, err := GenerateConfig(tt.Listener, tt.Targets, tt.Metrics, tt.Monitor) if err != nil { if !strings.Contains(err.Error(), tt.expectedError) { t.Fatalf("got %v, want %s", err, tt.expectedError) diff --git a/pkg/proxy/monitor.go b/pkg/proxy/monitor.go new file mode 100644 index 0000000..1495cd9 --- /dev/null +++ b/pkg/proxy/monitor.go @@ -0,0 +1,64 @@ +package proxy + +import ( + "context" + "net" + "time" + + "github.com/rs/zerolog/log" +) + +type Resolver interface { + LookupCNAME(targetCname string) (string, error) +} + +type goNetResolver struct{} + +func (goNetResolver) LookupCNAME(targetCname string) (string, error) { + return net.LookupCNAME(targetCname) +} + +func GoNetResolver() goNetResolver { + return goNetResolver{} +} + +type Monitor struct { + targetCname string + lastData string + resolver Resolver + interval time.Duration +} + +func NewMonitor(resolver Resolver, targetCname string, interval time.Duration) *Monitor { + return &Monitor{ + targetCname: targetCname, + resolver: resolver, + interval: interval, + } +} + +func (m *Monitor) Run(ctx context.Context, callback func()) { + for { + select { + case <-ctx.Done(): + return + default: + } + + cname, err := m.resolver.LookupCNAME(m.targetCname) + if err != nil { + log.Error().Err(err).Msg("failed to lookup CNAME") + cname = m.lastData + } + if m.lastData != "" && m.lastData != cname { + log.Info().Msg("CNAME changed, running callback") + callback() + } + m.lastData = cname + + // Go net doesn't provide us with the TTL, therefor we poll frequently and + // expect the appropriate levels of caching to be in place, in order for this + // not to cause undue load on the DNS servers. + time.Sleep(m.interval) + } +} diff --git a/pkg/proxy/monitor_test.go b/pkg/proxy/monitor_test.go new file mode 100644 index 0000000..1a31953 --- /dev/null +++ b/pkg/proxy/monitor_test.go @@ -0,0 +1,94 @@ +package proxy + +import ( + "context" + "errors" + "testing" + "time" +) + +type constantResolver struct { + value string +} + +func (c constantResolver) LookupCNAME(targetCname string) (string, error) { + return c.value, nil +} + +func TestMonitorConstantResolver(t *testing.T) { + monitorInterval := 100 * time.Millisecond + m := NewMonitor(constantResolver{"test"}, "test", monitorInterval) + ctx, cancel := context.WithCancel(context.Background()) + + var called bool + go m.Run(ctx, func() { + called = true + }) + + time.Sleep(2 * monitorInterval) + cancel() + + if called { + t.Error("expected callback not to be called") + } +} + +type changingResolver struct { + values []string + errs []error + index int +} + +func (c *changingResolver) LookupCNAME(targetCname string) (string, error) { + if c.index >= len(c.values) { + c.index = 0 + } + value := c.values[c.index] + err := c.errs[c.index] + c.index++ + return value, err +} + +func TestMonitorChangingResolver(t *testing.T) { + monitorInterval := 100 * time.Millisecond + m := NewMonitor(&changingResolver{ + values: []string{"test1", "test2"}, + errs: []error{nil, nil}, + index: 0, + }, "test", monitorInterval) + ctx, cancel := context.WithCancel(context.Background()) + + var called bool + go m.Run(ctx, func() { + called = true + }) + + time.Sleep(2 * monitorInterval) + cancel() + + if !called { + t.Error("expected callback to be called") + } +} + +func TestMonitorContinuesOnError(t *testing.T) { + monitorInterval := 100 * time.Millisecond + m := NewMonitor(&changingResolver{ + values: []string{"test1", "", "test2"}, + errs: []error{nil, errors.New("test"), nil}, + index: 0, + }, "test", monitorInterval) + ctx, cancel := context.WithCancel(context.Background()) + + var called bool + go m.Run(ctx, func() { + called = true + }) + + time.Sleep(2 * monitorInterval) + cancel() + + if !called { + t.Error("expected callback to be called") + } +} diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 843fb79..e5413ab 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -127,10 +127,26 @@ func (p *Proxy) handleConn(ctx context.Context, c config.ServerConfig) { continue } + connectionCtx, connectionCancel := context.WithCancel(ctx) + if c.Monitor != "" { + p.Wg.Add(1) + go func() { + monitor := NewMonitor(goNetResolver{}, c.Monitor, 5*time.Second) + monitor.Run(connectionCtx, func() { + log.Info().Msg("CNAME changed, closing connection") + if err := srcConn.Close(); err != nil { + log.Error().Err(err).Msg("error closing connection") + } + }) + p.Wg.Done() + }() + } + p.Wg.Add(1) go func() { p.forwardConn(ctx, c, srcConn) p.Wg.Done() + connectionCancel() downstreamConnActive.With(prometheus.Labels{"name": p.Name}).Dec() }() } diff --git a/pkg/proxy/proxy_test.go b/pkg/proxy/proxy_test.go index 7d213ac..fe60393 100644 --- a/pkg/proxy/proxy_test.go +++ b/pkg/proxy/proxy_test.go @@ -27,7 +27,7 @@ func TestProxy(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -65,7 +65,7 @@ func TestProxyWithMultipleTargets(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{"localhost:9001", backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{"localhost:9001", backend}, "", "") if err != nil { t.Fatal(err) } @@ -104,7 +104,7 @@ func TestProxyWithMirror(t *testing.T) { mirror := testhelper.RunTestServer(&wg, mirrorResult) // prepare octo proxy configuration - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -159,7 +159,7 @@ func TestProxyWithSimpleTLS(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -211,7 +211,7 @@ func TestProxyWithMutualTLS(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -267,7 +267,7 @@ func TestProxyWithMutualTLSWithConfiguredSNI(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -320,7 +320,7 @@ func TestProxyWithMutualTLSWithConfiguredCRLWithRevokedClientCert(t *testing.T) backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -373,7 +373,7 @@ func TestProxyWithMutualTLSWithConfiguredCRLWithRevokedServerCert(t *testing.T) backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -426,7 +426,7 @@ func TestProxyMutualTLSWhenClientUsingInvalidCertificate(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -478,7 +478,7 @@ func TestProxyMutualTLSWhenClientNotProvideCA(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -530,7 +530,7 @@ func TestProxyMutualTLSWhenClientUseWrongCA(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -588,7 +588,7 @@ func TestProxyWithTargetSimpleTLS(t *testing.T) { backend := RunTestTLSServer(&wg, tC, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -644,7 +644,7 @@ func TestProxyWithTargetMutualTLS(t *testing.T) { backend := RunTestTLSServer(&wg, tC, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -687,7 +687,7 @@ func TestProxyWithTargetMutualTLS(t *testing.T) { func TestUnreachableTarget(t *testing.T) { // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{"127.0.0.1:10"}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{"127.0.0.1:10"}, "", "") if err != nil { t.Fatal(err) } @@ -729,7 +729,7 @@ func TestUnreachableMirror(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare octo proxy configuration - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -773,7 +773,7 @@ func TestProxyConcurrent(t *testing.T) { backend := testhelper.RunTestServerWithResponse(&wg, connCount) // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -822,7 +822,7 @@ func TestProxyWithSlowTarget(t *testing.T) { backend := testhelper.RunTestServerSlowMode(&wg, 1) // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -884,7 +884,7 @@ func TestProxyWithZeroTimeout(t *testing.T) { backend := testhelper.RunTestServerSlowMode(&wg, 1) // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) }