diff --git a/README.md b/README.md index 7b67ab6..2acd9be 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Octo-proxy or `octo` is simple TCP & TLS Proxy with mutual authentication and tr - Support for multiple targets, accessed in random order (load balancer) - Reload configuration or certificate without dropping connection - Expose metrics that can be consumed by prometheus +- Monitor CNAME records to force reconnection achieving DNS failover with long lived connections ### Usage #### Run octo with ad-hoc command diff --git a/cmd/octo/main.go b/cmd/octo/main.go index 291d584..427f04d 100644 --- a/cmd/octo/main.go +++ b/cmd/octo/main.go @@ -33,6 +33,8 @@ Flags: Specify target backend which traffic will be forwarded -metrics Specify address and port to run the metrics server + -monitor + Specify a CNAME record to monitor and reset connections on changes -debug Enable debug log messages -version @@ -77,6 +79,7 @@ func runMain() error { listener = flag.String("listener", "127.0.0.1:5000", "Specify listener for running octo-proxy") target = flag.String("target", "", "Specify comma-separated list of targets for running octo-proxy") metrics = flag.String("metrics", "0.0.0.0:9123", "Address and port to run the metrics server on") + monitor = flag.String("monitor", "", "Specify a CNAME record to monitor and reset connections on changes") debug = flag.Bool("debug", false, "Enable debug messages") ) @@ -93,7 +96,7 @@ func runMain() error { if *target != "" { targets := strings.Split(*target, ",") - c, err := config.GenerateConfig(*listener, targets, *metrics) + c, err := config.GenerateConfig(*listener, targets, *metrics, *monitor) if err != nil { return err } diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 7cbe174..35bb46a 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -13,6 +13,7 @@ | listener | [`Hostconfig`](#hostconfig) | Set of listener related configuration. All of the incoming request to octo-proxy will be handled by this listener. | yes | | targets | [`Hostconfig[]`](#hostconfig) | Set of target related configurations. These targets are backends which octo-proxy will forward all incoming traffic accepted by the listener. | yes | | mirror | [`Hostconfig`](#hostconfig) | Set of mirror related configuration. If this configuration is enabled, all incoming requests will also be forwarded to this mirror. Unlike the `target`, in a `mirror` setup, we implement 'fire and forget,' where every request is only forwarded, and the response is ignored. | no | +| monitor | `` | A CNAME to monitor for changes. When the CNAME changes the connections are closed forcing reconnection. When unset the monitoring feature is disabled. | no | ## Hostconfig | Field | Type | Description | Required | diff --git a/pkg/config/config.go b/pkg/config/config.go index f0a8537..02e5ae8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -33,6 +33,7 @@ type ServerConfig struct { Listener HostConfig `yaml:"listener"` Targets []HostConfig `yaml:"targets"` Mirror HostConfig `yaml:"mirror"` + Monitor string `yaml:"monitor"` } type HostConfig struct { @@ -115,7 +116,7 @@ func readConfig(r io.Reader) (*Config, error) { return config, nil } -func GenerateConfig(listener string, targets []string, metrics string) (*Config, error) { +func GenerateConfig(listener string, targets []string, metrics string, monitor string) (*Config, error) { l := strings.Split(listener, ":") if len(l) != 2 { @@ -131,6 +132,7 @@ func GenerateConfig(listener string, targets []string, metrics string) (*Config, Port: l[1], }, Targets: []HostConfig{}, + Monitor: monitor, }, }, } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 3e17ff0..17a2d11 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -114,6 +114,7 @@ func TestGenerateConfig(t *testing.T) { Listener string Targets []string Metrics string + Monitor string expectedConfig *Config expectedError string }{ @@ -122,6 +123,7 @@ func TestGenerateConfig(t *testing.T) { Listener: "127.0.0.1:8080", Targets: []string{"127.0.0.1:80"}, Metrics: "127.0.0.1:9123", + Monitor: "octo.example.com", expectedConfig: &Config{ ServerConfigs: []ServerConfig{ { @@ -142,6 +144,7 @@ func TestGenerateConfig(t *testing.T) { }, }, }, + Monitor: "octo.example.com", }, }, MetricsConfig: HostConfig{ @@ -212,7 +215,7 @@ func TestGenerateConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.Name, func(t *testing.T) { - c, err := GenerateConfig(tt.Listener, tt.Targets, tt.Metrics) + c, err := GenerateConfig(tt.Listener, tt.Targets, tt.Metrics, tt.Monitor) if err != nil { if !strings.Contains(err.Error(), tt.expectedError) { t.Fatalf("got %v, want %s", err, tt.expectedError) diff --git a/pkg/proxy/monitor.go b/pkg/proxy/monitor.go new file mode 100644 index 0000000..1495cd9 --- /dev/null +++ b/pkg/proxy/monitor.go @@ -0,0 +1,64 @@ +package proxy + +import ( + "context" + "net" + "time" + + "github.com/rs/zerolog/log" +) + +type Resolver interface { + LookupCNAME(targetCname string) (string, error) +} + +type goNetResolver struct{} + +func (goNetResolver) LookupCNAME(targetCname string) (string, error) { + return net.LookupCNAME(targetCname) +} + +func GoNetResolver() goNetResolver { + return goNetResolver{} +} + +type Monitor struct { + targetCname string + lastData string + resolver Resolver + interval time.Duration +} + +func NewMonitor(resolver Resolver, targetCname string, interval time.Duration) *Monitor { + return &Monitor{ + targetCname: targetCname, + resolver: resolver, + interval: interval, + } +} + +func (m *Monitor) Run(ctx context.Context, callback func()) { + for { + select { + case <-ctx.Done(): + return + default: + } + + cname, err := m.resolver.LookupCNAME(m.targetCname) + if err != nil { + log.Error().Err(err).Msg("failed to lookup CNAME") + cname = m.lastData + } + if m.lastData != "" && m.lastData != cname { + log.Info().Msg("CNAME changed, running callback") + callback() + } + m.lastData = cname + + // Go net doesn't provide us with the TTL, therefor we poll frequently and + // expect the appropriate levels of caching to be in place, in order for this + // not to cause undue load on the DNS servers. + time.Sleep(m.interval) + } +} diff --git a/pkg/proxy/monitor_test.go b/pkg/proxy/monitor_test.go new file mode 100644 index 0000000..1a31953 --- /dev/null +++ b/pkg/proxy/monitor_test.go @@ -0,0 +1,94 @@ +package proxy + +import ( + "context" + "errors" + "testing" + "time" +) + +type constantResolver struct { + value string +} + +func (c constantResolver) LookupCNAME(targetCname string) (string, error) { + return c.value, nil +} + +func TestMonitorConstantResolver(t *testing.T) { + monitorInterval := 100 * time.Millisecond + m := NewMonitor(constantResolver{"test"}, "test", monitorInterval) + ctx, cancel := context.WithCancel(context.Background()) + + var called bool + go m.Run(ctx, func() { + called = true + }) + + time.Sleep(2 * monitorInterval) + cancel() + + if called { + t.Error("expected callback not to be called") + } +} + +type changingResolver struct { + values []string + errs []error + index int +} + +func (c *changingResolver) LookupCNAME(targetCname string) (string, error) { + if c.index >= len(c.values) { + c.index = 0 + } + value := c.values[c.index] + err := c.errs[c.index] + c.index++ + return value, err +} + +func TestMonitorChangingResolver(t *testing.T) { + monitorInterval := 100 * time.Millisecond + m := NewMonitor(&changingResolver{ + values: []string{"test1", "test2"}, + errs: []error{nil, nil}, + index: 0, + }, "test", monitorInterval) + ctx, cancel := context.WithCancel(context.Background()) + + var called bool + go m.Run(ctx, func() { + called = true + }) + + time.Sleep(2 * monitorInterval) + cancel() + + if !called { + t.Error("expected callback to be called") + } +} + +func TestMonitorContinuesOnError(t *testing.T) { + monitorInterval := 100 * time.Millisecond + m := NewMonitor(&changingResolver{ + values: []string{"test1", "", "test2"}, + errs: []error{nil, errors.New("test"), nil}, + index: 0, + }, "test", monitorInterval) + ctx, cancel := context.WithCancel(context.Background()) + + var called bool + go m.Run(ctx, func() { + called = true + }) + + time.Sleep(2 * monitorInterval) + cancel() + + if !called { + t.Error("expected callback to be called") + } +} diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 843fb79..e5413ab 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -127,10 +127,26 @@ func (p *Proxy) handleConn(ctx context.Context, c config.ServerConfig) { continue } + connectionCtx, connectionCancel := context.WithCancel(ctx) + if c.Monitor != "" { + p.Wg.Add(1) + go func() { + monitor := NewMonitor(goNetResolver{}, c.Monitor, 5*time.Second) + monitor.Run(connectionCtx, func() { + log.Info().Msg("CNAME changed, closing connection") + if err := srcConn.Close(); err != nil { + log.Error().Err(err).Msg("error closing connection") + } + }) + p.Wg.Done() + }() + } + p.Wg.Add(1) go func() { p.forwardConn(ctx, c, srcConn) p.Wg.Done() + connectionCancel() downstreamConnActive.With(prometheus.Labels{"name": p.Name}).Dec() }() } diff --git a/pkg/proxy/proxy_test.go b/pkg/proxy/proxy_test.go index 7d213ac..fe60393 100644 --- a/pkg/proxy/proxy_test.go +++ b/pkg/proxy/proxy_test.go @@ -27,7 +27,7 @@ func TestProxy(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -65,7 +65,7 @@ func TestProxyWithMultipleTargets(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{"localhost:9001", backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{"localhost:9001", backend}, "", "") if err != nil { t.Fatal(err) } @@ -104,7 +104,7 @@ func TestProxyWithMirror(t *testing.T) { mirror := testhelper.RunTestServer(&wg, mirrorResult) // prepare octo proxy configuration - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -159,7 +159,7 @@ func TestProxyWithSimpleTLS(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -211,7 +211,7 @@ func TestProxyWithMutualTLS(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -267,7 +267,7 @@ func TestProxyWithMutualTLSWithConfiguredSNI(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -320,7 +320,7 @@ func TestProxyWithMutualTLSWithConfiguredCRLWithRevokedClientCert(t *testing.T) backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -373,7 +373,7 @@ func TestProxyWithMutualTLSWithConfiguredCRLWithRevokedServerCert(t *testing.T) backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -426,7 +426,7 @@ func TestProxyMutualTLSWhenClientUsingInvalidCertificate(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -478,7 +478,7 @@ func TestProxyMutualTLSWhenClientNotProvideCA(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -530,7 +530,7 @@ func TestProxyMutualTLSWhenClientUseWrongCA(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -588,7 +588,7 @@ func TestProxyWithTargetSimpleTLS(t *testing.T) { backend := RunTestTLSServer(&wg, tC, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -644,7 +644,7 @@ func TestProxyWithTargetMutualTLS(t *testing.T) { backend := RunTestTLSServer(&wg, tC, result) // prepare configuration for octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -687,7 +687,7 @@ func TestProxyWithTargetMutualTLS(t *testing.T) { func TestUnreachableTarget(t *testing.T) { // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{"127.0.0.1:10"}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{"127.0.0.1:10"}, "", "") if err != nil { t.Fatal(err) } @@ -729,7 +729,7 @@ func TestUnreachableMirror(t *testing.T) { backend := testhelper.RunTestServer(&wg, result) // prepare octo proxy configuration - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -773,7 +773,7 @@ func TestProxyConcurrent(t *testing.T) { backend := testhelper.RunTestServerWithResponse(&wg, connCount) // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -822,7 +822,7 @@ func TestProxyWithSlowTarget(t *testing.T) { backend := testhelper.RunTestServerSlowMode(&wg, 1) // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) } @@ -884,7 +884,7 @@ func TestProxyWithZeroTimeout(t *testing.T) { backend := testhelper.RunTestServerSlowMode(&wg, 1) // start octo proxy - cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "") + cfg, err := config.GenerateConfig("127.0.0.1:9000", []string{backend}, "", "") if err != nil { t.Fatal(err) }