Skip to content

Commit 9d7c817

Browse files
author
Lars Grahmann
committed
Honor Kafka admin connector timeouts with full coverage
- Update pkg/admin/brokerclient.go to expose a requestTimeout helper that returns the configured connector timeout when set (defaulting to defaultRequestTimeout), and use it for alter-reassignment and leader-election requests so user overrides take effect. - Teach pkg/admin/connector.go to propagate ConnTimeout to whichever dialer path we take, while keeping the 10s default when nothing is supplied. - Harden scripts/set_up_net_alias.sh with set -euo pipefail, better Linux tooling detection (ifconfig vs ip), idempotent alias creation, and clearer error handling
1 parent 83e63fd commit 9d7c817

File tree

5 files changed

+179
-14
lines changed

5 files changed

+179
-14
lines changed

pkg/admin/brokerclient.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
)
1616

1717
const (
18-
defaultTimeout = 5 * time.Second
18+
defaultRequestTimeout = 5 * time.Second
1919

2020
// Used for filtering out default configs
2121
configSourceUnknown int8 = 0
@@ -138,6 +138,14 @@ func NewBrokerAdminClient(
138138
return adminClient, nil
139139
}
140140

141+
func (c *BrokerAdminClient) requestTimeout() time.Duration {
142+
if c.config.ConnTimeout > 0 {
143+
return c.config.ConnTimeout
144+
}
145+
146+
return defaultRequestTimeout
147+
}
148+
141149
// GetClusterID gets the ID of the cluster.
142150
func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error) {
143151
resp, err := c.getMetadata(ctx, nil)
@@ -597,7 +605,7 @@ func (c *BrokerAdminClient) AssignPartitions(
597605
req := kafka.AlterPartitionReassignmentsRequest{
598606
Topic: topic,
599607
Assignments: apiAssignments,
600-
Timeout: defaultTimeout,
608+
Timeout: c.requestTimeout(),
601609
}
602610
log.Debugf("AlterPartitionReassignments request: %+v", req)
603611

@@ -708,7 +716,7 @@ func (c *BrokerAdminClient) RunLeaderElection(
708716
req := kafka.ElectLeadersRequest{
709717
Topic: topic,
710718
Partitions: partitions,
711-
Timeout: defaultTimeout,
719+
Timeout: c.requestTimeout(),
712720
}
713721
log.Debugf("ElectLeaders request: %+v", req)
714722

pkg/admin/brokerclient_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,25 @@ func TestBrokerClientControllerID(t *testing.T) {
4444
}, fmt.Sprintf("Received %d, Broker Controller ID should be between 1 and 6.", controllerID))
4545
}
4646

47+
func TestBrokerAdminClientRequestTimeoutDefault(t *testing.T) {
48+
client := &BrokerAdminClient{}
49+
50+
assert.Equal(t, defaultRequestTimeout, client.requestTimeout())
51+
}
52+
53+
func TestBrokerAdminClientRequestTimeoutOverride(t *testing.T) {
54+
customTimeout := 42 * time.Second
55+
client := &BrokerAdminClient{
56+
config: BrokerAdminClientConfig{
57+
ConnectorConfig: ConnectorConfig{
58+
ConnTimeout: customTimeout,
59+
},
60+
},
61+
}
62+
63+
assert.Equal(t, customTimeout, client.requestTimeout())
64+
}
65+
4766
func TestBrokerClientGetClusterID(t *testing.T) {
4867
if !util.CanTestBrokerAdmin() {
4968
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")

pkg/admin/connector.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
7373
Config: config,
7474
}
7575

76+
timeout := config.ConnTimeout
77+
if timeout == 0 {
78+
timeout = 10 * time.Second
79+
}
80+
7681
var mechanismClient sasl.Mechanism
7782
var tlsConfig *tls.Config
7883
var err error
@@ -143,6 +148,7 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
143148

144149
if !config.TLS.Enabled {
145150
connector.Dialer = kafka.DefaultDialer
151+
connector.Dialer.Timeout = timeout
146152
connector.Dialer.SASLMechanism = mechanismClient
147153
} else {
148154
var certs []tls.Certificate
@@ -184,7 +190,7 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
184190
}
185191
connector.Dialer = &kafka.Dialer{
186192
SASLMechanism: mechanismClient,
187-
Timeout: 10 * time.Second,
193+
Timeout: timeout,
188194
TLS: tlsConfig,
189195
}
190196
}

pkg/admin/connector_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package admin
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net"
7+
"testing"
8+
"time"
9+
10+
"github.com/segmentio/kafka-go"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func TestNewConnectorDefaultTimeout(t *testing.T) {
16+
originalTimeout := kafka.DefaultDialer.Timeout
17+
t.Cleanup(func() { kafka.DefaultDialer.Timeout = originalTimeout })
18+
19+
connector, err := NewConnector(
20+
ConnectorConfig{
21+
BrokerAddr: "localhost:9092",
22+
},
23+
)
24+
require.NoError(t, err)
25+
26+
assert.Equal(t, 10*time.Second, connector.Dialer.Timeout)
27+
}
28+
29+
func TestNewConnectorCustomTimeout(t *testing.T) {
30+
customTimeout := 3 * time.Second
31+
32+
connector, err := NewConnector(
33+
ConnectorConfig{
34+
BrokerAddr: "localhost:9092",
35+
ConnTimeout: customTimeout,
36+
TLS: TLSConfig{
37+
Enabled: true,
38+
SkipVerify: true,
39+
},
40+
},
41+
)
42+
require.NoError(t, err)
43+
44+
assert.Equal(t, customTimeout, connector.Dialer.Timeout)
45+
assert.NotNil(t, connector.Dialer.TLS)
46+
}
47+
48+
func TestConnectorDialerTimeoutHappyPath(t *testing.T) {
49+
listener, err := net.Listen("tcp", "127.0.0.1:0")
50+
require.NoError(t, err)
51+
t.Cleanup(func() { _ = listener.Close() })
52+
53+
acceptErrCh := make(chan error, 1)
54+
go func() {
55+
conn, err := listener.Accept()
56+
if err != nil {
57+
acceptErrCh <- err
58+
return
59+
}
60+
acceptErrCh <- nil
61+
_ = conn.Close()
62+
}()
63+
64+
connector, err := NewConnector(
65+
ConnectorConfig{
66+
BrokerAddr: listener.Addr().String(),
67+
ConnTimeout: 100 * time.Millisecond,
68+
},
69+
)
70+
require.NoError(t, err)
71+
72+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
73+
defer cancel()
74+
75+
conn, err := connector.Dialer.DialContext(ctx, "tcp", listener.Addr().String())
76+
require.NoError(t, err)
77+
require.NoError(t, conn.Close())
78+
79+
select {
80+
case err := <-acceptErrCh:
81+
require.NoError(t, err)
82+
case <-time.After(time.Second):
83+
t.Fatal("timed out waiting for listener accept")
84+
}
85+
}
86+
87+
func TestConnectorDialerTimeoutUnhappyPath(t *testing.T) {
88+
listener, err := net.Listen("tcp", "127.0.0.1:0")
89+
require.NoError(t, err)
90+
t.Cleanup(func() { _ = listener.Close() })
91+
92+
connector, err := NewConnector(
93+
ConnectorConfig{
94+
BrokerAddr: listener.Addr().String(),
95+
ConnTimeout: time.Nanosecond,
96+
},
97+
)
98+
require.NoError(t, err)
99+
100+
_, err = connector.Dialer.DialContext(context.Background(), "tcp", listener.Addr().String())
101+
require.Error(t, err)
102+
103+
var netErr net.Error
104+
if errors.As(err, &netErr) {
105+
require.True(t, netErr.Timeout(), "expected timeout error, got: %v", err)
106+
return
107+
}
108+
109+
require.True(t, errors.Is(err, context.DeadlineExceeded), "expected deadline exceeded, got: %v", err)
110+
}

scripts/set_up_net_alias.sh

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,39 @@
11
#!/bin/bash
22

3+
set -euo pipefail
4+
35
ADDR=169.254.123.123
6+
NETMASK=255.255.255.0
7+
CIDR=24
48
echo "Aliasing $ADDR to localhost..."
59

6-
UNAME=$(uname -a)
7-
case "$UNAME" in
8-
Linux*) sudo ifconfig lo:0 $ADDR netmask 255.255.255.0 up;;
9-
Darwin*) sudo ifconfig lo0 alias $ADDR;;
10-
*) exit
10+
alias_exists_with_ip() {
11+
ip addr show dev lo | grep -q "$ADDR"
12+
}
13+
14+
OS=$(uname -s)
15+
case "$OS" in
16+
Linux*)
17+
if command -v ifconfig >/dev/null 2>&1; then
18+
sudo ifconfig lo:0 "$ADDR" netmask "$NETMASK" up
19+
elif command -v ip >/dev/null 2>&1; then
20+
if alias_exists_with_ip; then
21+
echo "Alias already present on loopback interface."
22+
else
23+
sudo ip addr add "$ADDR/$CIDR" dev lo
24+
fi
25+
else
26+
>&2 echo "Neither ifconfig nor ip is available; cannot create alias."
27+
exit 1
28+
fi
29+
;;
30+
Darwin*)
31+
sudo ifconfig lo0 alias "$ADDR"
32+
;;
33+
*)
34+
>&2 echo "Unsupported platform: $OS"
35+
exit 1
36+
;;
1137
esac
1238

13-
if [[ $? != 0 ]]
14-
then
15-
>&2 echo "Unable to create alias"
16-
exit 1
17-
fi
39+
echo "Alias configured."

0 commit comments

Comments
 (0)