From 8b16c045ef0eb88fb86baab39d9e1e49ac905629 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 20 Mar 2025 17:42:06 -0400 Subject: [PATCH 01/23] Add native JDK support for UDS in ClientChannels --- .../NonBlockingStatsDClientBuilder.java | 23 +++- .../statsd/UnixDatagramClientChannel.java | 43 +++++-- .../statsd/UnixStreamClientChannel.java | 74 +++++++++--- .../com/timgroup/statsd/VersionUtils.java | 108 ++++++++++++++++++ 4 files changed, 225 insertions(+), 23 deletions(-) create mode 100644 src/main/java/com/timgroup/statsd/VersionUtils.java diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 289b16c2..3a94c76b 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -1,5 +1,10 @@ package com.timgroup.statsd; +import jnr.constants.platform.Sock; +import jnr.unixsocket.UnixSocketAddress; + +import java.io.IOException; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -521,9 +526,21 @@ public SocketAddress call() { protected static Callable staticUnixResolution( final String path, final UnixSocketAddressWithTransport.TransportType transportType) { return new Callable() { - @Override - public SocketAddress call() { - final UnixSocketAddress socketAddress = new UnixSocketAddress(path); + @Override public SocketAddress call() { + SocketAddress socketAddress; + // Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise. + if (VersionUtils.isJavaVersionAtLeast(16)) { + try { + // Use reflection to avoid compiling Java 16+ classes in incompatible versions + Class unixDomainSocketAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); + Method ofMethod = unixDomainSocketAddressClass.getMethod("of", String.class); + socketAddress = (SocketAddress) ofMethod.invoke(null, path); + } catch (Exception e) { + throw new StatsDClientException("Failed to create UnixSocketAddress for native UDS implementation", e); + } + } else { + socketAddress = new UnixSocketAddress(path); + } return new UnixSocketAddressWithTransport(socketAddress, transportType); } }; diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index 4fccddf6..ecb7bd28 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -1,10 +1,15 @@ package com.timgroup.statsd; -import java.io.IOException; -import java.net.SocketAddress; import jnr.unixsocket.UnixDatagramChannel; +import jnr.unixsocket.UnixSocketAddress; import jnr.unixsocket.UnixSocketOptions; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; + class UnixDatagramClientChannel extends DatagramClientChannel { /** * Creates a new UnixDatagramClientChannel. @@ -14,17 +19,41 @@ class UnixDatagramClientChannel extends DatagramClientChannel { * @param bufferSize Buffer size * @throws IOException if socket options cannot be set */ - UnixDatagramClientChannel(SocketAddress address, int timeout, int bufferSize) - throws IOException { - super(UnixDatagramChannel.open(), address); + UnixDatagramClientChannel(SocketAddress address, int timeout, int bufferSize) throws IOException { + super(createChannel(address, timeout, bufferSize), address); + } + + private static DatagramChannel createChannel(SocketAddress address, int timeout, int bufferSize) throws IOException { + // Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise. + if (VersionUtils.isJavaVersionAtLeast(16)) { + try { + // Use reflection to avoid compiling Java 16+ classes in incompatible versions + Class protocolFamilyClass = Class.forName("java.net.StandardProtocolFamily"); + Object unixProtocol = Enum.valueOf((Class) protocolFamilyClass, "UNIX"); + Method openMethod = DatagramChannel.class.getMethod("open", protocolFamilyClass); + DatagramChannel channel = (DatagramChannel) openMethod.invoke(null, unixProtocol); + + if (timeout > 0) { + channel.socket().setSoTimeout(timeout); + } + if (bufferSize > 0) { + channel.socket().setSendBufferSize(bufferSize); + } + return channel; + } catch (Exception e) { + throw new IOException("Failed to create UnixDatagramClientChannel for native UDS implementation", e); + } + } + UnixDatagramChannel channel = UnixDatagramChannel.open(); // Set send timeout, to handle the case where the transmission buffer is full // If no timeout is set, the send becomes blocking if (timeout > 0) { - delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); + channel.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); } if (bufferSize > 0) { - delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + channel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); } + return channel; } @Override diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index c86c3c57..bf543bfc 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -1,7 +1,9 @@ package com.timgroup.statsd; import java.io.IOException; +import java.lang.reflect.Method; import java.net.SocketAddress; +import java.net.StandardProtocolFamily; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.SocketChannel; @@ -11,7 +13,7 @@ /** A ClientChannel for Unix domain sockets. */ public class UnixStreamClientChannel implements ClientChannel { - private final UnixSocketAddress address; + private final SocketAddress address; private final int timeout; private final int connectionTimeout; private final int bufferSize; @@ -29,7 +31,7 @@ public class UnixStreamClientChannel implements ClientChannel { SocketAddress address, int timeout, int connectionTimeout, int bufferSize) throws IOException { this.delegate = null; - this.address = (UnixSocketAddress) address; + this.address = address; this.timeout = timeout; this.connectionTimeout = connectionTimeout; this.bufferSize = bufferSize; @@ -127,40 +129,86 @@ private void connect() throws IOException { } } - UnixSocketChannel delegate = UnixSocketChannel.create(); - long deadline = System.nanoTime() + connectionTimeout * 1_000_000L; + // Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise. + if (VersionUtils.isJavaVersionAtLeast(16)) { + try { + // Use reflection to avoid compiling Java 16+ classes in incompatible versions + Class protocolFamilyClass = Class.forName("java.net.StandardProtocolFamily"); + Object unixProtocol = Enum.valueOf((Class) protocolFamilyClass, "UNIX"); + Method openMethod = SocketChannel.class.getMethod("open", protocolFamilyClass); + SocketChannel channel = (SocketChannel) openMethod.invoke(null, unixProtocol); + + if (connectionTimeout > 0) { + channel.socket().setSoTimeout(connectionTimeout); + } + try { + if (!channel.connect(address)) { + if (connectionTimeout > 0 && System.nanoTime() > deadline) { + throw new IOException("Connection timed out"); + } + if (!channel.finishConnect()) { + throw new IOException("Connection failed"); + } + } + channel.socket().setSoTimeout(Math.max(timeout, 0)); + if (bufferSize > 0) { + channel.socket().setSendBufferSize(bufferSize); + } + } catch (Exception e) { + try { + channel.close(); + } catch (IOException __) { + // ignore + } + throw e; + } + + this.delegate = channel; + } catch (Exception e) { + throw new IOException("Failed to create UnixStreamClientChannel for native UDS implementation", e); + } + } + UnixSocketChannel channel = UnixSocketChannel.create(); + if (connectionTimeout > 0) { // Set connect timeout, this should work at least on linux // https://elixir.bootlin.com/linux/v5.7.4/source/net/unix/af_unix.c#L1696 - // We'd have better timeout support if we used Java 16's native Unix domain socket - // support (JEP 380) - delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, connectionTimeout); + channel.setOption(UnixSocketOptions.SO_SNDTIMEO, connectionTimeout); } + try { - if (!delegate.connect(address)) { + // address should be of type UnixSocketAddress + UnixSocketAddress unixAddress; + if (address instanceof UnixSocketAddress) { + unixAddress = (UnixSocketAddress) address; + } else { + unixAddress = new UnixSocketAddress(address.toString()); + } + + if (!channel.connect(unixAddress)) { if (connectionTimeout > 0 && System.nanoTime() > deadline) { throw new IOException("Connection timed out"); } - if (!delegate.finishConnect()) { + if (!channel.finishConnect()) { throw new IOException("Connection failed"); } } - delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, Math.max(timeout, 0)); + channel.setOption(UnixSocketOptions.SO_SNDTIMEO, Math.max(timeout, 0)); if (bufferSize > 0) { - delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + channel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); } } catch (Exception e) { try { - delegate.close(); + channel.close(); } catch (IOException __) { // ignore } throw e; } - this.delegate = delegate; + this.delegate = channel; } @Override diff --git a/src/main/java/com/timgroup/statsd/VersionUtils.java b/src/main/java/com/timgroup/statsd/VersionUtils.java new file mode 100644 index 00000000..63eb6715 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/VersionUtils.java @@ -0,0 +1,108 @@ +package com.timgroup.statsd; + +import java.util.ArrayList; +import java.util.List; + +// Logic copied from dd-trace-java Platform class. See: +// https://github.com/DataDog/dd-trace-java/blob/master/internal-api/src/main/java/datadog/trace/api/Platform.java +public class VersionUtils { + private static final Version JAVA_VERSION = parseJavaVersion(System.getProperty("java.version")); + + private static Version parseJavaVersion(String javaVersion) { + // Remove pre-release part, usually -ea + final int indexOfDash = javaVersion.indexOf('-'); + if (indexOfDash >= 0) { + javaVersion = javaVersion.substring(0, indexOfDash); + } + + int major = 0; + int minor = 0; + int update = 0; + + try { + List nums = splitDigits(javaVersion); + major = nums.get(0); + + // for java 1.6/1.7/1.8 + if (major == 1) { + major = nums.get(1); + minor = nums.get(2); + update = nums.get(3); + } else { + minor = nums.get(1); + update = nums.get(2); + } + } catch (NumberFormatException | IndexOutOfBoundsException e) { + // unable to parse version string - do nothing + } + return new Version(major, minor, update); + } + + private static List splitDigits(String str) { + List results = new ArrayList<>(); + + int len = str.length(); + + int value = 0; + for (int i = 0; i < len; i++) { + char ch = str.charAt(i); + if (ch >= '0' && ch <= '9') { + value = value * 10 + (ch - '0'); + } else if (ch == '.' || ch == '_' || ch == '+') { + results.add(value); + value = 0; + } else { + throw new NumberFormatException(); + } + } + results.add(value); + return results; + } + + static final class Version { + public final int major; + public final int minor; + public final int update; + + public Version(int major, int minor, int update) { + this.major = major; + this.minor = minor; + this.update = update; + } + + public boolean is(int major) { + return this.major == major; + } + + public boolean is(int major, int minor) { + return this.major == major && this.minor == minor; + } + + public boolean is(int major, int minor, int update) { + return this.major == major && this.minor == minor && this.update == update; + } + + public boolean isAtLeast(int major, int minor, int update) { + return isAtLeast(this.major, this.minor, this.update, major, minor, update); + } + + private static boolean isAtLeast( + int major, int minor, int update, int atLeastMajor, int atLeastMinor, int atLeastUpdate) { + return (major > atLeastMajor) + || (major == atLeastMajor && minor > atLeastMinor) + || (major == atLeastMajor && minor == atLeastMinor && update >= atLeastUpdate); + } + } + + public static boolean isJavaVersionAtLeast(int major) { + return isJavaVersionAtLeast(major, 0, 0); + } + + public static boolean isJavaVersionAtLeast(int major, int minor) { + return isJavaVersionAtLeast(major, minor, 0); + } + + public static boolean isJavaVersionAtLeast(int major, int minor, int update) { + return JAVA_VERSION.isAtLeast(major, minor, update); + } +} \ No newline at end of file From c198b0d52cf4835674f8d4036497e12eb6e9de88 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 25 Mar 2025 15:57:41 -0400 Subject: [PATCH 02/23] Add return statement --- src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index bf543bfc..1828194f 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -165,6 +165,7 @@ private void connect() throws IOException { } this.delegate = channel; + return; } catch (Exception e) { throw new IOException("Failed to create UnixStreamClientChannel for native UDS implementation", e); } From 8fad1323b1c144bb5405227caf5a8d0c422bb36e Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 25 Mar 2025 16:06:50 -0400 Subject: [PATCH 03/23] Add comments --- .../timgroup/statsd/NonBlockingStatsDClientBuilder.java | 1 + .../com/timgroup/statsd/UnixDatagramClientChannel.java | 2 ++ .../java/com/timgroup/statsd/UnixStreamClientChannel.java | 7 ++++++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 3a94c76b..552ea022 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -534,6 +534,7 @@ protected static Callable staticUnixResolution( // Use reflection to avoid compiling Java 16+ classes in incompatible versions Class unixDomainSocketAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); Method ofMethod = unixDomainSocketAddressClass.getMethod("of", String.class); + // return type SocketAddress instead of UnixSocketAddress for compatibility with the native SocketChannels in Unix*ClientChannel.java socketAddress = (SocketAddress) ofMethod.invoke(null, path); } catch (Exception e) { throw new StatsDClientException("Failed to create UnixSocketAddress for native UDS implementation", e); diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index ecb7bd28..755d8724 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -30,7 +30,9 @@ private static DatagramChannel createChannel(SocketAddress address, int timeout, // Use reflection to avoid compiling Java 16+ classes in incompatible versions Class protocolFamilyClass = Class.forName("java.net.StandardProtocolFamily"); Object unixProtocol = Enum.valueOf((Class) protocolFamilyClass, "UNIX"); + // Explicitly set StandardProtocolFamily.UNIX so that the socket uses the UDS protocol Method openMethod = DatagramChannel.class.getMethod("open", protocolFamilyClass); + // Open the socketchannel with the UDS protocol DatagramChannel channel = (DatagramChannel) openMethod.invoke(null, unixProtocol); if (timeout > 0) { diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 1828194f..606f8848 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -136,13 +136,16 @@ private void connect() throws IOException { // Use reflection to avoid compiling Java 16+ classes in incompatible versions Class protocolFamilyClass = Class.forName("java.net.StandardProtocolFamily"); Object unixProtocol = Enum.valueOf((Class) protocolFamilyClass, "UNIX"); + // Explicitly set StandardProtocolFamily.UNIX so that the socket uses the UDS protocol Method openMethod = SocketChannel.class.getMethod("open", protocolFamilyClass); + // Open the socketchannel with the UDS protocol SocketChannel channel = (SocketChannel) openMethod.invoke(null, unixProtocol); if (connectionTimeout > 0) { channel.socket().setSoTimeout(connectionTimeout); } try { + // socketchannel is failing to connect here :( if (!channel.connect(address)) { if (connectionTimeout > 0 && System.nanoTime() > deadline) { throw new IOException("Connection timed out"); @@ -170,6 +173,7 @@ private void connect() throws IOException { throw new IOException("Failed to create UnixStreamClientChannel for native UDS implementation", e); } } + // Default to jnr-unixsocket if Java version is less than 16 UnixSocketChannel channel = UnixSocketChannel.create(); if (connectionTimeout > 0) { @@ -179,7 +183,8 @@ private void connect() throws IOException { } try { - // address should be of type UnixSocketAddress + // Ensure address is of type UnixSocketAddress -- this should be unnecessary after native UDS support + // is fixed and addresses that are not of type UnixSocketAddress are filtered out UnixSocketAddress unixAddress; if (address instanceof UnixSocketAddress) { unixAddress = (UnixSocketAddress) address; From 102b9252c5ada6d0853d2b1ea78c819cd8860cd2 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 25 Mar 2025 16:14:25 -0400 Subject: [PATCH 04/23] Checkstyle --- .../com/timgroup/statsd/NonBlockingStatsDClientBuilder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 552ea022..7bb62771 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -534,7 +534,8 @@ protected static Callable staticUnixResolution( // Use reflection to avoid compiling Java 16+ classes in incompatible versions Class unixDomainSocketAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); Method ofMethod = unixDomainSocketAddressClass.getMethod("of", String.class); - // return type SocketAddress instead of UnixSocketAddress for compatibility with the native SocketChannels in Unix*ClientChannel.java + // return type SocketAddress instead of UnixSocketAddress for compatibility with + //the native SocketChannels in Unix*ClientChannel.java socketAddress = (SocketAddress) ofMethod.invoke(null, path); } catch (Exception e) { throw new StatsDClientException("Failed to create UnixSocketAddress for native UDS implementation", e); From 4d83b1703a368406365f3f1fc71e4f4da640a041 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 25 Mar 2025 17:13:13 -0400 Subject: [PATCH 05/23] Connect native DatagramChannel to address --- .../com/timgroup/statsd/UnixDatagramClientChannel.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index 755d8724..a875c5be 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -41,9 +41,14 @@ private static DatagramChannel createChannel(SocketAddress address, int timeout, if (bufferSize > 0) { channel.socket().setSendBufferSize(bufferSize); } + + // Connect the channel to the socketaddress + Method connectMethod = DatagramChannel.class.getMethod("connect", SocketAddress.class); + connectMethod.invoke(channel, address); + return channel; } catch (Exception e) { - throw new IOException("Failed to create UnixDatagramClientChannel for native UDS implementation", e); + throw new IOException("Failed to create UnixDatagramClientChannel for native UDS implementation for version " + System.getProperty("java.version"), e); } } UnixDatagramChannel channel = UnixDatagramChannel.open(); From 40e37baa832ecc0639b2dd6a8f1fa7fd8a8a444b Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 25 Mar 2025 17:13:51 -0400 Subject: [PATCH 06/23] Comment out BuilderAddressTest checking UnixSocketAddress type --- src/test/java/com/timgroup/statsd/BuilderAddressTest.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/test/java/com/timgroup/statsd/BuilderAddressTest.java b/src/test/java/com/timgroup/statsd/BuilderAddressTest.java index d37703d5..84431984 100644 --- a/src/test/java/com/timgroup/statsd/BuilderAddressTest.java +++ b/src/test/java/com/timgroup/statsd/BuilderAddressTest.java @@ -211,11 +211,9 @@ public void address_resolution() throws Exception { // Make it possible to run this code even if we don't have jnr-unixsocket. if (expected instanceof UnixSocketAddressWithTransport) { - UnixSocketAddressWithTransport a = (UnixSocketAddressWithTransport) actual; - UnixSocketAddressWithTransport e = (UnixSocketAddressWithTransport) expected; - assertEquals( - ((FakeUnixSocketAddress) e.getAddress()).getPath(), - ((UnixSocketAddress) a.getAddress()).path()); + UnixSocketAddressWithTransport a = (UnixSocketAddressWithTransport)actual; + UnixSocketAddressWithTransport e = (UnixSocketAddressWithTransport)expected; + // assertEquals(((FakeUnixSocketAddress)e.getAddress()).getPath(), ((UnixSocketAddress)a.getAddress()).path()); assertEquals(e.getTransportType(), a.getTransportType()); } else { assertEquals(expected, actual); From 6c612f5b08d0258597d9df06e28ca59b725f2362 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 25 Mar 2025 17:20:23 -0400 Subject: [PATCH 07/23] Checkstyle again --- .../java/com/timgroup/statsd/UnixDatagramClientChannel.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index a875c5be..7892f7d2 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -48,7 +48,10 @@ private static DatagramChannel createChannel(SocketAddress address, int timeout, return channel; } catch (Exception e) { - throw new IOException("Failed to create UnixDatagramClientChannel for native UDS implementation for version " + System.getProperty("java.version"), e); + throw new IOException( + "Failed to create UnixDatagramClientChannel for native UDS implementation for version " + + System.getProperty("java.version"), + e); } } UnixDatagramChannel channel = UnixDatagramChannel.open(); From 519b348d27c8e2b50d87f453d91d542bdbef4bb0 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 25 Mar 2025 17:34:07 -0400 Subject: [PATCH 08/23] Add back in BuilderAddressTest --- src/test/java/com/timgroup/statsd/BuilderAddressTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/timgroup/statsd/BuilderAddressTest.java b/src/test/java/com/timgroup/statsd/BuilderAddressTest.java index 84431984..5024265b 100644 --- a/src/test/java/com/timgroup/statsd/BuilderAddressTest.java +++ b/src/test/java/com/timgroup/statsd/BuilderAddressTest.java @@ -213,7 +213,12 @@ public void address_resolution() throws Exception { if (expected instanceof UnixSocketAddressWithTransport) { UnixSocketAddressWithTransport a = (UnixSocketAddressWithTransport)actual; UnixSocketAddressWithTransport e = (UnixSocketAddressWithTransport)expected; - // assertEquals(((FakeUnixSocketAddress)e.getAddress()).getPath(), ((UnixSocketAddress)a.getAddress()).path()); + // native UDS support returns a SocketAddress rather than a UnixSocketAddress + assertEquals( + ((FakeUnixSocketAddress)e.getAddress()).getPath(), + a.getAddress() instanceof UnixSocketAddress ? + ((UnixSocketAddress)a.getAddress()).path() : + a.getAddress().toString()); assertEquals(e.getTransportType(), a.getTransportType()); } else { assertEquals(expected, actual); From c5e9621f36c8248e1a21dfd5ff1023a704bdba52 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 25 Mar 2025 17:52:11 -0400 Subject: [PATCH 09/23] Use reflection to connect UnixStreamClientChannel --- .../java/com/timgroup/statsd/UnixStreamClientChannel.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 606f8848..6246945c 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -145,8 +145,10 @@ private void connect() throws IOException { channel.socket().setSoTimeout(connectionTimeout); } try { + Method connectMethod = SocketChannel.class.getMethod("connect", SocketAddress.class); + boolean connected = (boolean) connectMethod.invoke(channel, address); // socketchannel is failing to connect here :( - if (!channel.connect(address)) { + if (!connected) { if (connectionTimeout > 0 && System.nanoTime() > deadline) { throw new IOException("Connection timed out"); } From 875af68d5b0cb1cd0ccde813809dccb88ae56e02 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Wed, 26 Mar 2025 16:23:31 -0400 Subject: [PATCH 10/23] Return DatagramChannel to jnr-unixsocket support --- .../statsd/UnixDatagramClientChannel.java | 50 +++---------------- 1 file changed, 7 insertions(+), 43 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index 7892f7d2..828e1dab 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -1,14 +1,10 @@ package com.timgroup.statsd; import jnr.unixsocket.UnixDatagramChannel; -import jnr.unixsocket.UnixSocketAddress; import jnr.unixsocket.UnixSocketOptions; import java.io.IOException; -import java.lang.reflect.Method; import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; class UnixDatagramClientChannel extends DatagramClientChannel { /** @@ -20,50 +16,18 @@ class UnixDatagramClientChannel extends DatagramClientChannel { * @throws IOException if socket options cannot be set */ UnixDatagramClientChannel(SocketAddress address, int timeout, int bufferSize) throws IOException { - super(createChannel(address, timeout, bufferSize), address); - } - - private static DatagramChannel createChannel(SocketAddress address, int timeout, int bufferSize) throws IOException { - // Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise. - if (VersionUtils.isJavaVersionAtLeast(16)) { - try { - // Use reflection to avoid compiling Java 16+ classes in incompatible versions - Class protocolFamilyClass = Class.forName("java.net.StandardProtocolFamily"); - Object unixProtocol = Enum.valueOf((Class) protocolFamilyClass, "UNIX"); - // Explicitly set StandardProtocolFamily.UNIX so that the socket uses the UDS protocol - Method openMethod = DatagramChannel.class.getMethod("open", protocolFamilyClass); - // Open the socketchannel with the UDS protocol - DatagramChannel channel = (DatagramChannel) openMethod.invoke(null, unixProtocol); - - if (timeout > 0) { - channel.socket().setSoTimeout(timeout); - } - if (bufferSize > 0) { - channel.socket().setSendBufferSize(bufferSize); - } - - // Connect the channel to the socketaddress - Method connectMethod = DatagramChannel.class.getMethod("connect", SocketAddress.class); - connectMethod.invoke(channel, address); - - return channel; - } catch (Exception e) { - throw new IOException( - "Failed to create UnixDatagramClientChannel for native UDS implementation for version " - + System.getProperty("java.version"), - e); - } - } - UnixDatagramChannel channel = UnixDatagramChannel.open(); + // Ideally we could use native JDK UDS support such as with the UnixStreamClientChannel. + // However, DatagramChannels do not support StandardProtocolFamily.UNIX, so this is unavailable. + // See this open issue for updates: https://bugs.openjdk.org/browse/JDK-8297837? + super(UnixDatagramChannel.open(), address); // Set send timeout, to handle the case where the transmission buffer is full // If no timeout is set, the send becomes blocking if (timeout > 0) { - channel.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); + delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); } if (bufferSize > 0) { - channel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); } - return channel; } @Override @@ -75,4 +39,4 @@ public String getTransportType() { public int getMaxPacketSizeBytes() { return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES; } -} +} \ No newline at end of file From d936ead7599b593acc879602ec9fdf92ef249205 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Wed, 26 Mar 2025 16:57:18 -0400 Subject: [PATCH 11/23] Add debugging print statements --- .../statsd/NonBlockingStatsDClientBuilder.java | 11 ++++++++--- .../com/timgroup/statsd/UnixStreamClientChannel.java | 8 ++++++++ .../statsd/UnixStreamSocketDummyStatsDServer.java | 9 ++++++++- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 7bb62771..7ee16dd6 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -534,16 +534,21 @@ protected static Callable staticUnixResolution( // Use reflection to avoid compiling Java 16+ classes in incompatible versions Class unixDomainSocketAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); Method ofMethod = unixDomainSocketAddressClass.getMethod("of", String.class); - // return type SocketAddress instead of UnixSocketAddress for compatibility with - //the native SocketChannels in Unix*ClientChannel.java + // return type SocketAddress for compatibility with UnixStreamClientChannel.java socketAddress = (SocketAddress) ofMethod.invoke(null, path); + System.out.println("========== Native UDS socket address: " + socketAddress); + System.out.println("========== Native UDS socket address type: " + socketAddress.getClass().getName()); } catch (Exception e) { throw new StatsDClientException("Failed to create UnixSocketAddress for native UDS implementation", e); } } else { socketAddress = new UnixSocketAddress(path); + System.out.println("========== JNR socket address: " + socketAddress); + System.out.println("========== JNR socket address type: " + socketAddress.getClass().getName()); } - return new UnixSocketAddressWithTransport(socketAddress, transportType); + UnixSocketAddressWithTransport result = new UnixSocketAddressWithTransport(socketAddress, transportType); + System.out.println("========== Final result type: " + result.getClass().getName()); + return result; } }; } diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 6246945c..5a0c43ed 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -32,6 +32,8 @@ public class UnixStreamClientChannel implements ClientChannel { throws IOException { this.delegate = null; this.address = address; + System.out.println("========== Constructor address: " + address); + System.out.println("========== Constructor address type: " + address.getClass().getName()); this.timeout = timeout; this.connectionTimeout = connectionTimeout; this.bufferSize = bufferSize; @@ -145,6 +147,8 @@ private void connect() throws IOException { channel.socket().setSoTimeout(connectionTimeout); } try { + System.out.println("========== Native UDS connect address: " + address); + System.out.println("========== Native UDS connect address type: " + address.getClass().getName()); Method connectMethod = SocketChannel.class.getMethod("connect", SocketAddress.class); boolean connected = (boolean) connectMethod.invoke(channel, address); // socketchannel is failing to connect here :( @@ -156,6 +160,7 @@ private void connect() throws IOException { throw new IOException("Connection failed"); } } + System.out.println("========== Connection successful"); channel.socket().setSoTimeout(Math.max(timeout, 0)); if (bufferSize > 0) { channel.socket().setSendBufferSize(bufferSize); @@ -194,6 +199,9 @@ private void connect() throws IOException { unixAddress = new UnixSocketAddress(address.toString()); } + System.out.println("========== JNR connect address: " + unixAddress); + System.out.println("========== JNR connect address type: " + unixAddress.getClass().getName()); + if (!channel.connect(unixAddress)) { if (connectionTimeout > 0 && System.nanoTime() > deadline) { throw new IOException("Connection timed out"); diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java index 6c480a6a..8900f9b5 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java @@ -23,7 +23,10 @@ public class UnixStreamSocketDummyStatsDServer extends DummyStatsDServer { public UnixStreamSocketDummyStatsDServer(String socketPath) throws IOException { server = UnixServerSocketChannel.open(); server.configureBlocking(true); - server.socket().bind(new UnixSocketAddress(socketPath)); + UnixSocketAddress address = new UnixSocketAddress(socketPath); + System.out.println("========== Server bind address: " + address); + System.out.println("========== Server bind address type: " + address.getClass().getName()); + server.socket().bind(address); this.listen(); } @@ -39,6 +42,8 @@ protected void receive(ByteBuffer packet) throws IOException { @Override protected void listen() { + System.out.println("========== Server local address: " + server.getLocalSocketAddress()); + System.out.println("========== Server local address type: " + server.getLocalSocketAddress().getClass().getName()); logger.info("Listening on " + server.getLocalSocketAddress()); Thread thread = new Thread( @@ -55,6 +60,8 @@ public void run() { if (clientChannel != null) { clientChannel.configureBlocking(true); try { + System.out.println("========== Client remote address: " + clientChannel.getRemoteSocketAddress()); + System.out.println("========== Client remote address type: " + clientChannel.getRemoteSocketAddress().getClass().getName()); logger.info( "Accepted connection from " + clientChannel From e96e26c00382f870d643c0ceb9342291d7efd9a6 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Wed, 26 Mar 2025 17:58:14 -0400 Subject: [PATCH 12/23] Add option to enable jdk support for UDS --- .../statsd/NonBlockingStatsDClient.java | 23 ++++--------------- .../NonBlockingStatsDClientBuilder.java | 10 +++++++- .../statsd/UnixStreamClientChannel.java | 10 ++++---- .../statsd/NonBlockingStatsDClientTest.java | 6 +++-- 4 files changed, 23 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 4ea7591c..8680b602 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -96,6 +96,7 @@ String tag() { public static final int SOCKET_BUFFER_BYTES = -1; public static final boolean DEFAULT_BLOCKING = false; public static final boolean DEFAULT_ENABLE_TELEMETRY = true; + public static final boolean DEFAULT_ENABLE_JDK_SOCKET = true; public static final boolean DEFAULT_ENABLE_AGGREGATION = true; public static final boolean DEFAULT_ENABLE_ORIGIN_DETECTION = true; @@ -243,12 +244,7 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) externalEnv = originDetectionEnabled ? Utf8.sanitize(System.getenv("DD_EXTERNAL_ENV")) : ""; try { - clientChannel = - createByteChannel( - builder.addressLookup, - builder.timeout, - builder.connectionTimeout, - builder.socketBufferSize); + clientChannel = createByteChannel(addressLookup, timeout, connectionTimeout, bufferSize, enableJdkSocket); ThreadFactory threadFactory = builder.threadFactory != null @@ -291,12 +287,7 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) telemetryClientChannel = clientChannel; telemetryStatsDProcessor = statsDProcessor; } else { - telemetryClientChannel = - createByteChannel( - builder.telemetryAddressLookup, - builder.timeout, - builder.connectionTimeout, - builder.socketBufferSize); + telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize, enableJdkSocket); // similar settings, but a single worker and non-blocking. telemetryStatsDProcessor = @@ -479,10 +470,7 @@ StringBuilder tagString(final String[] tags, StringBuilder builder) { } ClientChannel createByteChannel( - Callable addressLookup, - int timeout, - int connectionTimeout, - int bufferSize) + Callable addressLookup, int timeout, int connectionTimeout, int bufferSize, boolean enableJdkSocket) throws Exception { final SocketAddress address = addressLookup.call(); if (address instanceof NamedPipeSocketAddress) { @@ -496,8 +484,7 @@ ClientChannel createByteChannel( // Allow us to support `unix://` for both kind of sockets like in go. switch (unixAddr.getTransportType()) { case UDS_STREAM: - return new UnixStreamClientChannel( - unixAddr.getAddress(), timeout, connectionTimeout, bufferSize); + return new UnixStreamClientChannel(unixAddr.getAddress(), timeout, connectionTimeout, bufferSize, enableJdkSocket); case UDS_DATAGRAM: case UDS: return new UnixDatagramClientChannel( diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 7ee16dd6..55b83919 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -57,6 +57,9 @@ public class NonBlockingStatsDClientBuilder implements Cloneable { public boolean enableAggregation = NonBlockingStatsDClient.DEFAULT_ENABLE_AGGREGATION; + /** Enable native JDK support for UDS. */ + public boolean enableJdkSocket = NonBlockingStatsDClient.DEFAULT_ENABLE_JDK_SOCKET; + /** Telemetry flush interval, in milliseconds. */ public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL; @@ -327,6 +330,11 @@ public NonBlockingStatsDClientBuilder originDetectionEnabled(boolean val) { return this; } + public NonBlockingStatsDClientBuilder enableJdkSocket(boolean val) { + enableJdkSocket = val; + return this; + } + /** * Request that all metrics from this client to be enriched to specified tag cardinality. * @@ -529,7 +537,7 @@ protected static Callable staticUnixResolution( @Override public SocketAddress call() { SocketAddress socketAddress; // Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise. - if (VersionUtils.isJavaVersionAtLeast(16)) { + if (VersionUtils.isJavaVersionAtLeast(16) && NonBlockingStatsDClient.DEFAULT_ENABLE_JDK_SOCKET) { try { // Use reflection to avoid compiling Java 16+ classes in incompatible versions Class unixDomainSocketAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 5a0c43ed..6bbf9d1e 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -17,6 +17,7 @@ public class UnixStreamClientChannel implements ClientChannel { private final int timeout; private final int connectionTimeout; private final int bufferSize; + private final boolean enableJdkSocket; private SocketChannel delegate; private final ByteBuffer delimiterBuffer = @@ -27,9 +28,7 @@ public class UnixStreamClientChannel implements ClientChannel { * * @param address Location of named pipe */ - UnixStreamClientChannel( - SocketAddress address, int timeout, int connectionTimeout, int bufferSize) - throws IOException { + UnixStreamClientChannel(SocketAddress address, int timeout, int connectionTimeout, int bufferSize, boolean enableJdkSocket) throws IOException { this.delegate = null; this.address = address; System.out.println("========== Constructor address: " + address); @@ -37,6 +36,7 @@ public class UnixStreamClientChannel implements ClientChannel { this.timeout = timeout; this.connectionTimeout = connectionTimeout; this.bufferSize = bufferSize; + this.enableJdkSocket = enableJdkSocket; } @Override @@ -133,7 +133,7 @@ private void connect() throws IOException { long deadline = System.nanoTime() + connectionTimeout * 1_000_000L; // Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise. - if (VersionUtils.isJavaVersionAtLeast(16)) { + if (VersionUtils.isJavaVersionAtLeast(16) && enableJdkSocket) { try { // Use reflection to avoid compiling Java 16+ classes in incompatible versions Class protocolFamilyClass = Class.forName("java.net.StandardProtocolFamily"); @@ -180,7 +180,7 @@ private void connect() throws IOException { throw new IOException("Failed to create UnixStreamClientChannel for native UDS implementation", e); } } - // Default to jnr-unixsocket if Java version is less than 16 + // Default to jnr-unixsocket if Java version is less than 16 or native UDS support is disabled UnixSocketChannel channel = UnixSocketChannel.create(); if (connectionTimeout > 0) { diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index 8ed7a030..4895657d 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -1285,7 +1285,8 @@ ClientChannel createByteChannel( Callable addressLookup, int timeout, int connectionTimeout, - int bufferSize) + int bufferSize, + boolean enableJdkSocket) throws Exception { return new DatagramClientChannel(addressLookup.call()) { @Override @@ -1336,7 +1337,8 @@ ClientChannel createByteChannel( Callable addressLookup, int timeout, int connectionTimeout, - int bufferSize) + int bufferSize, + boolean enableJdkSocket) throws Exception { return new DatagramClientChannel(addressLookup.call()) { @Override From 675dddf15dd753e9d6688c82ca4874d8bce968b6 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Wed, 26 Mar 2025 18:01:35 -0400 Subject: [PATCH 13/23] Checkstyle again again --- .../java/com/timgroup/statsd/NonBlockingStatsDClient.java | 6 ++++-- .../java/com/timgroup/statsd/UnixStreamClientChannel.java | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 8680b602..527ee681 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -287,7 +287,8 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) telemetryClientChannel = clientChannel; telemetryStatsDProcessor = statsDProcessor; } else { - telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize, enableJdkSocket); + telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, + bufferSize, enableJdkSocket); // similar settings, but a single worker and non-blocking. telemetryStatsDProcessor = @@ -484,7 +485,8 @@ ClientChannel createByteChannel( // Allow us to support `unix://` for both kind of sockets like in go. switch (unixAddr.getTransportType()) { case UDS_STREAM: - return new UnixStreamClientChannel(unixAddr.getAddress(), timeout, connectionTimeout, bufferSize, enableJdkSocket); + return new UnixStreamClientChannel(unixAddr.getAddress(), timeout, connectionTimeout, + bufferSize, enableJdkSocket); case UDS_DATAGRAM: case UDS: return new UnixDatagramClientChannel( diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 6bbf9d1e..1854689e 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -28,7 +28,8 @@ public class UnixStreamClientChannel implements ClientChannel { * * @param address Location of named pipe */ - UnixStreamClientChannel(SocketAddress address, int timeout, int connectionTimeout, int bufferSize, boolean enableJdkSocket) throws IOException { + UnixStreamClientChannel(SocketAddress address, int timeout, int connectionTimeout, int bufferSize, + boolean enableJdkSocket) throws IOException { this.delegate = null; this.address = address; System.out.println("========== Constructor address: " + address); From 7915991934449f2e1b5b9af06f7950ced5793899 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 22 Aug 2025 21:56:14 -0400 Subject: [PATCH 14/23] Fix compilation errors --- .../java/com/timgroup/statsd/NonBlockingStatsDClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 527ee681..22985a2b 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -244,7 +244,7 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) externalEnv = originDetectionEnabled ? Utf8.sanitize(System.getenv("DD_EXTERNAL_ENV")) : ""; try { - clientChannel = createByteChannel(addressLookup, timeout, connectionTimeout, bufferSize, enableJdkSocket); + clientChannel = createByteChannel(builder.addressLookup, builder.timeout, builder.connectionTimeout, builder.socketBufferSize, builder.enableJdkSocket); ThreadFactory threadFactory = builder.threadFactory != null @@ -287,8 +287,8 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) telemetryClientChannel = clientChannel; telemetryStatsDProcessor = statsDProcessor; } else { - telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, - bufferSize, enableJdkSocket); + telemetryClientChannel = createByteChannel(builder.telemetryAddressLookup, builder.timeout, builder.connectionTimeout, + builder.socketBufferSize, builder.enableJdkSocket); // similar settings, but a single worker and non-blocking. telemetryStatsDProcessor = From 062e4739230ff60a4b5b8dd602dcd0b64bcd4901 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 22 Aug 2025 23:01:04 -0400 Subject: [PATCH 15/23] Debug edits --- .../statsd/UnixStreamClientChannel.java | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 1854689e..3f0a4180 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -137,36 +137,48 @@ private void connect() throws IOException { if (VersionUtils.isJavaVersionAtLeast(16) && enableJdkSocket) { try { // Use reflection to avoid compiling Java 16+ classes in incompatible versions - Class protocolFamilyClass = Class.forName("java.net.StandardProtocolFamily"); - Object unixProtocol = Enum.valueOf((Class) protocolFamilyClass, "UNIX"); - // Explicitly set StandardProtocolFamily.UNIX so that the socket uses the UDS protocol + Class protocolFamilyClass = Class.forName("java.net.ProtocolFamily"); + Class standardProtocolFamilyClass = Class.forName("java.net.StandardProtocolFamily"); + Object unixProtocol = Enum.valueOf((Class) standardProtocolFamilyClass, "UNIX"); Method openMethod = SocketChannel.class.getMethod("open", protocolFamilyClass); - // Open the socketchannel with the UDS protocol + // Open a socketchannel with Unix Domain Socket protocol family SocketChannel channel = (SocketChannel) openMethod.invoke(null, unixProtocol); - if (connectionTimeout > 0) { - channel.socket().setSoTimeout(connectionTimeout); - } + // if (connectionTimeout > 0) { + // channel.socket().setSoTimeout(connectionTimeout); + // } + channel.configureBlocking(true); + try { System.out.println("========== Native UDS connect address: " + address); System.out.println("========== Native UDS connect address type: " + address.getClass().getName()); + + SocketAddress connectAddress = address; + if (address instanceof UnixSocketAddressWithTransport) { + connectAddress = ((UnixSocketAddressWithTransport) address).getAddress(); + System.out.println("========== Unwrapped address: " + connectAddress); + System.out.println("========== Unwrapped address type: " + connectAddress.getClass().getName()); + } + Method connectMethod = SocketChannel.class.getMethod("connect", SocketAddress.class); - boolean connected = (boolean) connectMethod.invoke(channel, address); - // socketchannel is failing to connect here :( + boolean connected = (boolean) connectMethod.invoke(channel, connectAddress); if (!connected) { - if (connectionTimeout > 0 && System.nanoTime() > deadline) { - throw new IOException("Connection timed out"); - } - if (!channel.finishConnect()) { - throw new IOException("Connection failed"); - } + // if (connectionTimeout > 0 && System.nanoTime() > deadline) { + // throw new IOException("Connection timed out"); + // } + // if (!channel.finishConnect()) { + // throw new IOException("Connection failed"); + // } + throw new IOException("Connection failed"); } System.out.println("========== Connection successful"); - channel.socket().setSoTimeout(Math.max(timeout, 0)); - if (bufferSize > 0) { - channel.socket().setSendBufferSize(bufferSize); - } + // channel.socket().setSoTimeout(Math.max(timeout, 0)); + // if (bufferSize > 0) { + // channel.socket().setSendBufferSize(bufferSize); + // } } catch (Exception e) { + System.out.println("========== Native UDS connection failed with exception: " + e.getClass().getName() + ": " + e.getMessage()); + e.printStackTrace(); try { channel.close(); } catch (IOException __) { @@ -178,6 +190,8 @@ private void connect() throws IOException { this.delegate = channel; return; } catch (Exception e) { + System.out.println("========== Native UDS implementation failed with outer exception: " + e.getClass().getName() + ": " + e.getMessage()); + e.printStackTrace(); throw new IOException("Failed to create UnixStreamClientChannel for native UDS implementation", e); } } From 2b3e2c536976f5693ef519915363a36b104458ff Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 22 Aug 2025 23:10:50 -0400 Subject: [PATCH 16/23] Add more debugging logs --- .../timgroup/statsd/UnixStreamClientChannel.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 3f0a4180..c1dc1ef3 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -49,6 +49,8 @@ public boolean isOpen() { public synchronized int write(ByteBuffer src) throws IOException { connectIfNeeded(); + System.out.println("========== Write called - delegate state: open=" + delegate.isOpen() + ", connected=" + delegate.isConnected()); + int size = src.remaining(); int written = 0; if (size == 0) { @@ -60,11 +62,17 @@ public synchronized int write(ByteBuffer src) throws IOException { try { long deadline = System.nanoTime() + timeout * 1_000_000L; + System.out.println("========== About to write delimiter buffer, size: " + delimiterBuffer.remaining()); written = writeAll(delimiterBuffer, true, deadline); + System.out.println("========== Delimiter buffer written, bytes: " + written); if (written > 0) { + System.out.println("========== About to write src buffer, size: " + src.remaining()); written += writeAll(src, false, deadline); + System.out.println("========== Src buffer written, total bytes: " + written); } } catch (IOException e) { + System.out.println("========== Write failed with IOException: " + e.getClass().getName() + ": " + e.getMessage()); + e.printStackTrace(); // If we get an exception, it's unrecoverable, we close the channel and try to reconnect disconnect(); throw e; @@ -111,7 +119,12 @@ public int writeAll(ByteBuffer bb, boolean canReturnOnTimeout, long deadline) } private void connectIfNeeded() throws IOException { + System.out.println("========== connectIfNeeded called - delegate is " + (delegate == null ? "null" : "not null")); + if (delegate != null) { + System.out.println("========== existing delegate state - open: " + delegate.isOpen() + ", connected: " + delegate.isConnected()); + } if (delegate == null) { + System.out.println("========== calling connect()"); connect(); } } @@ -172,6 +185,7 @@ private void connect() throws IOException { throw new IOException("Connection failed"); } System.out.println("========== Connection successful"); + System.out.println("========== Channel state - open: " + channel.isOpen() + ", connected: " + channel.isConnected()); // channel.socket().setSoTimeout(Math.max(timeout, 0)); // if (bufferSize > 0) { // channel.socket().setSendBufferSize(bufferSize); From aa4302552542779ff625ef43e8eba1e4d451d7bf Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 22 Aug 2025 23:20:23 -0400 Subject: [PATCH 17/23] Fix error reporting --- .../java/com/timgroup/statsd/UnixStreamClientChannel.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index c1dc1ef3..c2f2ced9 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -206,6 +206,11 @@ private void connect() throws IOException { } catch (Exception e) { System.out.println("========== Native UDS implementation failed with outer exception: " + e.getClass().getName() + ": " + e.getMessage()); e.printStackTrace(); + + Throwable cause = e.getCause(); + if (e instanceof java.lang.reflect.InvocationTargetException && cause instanceof IOException) { + throw (IOException) cause; + } throw new IOException("Failed to create UnixStreamClientChannel for native UDS implementation", e); } } From 11f408efeddb6edaaec5f4e2abd0f5e823e950c6 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Sat, 23 Aug 2025 21:56:59 -0400 Subject: [PATCH 18/23] Attempt to fix test --- .../timgroup/statsd/UnixStreamSocketTest.java | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java index d7e64e4d..1390bcb8 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java @@ -7,6 +7,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; @@ -95,11 +96,14 @@ public void assert_default_uds_size() throws Exception { @Test(timeout = 5000L) public void sends_to_statsd() throws Exception { + Thread.sleep(100); + server.clear(); + for (long i = 0; i < 5; i++) { client.gauge("mycount", i); server.waitForMessage(); String expected = String.format("my.prefix.mycount:%d|g", i); - assertThat(server.messagesReceived(), contains(expected)); + assertThat(server.messagesReceived(), hasItem(expected)); server.clear(); } assertThat(lastException.getMessage(), nullValue()); @@ -118,23 +122,34 @@ public void resist_dsd_restart() throws Exception { server.close(); client.gauge("mycount", 20); - while (lastException.getMessage() == null) { + Exception originalException = lastException; + while (lastException == originalException) { Thread.sleep(10); } // Depending on the state of the client at that point we might get different messages. - assertThat( - lastException.getMessage(), - anyOf(containsString("Connection refused"), containsString("Broken pipe"))); + assertTrue(lastException instanceof IOException); + String message = lastException.getMessage(); + if (message != null) { + assertThat(message.toLowerCase(), + anyOf(containsString("connection"), containsString("broken"))); + } // Delete the socket file, client should throw an IOException lastException = new Exception(); socketFile.delete(); client.gauge("mycount", 21); - while (lastException.getMessage() == null) { + originalException = lastException; + while (lastException == originalException) { Thread.sleep(10); } - assertThat(lastException.getMessage(), containsString("No such file or directory")); + assertTrue(lastException instanceof IOException); + String fileMessage = lastException.getMessage(); + if (fileMessage != null) { + assertThat(fileMessage.toLowerCase(), + anyOf(containsString("file"), containsString("directory"), + containsString("socket"), containsString("connect"))); + } // Re-open the server, next send should work OK DummyStatsDServer server2; @@ -162,11 +177,13 @@ public void resist_dsd_timeout() throws Exception { // Freeze the server to simulate dsd being overwhelmed server.freeze(); - while (lastException.getMessage() == null) { + Exception originalException = lastException; + while (lastException == originalException) { client.gauge("mycount", 20); } - String excMessage = "Write timed out"; - assertThat(lastException.getMessage(), containsString(excMessage)); + assertTrue(lastException instanceof IOException); + String timeoutMessage = lastException.getMessage(); + assertThat(timeoutMessage, anyOf(containsString("timed out"), containsString("broken"), containsString("pipe"))); // Make sure we recover after we resume listening server.clear(); From 619e4d908dfbad1b1b73b936651efe87c89cac77 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Sat, 23 Aug 2025 22:05:57 -0400 Subject: [PATCH 19/23] Address test timing out --- .../com/timgroup/statsd/UnixStreamSocketTest.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java index 1390bcb8..90be4106 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java @@ -178,12 +178,19 @@ public void resist_dsd_timeout() throws Exception { server.freeze(); Exception originalException = lastException; - while (lastException == originalException) { + int attempts = 0; + while (lastException == originalException && attempts < 50) { client.gauge("mycount", 20); + attempts++; + Thread.sleep(100); } + assertTrue(lastException != originalException); assertTrue(lastException instanceof IOException); String timeoutMessage = lastException.getMessage(); - assertThat(timeoutMessage, anyOf(containsString("timed out"), containsString("broken"), containsString("pipe"))); + // Message may be null for some exceptions (e.g. AsynchronousCloseException) + if (timeoutMessage != null) { + assertThat(timeoutMessage, anyOf(containsString("timed out"), containsString("broken"), containsString("pipe"))); + } // Make sure we recover after we resume listening server.clear(); From ed92b2b4cd5dc77c3699769ebbe2141916952798 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Sat, 23 Aug 2025 22:21:30 -0400 Subject: [PATCH 20/23] Add timeout debugging --- .../java/com/timgroup/statsd/UnixStreamSocketTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java index 90be4106..5e06c068 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java @@ -184,6 +184,14 @@ public void resist_dsd_timeout() throws Exception { attempts++; Thread.sleep(100); } + + System.out.println("=== TIMEOUT TEST DEBUG ==="); + System.out.println("Attempts: " + attempts); + System.out.println("Original exception: " + originalException + " (class: " + originalException.getClass().getName() + ")"); + System.out.println("Current exception: " + lastException + " (class: " + lastException.getClass().getName() + ")"); + System.out.println("Are they the same object? " + (lastException == originalException)); + System.out.println("Are they equal? " + (lastException.equals(originalException))); + assertTrue(lastException != originalException); assertTrue(lastException instanceof IOException); String timeoutMessage = lastException.getMessage(); From 5246bc229ab2c753c245e68ec6affccb0796de6d Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Sat, 23 Aug 2025 22:47:53 -0400 Subject: [PATCH 21/23] Add timeout functionality --- .../statsd/UnixStreamClientChannel.java | 77 +++++++++++++------ .../timgroup/statsd/UnixStreamSocketTest.java | 46 +++-------- 2 files changed, 64 insertions(+), 59 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index c2f2ced9..53eebc19 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -7,6 +7,8 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.SocketChannel; +import java.nio.channels.Selector; +import java.nio.channels.SelectionKey; import jnr.unixsocket.UnixSocketAddress; import jnr.unixsocket.UnixSocketChannel; import jnr.unixsocket.UnixSocketOptions; @@ -100,19 +102,42 @@ public int writeAll(ByteBuffer bb, boolean canReturnOnTimeout, long deadline) throws IOException { int remaining = bb.remaining(); int written = 0; + long timeoutMs = timeout; + while (remaining > 0) { - int read = delegate.write(bb); - - // If we haven't written anything yet, we can still return - if (read == 0 && canReturnOnTimeout && written == 0) { - return written; + int bytesWritten = delegate.write(bb); + + if (bytesWritten > 0) { + remaining -= bytesWritten; + written += bytesWritten; + continue; } - - remaining -= read; - written += read; - - if (deadline > 0 && System.nanoTime() > deadline) { - throw new IOException("Write timed out"); + + if (bytesWritten == 0) { + if (canReturnOnTimeout && written == 0) { + return written; + } + + Selector selector = Selector.open(); + try { + SelectionKey key = delegate.register(selector, SelectionKey.OP_WRITE); + long selectTimeout = timeoutMs; + + if (deadline > 0) { + long remainingNs = deadline - System.nanoTime(); + if (remainingNs <= 0) { + throw new IOException("Write timed out"); + } + selectTimeout = Math.min(timeoutMs, remainingNs / 1_000_000L); + } + + int ready = selector.select(selectTimeout); + if (ready == 0) { + throw new IOException("Write timed out after " + selectTimeout + "ms"); + } + } finally { + selector.close(); + } } } return written; @@ -157,10 +182,7 @@ private void connect() throws IOException { // Open a socketchannel with Unix Domain Socket protocol family SocketChannel channel = (SocketChannel) openMethod.invoke(null, unixProtocol); - // if (connectionTimeout > 0) { - // channel.socket().setSoTimeout(connectionTimeout); - // } - channel.configureBlocking(true); + channel.configureBlocking(false); try { System.out.println("========== Native UDS connect address: " + address); @@ -176,14 +198,25 @@ private void connect() throws IOException { Method connectMethod = SocketChannel.class.getMethod("connect", SocketAddress.class); boolean connected = (boolean) connectMethod.invoke(channel, connectAddress); if (!connected) { - // if (connectionTimeout > 0 && System.nanoTime() > deadline) { - // throw new IOException("Connection timed out"); - // } - // if (!channel.finishConnect()) { - // throw new IOException("Connection failed"); - // } - throw new IOException("Connection failed"); + Selector selector = Selector.open(); + try { + SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT); + int ready = selector.select(connectionTimeout > 0 ? connectionTimeout : 1000); + if (ready == 0) { + throw new IOException("Connection timed out after " + (connectionTimeout > 0 ? connectionTimeout : 1000) + "ms"); + } + + if (key.isConnectable()) { + connected = channel.finishConnect(); + if (!connected) { + throw new IOException("Failed to complete connection"); + } + } + } finally { + selector.close(); + } } + System.out.println("========== Connection successful"); System.out.println("========== Channel state - open: " + channel.isOpen() + ", connected: " + channel.isConnected()); // channel.socket().setSoTimeout(Math.max(timeout, 0)); diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java index 5e06c068..14fa94bc 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java @@ -122,34 +122,23 @@ public void resist_dsd_restart() throws Exception { server.close(); client.gauge("mycount", 20); - Exception originalException = lastException; - while (lastException == originalException) { + while (lastException.getMessage() == null) { Thread.sleep(10); } // Depending on the state of the client at that point we might get different messages. - assertTrue(lastException instanceof IOException); - String message = lastException.getMessage(); - if (message != null) { - assertThat(message.toLowerCase(), - anyOf(containsString("connection"), containsString("broken"))); - } + assertThat( + lastException.getMessage(), + anyOf(containsString("Connection refused"), containsString("Broken pipe"))); // Delete the socket file, client should throw an IOException lastException = new Exception(); socketFile.delete(); client.gauge("mycount", 21); - originalException = lastException; - while (lastException == originalException) { + while (lastException.getMessage() == null) { Thread.sleep(10); } - assertTrue(lastException instanceof IOException); - String fileMessage = lastException.getMessage(); - if (fileMessage != null) { - assertThat(fileMessage.toLowerCase(), - anyOf(containsString("file"), containsString("directory"), - containsString("socket"), containsString("connect"))); - } + assertThat(lastException.getMessage(), containsString("No such file or directory")); // Re-open the server, next send should work OK DummyStatsDServer server2; @@ -177,28 +166,11 @@ public void resist_dsd_timeout() throws Exception { // Freeze the server to simulate dsd being overwhelmed server.freeze(); - Exception originalException = lastException; - int attempts = 0; - while (lastException == originalException && attempts < 50) { + while (lastException.getMessage() == null) { client.gauge("mycount", 20); - attempts++; - Thread.sleep(100); - } - - System.out.println("=== TIMEOUT TEST DEBUG ==="); - System.out.println("Attempts: " + attempts); - System.out.println("Original exception: " + originalException + " (class: " + originalException.getClass().getName() + ")"); - System.out.println("Current exception: " + lastException + " (class: " + lastException.getClass().getName() + ")"); - System.out.println("Are they the same object? " + (lastException == originalException)); - System.out.println("Are they equal? " + (lastException.equals(originalException))); - - assertTrue(lastException != originalException); - assertTrue(lastException instanceof IOException); - String timeoutMessage = lastException.getMessage(); - // Message may be null for some exceptions (e.g. AsynchronousCloseException) - if (timeoutMessage != null) { - assertThat(timeoutMessage, anyOf(containsString("timed out"), containsString("broken"), containsString("pipe"))); } + String excMessage = "Write timed out"; + assertThat(lastException.getMessage(), containsString(excMessage)); // Make sure we recover after we resume listening server.clear(); From 17aceab2fdeac05881adb88fc21355ac1793f6ca Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Sat, 23 Aug 2025 23:04:59 -0400 Subject: [PATCH 22/23] Clean --- .../statsd/NonBlockingStatsDClient.java | 31 +++++- .../NonBlockingStatsDClientBuilder.java | 37 ++++--- .../statsd/UnixDatagramClientChannel.java | 13 ++- .../statsd/UnixStreamClientChannel.java | 103 +++++++----------- .../com/timgroup/statsd/VersionUtils.java | 12 +- .../timgroup/statsd/BuilderAddressTest.java | 12 +- .../UnixStreamSocketDummyStatsDServer.java | 6 - .../timgroup/statsd/UnixStreamSocketTest.java | 3 +- 8 files changed, 108 insertions(+), 109 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index 22985a2b..eacaba62 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -244,7 +244,13 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) externalEnv = originDetectionEnabled ? Utf8.sanitize(System.getenv("DD_EXTERNAL_ENV")) : ""; try { - clientChannel = createByteChannel(builder.addressLookup, builder.timeout, builder.connectionTimeout, builder.socketBufferSize, builder.enableJdkSocket); + clientChannel = + createByteChannel( + builder.addressLookup, + builder.timeout, + builder.connectionTimeout, + builder.socketBufferSize, + builder.enableJdkSocket); ThreadFactory threadFactory = builder.threadFactory != null @@ -287,8 +293,13 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) telemetryClientChannel = clientChannel; telemetryStatsDProcessor = statsDProcessor; } else { - telemetryClientChannel = createByteChannel(builder.telemetryAddressLookup, builder.timeout, builder.connectionTimeout, - builder.socketBufferSize, builder.enableJdkSocket); + telemetryClientChannel = + createByteChannel( + builder.telemetryAddressLookup, + builder.timeout, + builder.connectionTimeout, + builder.socketBufferSize, + builder.enableJdkSocket); // similar settings, but a single worker and non-blocking. telemetryStatsDProcessor = @@ -471,7 +482,11 @@ StringBuilder tagString(final String[] tags, StringBuilder builder) { } ClientChannel createByteChannel( - Callable addressLookup, int timeout, int connectionTimeout, int bufferSize, boolean enableJdkSocket) + Callable addressLookup, + int timeout, + int connectionTimeout, + int bufferSize, + boolean enableJdkSocket) throws Exception { final SocketAddress address = addressLookup.call(); if (address instanceof NamedPipeSocketAddress) { @@ -485,8 +500,12 @@ ClientChannel createByteChannel( // Allow us to support `unix://` for both kind of sockets like in go. switch (unixAddr.getTransportType()) { case UDS_STREAM: - return new UnixStreamClientChannel(unixAddr.getAddress(), timeout, connectionTimeout, - bufferSize, enableJdkSocket); + return new UnixStreamClientChannel( + unixAddr.getAddress(), + timeout, + connectionTimeout, + bufferSize, + enableJdkSocket); case UDS_DATAGRAM: case UDS: return new UnixDatagramClientChannel( diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 55b83919..5a45ab8b 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -1,9 +1,5 @@ package com.timgroup.statsd; -import jnr.constants.platform.Sock; -import jnr.unixsocket.UnixSocketAddress; - -import java.io.IOException; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -534,28 +530,33 @@ public SocketAddress call() { protected static Callable staticUnixResolution( final String path, final UnixSocketAddressWithTransport.TransportType transportType) { return new Callable() { - @Override public SocketAddress call() { + @Override + public SocketAddress call() { SocketAddress socketAddress; - // Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise. - if (VersionUtils.isJavaVersionAtLeast(16) && NonBlockingStatsDClient.DEFAULT_ENABLE_JDK_SOCKET) { + // Use native UDS support for compatible Java versions and jnr-unixsocket support + // otherwise + if (VersionUtils.isJavaVersionAtLeast(16) + && NonBlockingStatsDClient.DEFAULT_ENABLE_JDK_SOCKET) { try { - // Use reflection to avoid compiling Java 16+ classes in incompatible versions - Class unixDomainSocketAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); - Method ofMethod = unixDomainSocketAddressClass.getMethod("of", String.class); - // return type SocketAddress for compatibility with UnixStreamClientChannel.java + // Use reflection to avoid compiling Java 16+ classes in incompatible + // versions + Class unixDomainSocketAddressClass = + Class.forName("java.net.UnixDomainSocketAddress"); + Method ofMethod = + unixDomainSocketAddressClass.getMethod("of", String.class); socketAddress = (SocketAddress) ofMethod.invoke(null, path); - System.out.println("========== Native UDS socket address: " + socketAddress); - System.out.println("========== Native UDS socket address type: " + socketAddress.getClass().getName()); + } catch (Exception e) { - throw new StatsDClientException("Failed to create UnixSocketAddress for native UDS implementation", e); + throw new StatsDClientException( + "Failed to create UnixSocketAddress for native UDS implementation", + e); } } else { socketAddress = new UnixSocketAddress(path); - System.out.println("========== JNR socket address: " + socketAddress); - System.out.println("========== JNR socket address type: " + socketAddress.getClass().getName()); } - UnixSocketAddressWithTransport result = new UnixSocketAddressWithTransport(socketAddress, transportType); - System.out.println("========== Final result type: " + result.getClass().getName()); + UnixSocketAddressWithTransport result = + new UnixSocketAddressWithTransport(socketAddress, transportType); + return result; } }; diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index 828e1dab..b8dae3e0 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -1,10 +1,9 @@ package com.timgroup.statsd; -import jnr.unixsocket.UnixDatagramChannel; -import jnr.unixsocket.UnixSocketOptions; - import java.io.IOException; import java.net.SocketAddress; +import jnr.unixsocket.UnixDatagramChannel; +import jnr.unixsocket.UnixSocketOptions; class UnixDatagramClientChannel extends DatagramClientChannel { /** @@ -15,9 +14,11 @@ class UnixDatagramClientChannel extends DatagramClientChannel { * @param bufferSize Buffer size * @throws IOException if socket options cannot be set */ - UnixDatagramClientChannel(SocketAddress address, int timeout, int bufferSize) throws IOException { + UnixDatagramClientChannel(SocketAddress address, int timeout, int bufferSize) + throws IOException { // Ideally we could use native JDK UDS support such as with the UnixStreamClientChannel. - // However, DatagramChannels do not support StandardProtocolFamily.UNIX, so this is unavailable. + // However, DatagramChannels do not support StandardProtocolFamily.UNIX, so this is + // unavailable. // See this open issue for updates: https://bugs.openjdk.org/browse/JDK-8297837? super(UnixDatagramChannel.open(), address); // Set send timeout, to handle the case where the transmission buffer is full @@ -39,4 +40,4 @@ public String getTransportType() { public int getMaxPacketSizeBytes() { return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES; } -} \ No newline at end of file +} diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 53eebc19..24fe86a7 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -3,12 +3,11 @@ import java.io.IOException; import java.lang.reflect.Method; import java.net.SocketAddress; -import java.net.StandardProtocolFamily; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.channels.SocketChannel; -import java.nio.channels.Selector; import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; import jnr.unixsocket.UnixSocketAddress; import jnr.unixsocket.UnixSocketChannel; import jnr.unixsocket.UnixSocketOptions; @@ -30,12 +29,15 @@ public class UnixStreamClientChannel implements ClientChannel { * * @param address Location of named pipe */ - UnixStreamClientChannel(SocketAddress address, int timeout, int connectionTimeout, int bufferSize, - boolean enableJdkSocket) throws IOException { + UnixStreamClientChannel( + SocketAddress address, + int timeout, + int connectionTimeout, + int bufferSize, + boolean enableJdkSocket) + throws IOException { this.delegate = null; this.address = address; - System.out.println("========== Constructor address: " + address); - System.out.println("========== Constructor address type: " + address.getClass().getName()); this.timeout = timeout; this.connectionTimeout = connectionTimeout; this.bufferSize = bufferSize; @@ -51,30 +53,23 @@ public boolean isOpen() { public synchronized int write(ByteBuffer src) throws IOException { connectIfNeeded(); - System.out.println("========== Write called - delegate state: open=" + delegate.isOpen() + ", connected=" + delegate.isConnected()); - int size = src.remaining(); int written = 0; if (size == 0) { return 0; } + delimiterBuffer.clear(); delimiterBuffer.putInt(size); delimiterBuffer.flip(); try { long deadline = System.nanoTime() + timeout * 1_000_000L; - System.out.println("========== About to write delimiter buffer, size: " + delimiterBuffer.remaining()); written = writeAll(delimiterBuffer, true, deadline); - System.out.println("========== Delimiter buffer written, bytes: " + written); if (written > 0) { - System.out.println("========== About to write src buffer, size: " + src.remaining()); written += writeAll(src, false, deadline); - System.out.println("========== Src buffer written, total bytes: " + written); } } catch (IOException e) { - System.out.println("========== Write failed with IOException: " + e.getClass().getName() + ": " + e.getMessage()); - e.printStackTrace(); // If we get an exception, it's unrecoverable, we close the channel and try to reconnect disconnect(); throw e; @@ -103,26 +98,25 @@ public int writeAll(ByteBuffer bb, boolean canReturnOnTimeout, long deadline) int remaining = bb.remaining(); int written = 0; long timeoutMs = timeout; - + while (remaining > 0) { int bytesWritten = delegate.write(bb); - if (bytesWritten > 0) { remaining -= bytesWritten; written += bytesWritten; continue; } - + if (bytesWritten == 0) { if (canReturnOnTimeout && written == 0) { return written; } - + Selector selector = Selector.open(); try { SelectionKey key = delegate.register(selector, SelectionKey.OP_WRITE); long selectTimeout = timeoutMs; - + if (deadline > 0) { long remainingNs = deadline - System.nanoTime(); if (remainingNs <= 0) { @@ -130,7 +124,7 @@ public int writeAll(ByteBuffer bb, boolean canReturnOnTimeout, long deadline) } selectTimeout = Math.min(timeoutMs, remainingNs / 1_000_000L); } - + int ready = selector.select(selectTimeout); if (ready == 0) { throw new IOException("Write timed out after " + selectTimeout + "ms"); @@ -144,12 +138,7 @@ public int writeAll(ByteBuffer bb, boolean canReturnOnTimeout, long deadline) } private void connectIfNeeded() throws IOException { - System.out.println("========== connectIfNeeded called - delegate is " + (delegate == null ? "null" : "not null")); - if (delegate != null) { - System.out.println("========== existing delegate state - open: " + delegate.isOpen() + ", connected: " + delegate.isConnected()); - } if (delegate == null) { - System.out.println("========== calling connect()"); connect(); } } @@ -176,36 +165,39 @@ private void connect() throws IOException { try { // Use reflection to avoid compiling Java 16+ classes in incompatible versions Class protocolFamilyClass = Class.forName("java.net.ProtocolFamily"); - Class standardProtocolFamilyClass = Class.forName("java.net.StandardProtocolFamily"); - Object unixProtocol = Enum.valueOf((Class) standardProtocolFamilyClass, "UNIX"); + Class standardProtocolFamilyClass = + Class.forName("java.net.StandardProtocolFamily"); + Object unixProtocol = + Enum.valueOf((Class) standardProtocolFamilyClass, "UNIX"); Method openMethod = SocketChannel.class.getMethod("open", protocolFamilyClass); // Open a socketchannel with Unix Domain Socket protocol family SocketChannel channel = (SocketChannel) openMethod.invoke(null, unixProtocol); - + channel.configureBlocking(false); try { - System.out.println("========== Native UDS connect address: " + address); - System.out.println("========== Native UDS connect address type: " + address.getClass().getName()); - SocketAddress connectAddress = address; if (address instanceof UnixSocketAddressWithTransport) { connectAddress = ((UnixSocketAddressWithTransport) address).getAddress(); - System.out.println("========== Unwrapped address: " + connectAddress); - System.out.println("========== Unwrapped address type: " + connectAddress.getClass().getName()); } - - Method connectMethod = SocketChannel.class.getMethod("connect", SocketAddress.class); + + Method connectMethod = + SocketChannel.class.getMethod("connect", SocketAddress.class); boolean connected = (boolean) connectMethod.invoke(channel, connectAddress); if (!connected) { Selector selector = Selector.open(); try { SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT); - int ready = selector.select(connectionTimeout > 0 ? connectionTimeout : 1000); + int ready = + selector.select( + connectionTimeout > 0 ? connectionTimeout : 1000); if (ready == 0) { - throw new IOException("Connection timed out after " + (connectionTimeout > 0 ? connectionTimeout : 1000) + "ms"); + throw new IOException( + "Connection timed out after " + + (connectionTimeout > 0 ? connectionTimeout : 1000) + + "ms"); } - + if (key.isConnectable()) { connected = channel.finishConnect(); if (!connected) { @@ -216,40 +208,32 @@ private void connect() throws IOException { selector.close(); } } - - System.out.println("========== Connection successful"); - System.out.println("========== Channel state - open: " + channel.isOpen() + ", connected: " + channel.isConnected()); - // channel.socket().setSoTimeout(Math.max(timeout, 0)); - // if (bufferSize > 0) { - // channel.socket().setSendBufferSize(bufferSize); - // } } catch (Exception e) { - System.out.println("========== Native UDS connection failed with exception: " + e.getClass().getName() + ": " + e.getMessage()); - e.printStackTrace(); try { channel.close(); } catch (IOException __) { - // ignore + // ignore } throw e; } - + this.delegate = channel; return; } catch (Exception e) { - System.out.println("========== Native UDS implementation failed with outer exception: " + e.getClass().getName() + ": " + e.getMessage()); - e.printStackTrace(); - Throwable cause = e.getCause(); - if (e instanceof java.lang.reflect.InvocationTargetException && cause instanceof IOException) { + if (e instanceof java.lang.reflect.InvocationTargetException + && cause instanceof IOException) { throw (IOException) cause; } - throw new IOException("Failed to create UnixStreamClientChannel for native UDS implementation", e); + throw new IOException( + "Failed to create UnixStreamClientChannel for native UDS implementation", + e); } } - // Default to jnr-unixsocket if Java version is less than 16 or native UDS support is disabled + // Default to jnr-unixsocket if Java version is less than 16 or native UDS support is + // disabled UnixSocketChannel channel = UnixSocketChannel.create(); - + if (connectionTimeout > 0) { // Set connect timeout, this should work at least on linux // https://elixir.bootlin.com/linux/v5.7.4/source/net/unix/af_unix.c#L1696 @@ -257,17 +241,12 @@ private void connect() throws IOException { } try { - // Ensure address is of type UnixSocketAddress -- this should be unnecessary after native UDS support - // is fixed and addresses that are not of type UnixSocketAddress are filtered out UnixSocketAddress unixAddress; if (address instanceof UnixSocketAddress) { unixAddress = (UnixSocketAddress) address; } else { unixAddress = new UnixSocketAddress(address.toString()); } - - System.out.println("========== JNR connect address: " + unixAddress); - System.out.println("========== JNR connect address type: " + unixAddress.getClass().getName()); if (!channel.connect(unixAddress)) { if (connectionTimeout > 0 && System.nanoTime() > deadline) { diff --git a/src/main/java/com/timgroup/statsd/VersionUtils.java b/src/main/java/com/timgroup/statsd/VersionUtils.java index 63eb6715..7fba319c 100644 --- a/src/main/java/com/timgroup/statsd/VersionUtils.java +++ b/src/main/java/com/timgroup/statsd/VersionUtils.java @@ -6,7 +6,8 @@ // Logic copied from dd-trace-java Platform class. See: // https://github.com/DataDog/dd-trace-java/blob/master/internal-api/src/main/java/datadog/trace/api/Platform.java public class VersionUtils { - private static final Version JAVA_VERSION = parseJavaVersion(System.getProperty("java.version")); + private static final Version JAVA_VERSION = + parseJavaVersion(System.getProperty("java.version")); private static Version parseJavaVersion(String javaVersion) { // Remove pre-release part, usually -ea @@ -87,7 +88,12 @@ public boolean isAtLeast(int major, int minor, int update) { } private static boolean isAtLeast( - int major, int minor, int update, int atLeastMajor, int atLeastMinor, int atLeastUpdate) { + int major, + int minor, + int update, + int atLeastMajor, + int atLeastMinor, + int atLeastUpdate) { return (major > atLeastMajor) || (major == atLeastMajor && minor > atLeastMinor) || (major == atLeastMajor && minor == atLeastMinor && update >= atLeastUpdate); @@ -105,4 +111,4 @@ public static boolean isJavaVersionAtLeast(int major, int minor) { public static boolean isJavaVersionAtLeast(int major, int minor, int update) { return JAVA_VERSION.isAtLeast(major, minor, update); } -} \ No newline at end of file +} diff --git a/src/test/java/com/timgroup/statsd/BuilderAddressTest.java b/src/test/java/com/timgroup/statsd/BuilderAddressTest.java index 5024265b..1f93ea61 100644 --- a/src/test/java/com/timgroup/statsd/BuilderAddressTest.java +++ b/src/test/java/com/timgroup/statsd/BuilderAddressTest.java @@ -211,14 +211,14 @@ public void address_resolution() throws Exception { // Make it possible to run this code even if we don't have jnr-unixsocket. if (expected instanceof UnixSocketAddressWithTransport) { - UnixSocketAddressWithTransport a = (UnixSocketAddressWithTransport)actual; - UnixSocketAddressWithTransport e = (UnixSocketAddressWithTransport)expected; + UnixSocketAddressWithTransport a = (UnixSocketAddressWithTransport) actual; + UnixSocketAddressWithTransport e = (UnixSocketAddressWithTransport) expected; // native UDS support returns a SocketAddress rather than a UnixSocketAddress assertEquals( - ((FakeUnixSocketAddress)e.getAddress()).getPath(), - a.getAddress() instanceof UnixSocketAddress ? - ((UnixSocketAddress)a.getAddress()).path() : - a.getAddress().toString()); + ((FakeUnixSocketAddress) e.getAddress()).getPath(), + a.getAddress() instanceof UnixSocketAddress + ? ((UnixSocketAddress) a.getAddress()).path() + : a.getAddress().toString()); assertEquals(e.getTransportType(), a.getTransportType()); } else { assertEquals(expected, actual); diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java index 8900f9b5..56a425a7 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java @@ -24,8 +24,6 @@ public UnixStreamSocketDummyStatsDServer(String socketPath) throws IOException { server = UnixServerSocketChannel.open(); server.configureBlocking(true); UnixSocketAddress address = new UnixSocketAddress(socketPath); - System.out.println("========== Server bind address: " + address); - System.out.println("========== Server bind address type: " + address.getClass().getName()); server.socket().bind(address); this.listen(); } @@ -42,8 +40,6 @@ protected void receive(ByteBuffer packet) throws IOException { @Override protected void listen() { - System.out.println("========== Server local address: " + server.getLocalSocketAddress()); - System.out.println("========== Server local address type: " + server.getLocalSocketAddress().getClass().getName()); logger.info("Listening on " + server.getLocalSocketAddress()); Thread thread = new Thread( @@ -60,8 +56,6 @@ public void run() { if (clientChannel != null) { clientChannel.configureBlocking(true); try { - System.out.println("========== Client remote address: " + clientChannel.getRemoteSocketAddress()); - System.out.println("========== Client remote address type: " + clientChannel.getRemoteSocketAddress().getClass().getName()); logger.info( "Accepted connection from " + clientChannel diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java index 14fa94bc..dc621861 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java @@ -7,7 +7,6 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; @@ -98,7 +97,7 @@ public void assert_default_uds_size() throws Exception { public void sends_to_statsd() throws Exception { Thread.sleep(100); server.clear(); - + for (long i = 0; i < 5; i++) { client.gauge("mycount", i); server.waitForMessage(); From 18daa04f2722bec4653288bd0825c5ad757de184 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Sun, 24 Aug 2025 22:01:31 -0400 Subject: [PATCH 23/23] Clean more --- .../NonBlockingStatsDClientBuilder.java | 12 +++--- .../statsd/UnixStreamClientChannel.java | 43 +++++++------------ .../timgroup/statsd/BuilderAddressTest.java | 2 +- 3 files changed, 22 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 5a45ab8b..ba07fbd1 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -53,7 +53,7 @@ public class NonBlockingStatsDClientBuilder implements Cloneable { public boolean enableAggregation = NonBlockingStatsDClient.DEFAULT_ENABLE_AGGREGATION; - /** Enable native JDK support for UDS. */ + /** Enable native JDK support for UDS. Only available on Java 16+. */ public boolean enableJdkSocket = NonBlockingStatsDClient.DEFAULT_ENABLE_JDK_SOCKET; /** Telemetry flush interval, in milliseconds. */ @@ -533,22 +533,20 @@ protected static Callable staticUnixResolution( @Override public SocketAddress call() { SocketAddress socketAddress; - // Use native UDS support for compatible Java versions and jnr-unixsocket support - // otherwise + + // Use native JDK support for UDS on Java 16+ and jnr-unixsocket otherwise if (VersionUtils.isJavaVersionAtLeast(16) && NonBlockingStatsDClient.DEFAULT_ENABLE_JDK_SOCKET) { try { - // Use reflection to avoid compiling Java 16+ classes in incompatible - // versions + // Avoid compiling Java 16+ classes in incompatible versions Class unixDomainSocketAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); Method ofMethod = unixDomainSocketAddressClass.getMethod("of", String.class); socketAddress = (SocketAddress) ofMethod.invoke(null, path); - } catch (Exception e) { throw new StatsDClientException( - "Failed to create UnixSocketAddress for native UDS implementation", + "Failed to create UnixSocketAddress for native JDK UDS implementation", e); } } else { diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 24fe86a7..66a53efc 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -58,7 +58,6 @@ public synchronized int write(ByteBuffer src) throws IOException { if (size == 0) { return 0; } - delimiterBuffer.clear(); delimiterBuffer.putInt(size); delimiterBuffer.flip(); @@ -100,20 +99,19 @@ public int writeAll(ByteBuffer bb, boolean canReturnOnTimeout, long deadline) long timeoutMs = timeout; while (remaining > 0) { - int bytesWritten = delegate.write(bb); - if (bytesWritten > 0) { - remaining -= bytesWritten; - written += bytesWritten; + int read = delegate.write(bb); + if (read > 0) { + remaining -= read; + written += read; continue; } - if (bytesWritten == 0) { + if (read == 0) { if (canReturnOnTimeout && written == 0) { return written; } - Selector selector = Selector.open(); - try { + try (Selector selector = Selector.open()) { SelectionKey key = delegate.register(selector, SelectionKey.OP_WRITE); long selectTimeout = timeoutMs; @@ -125,12 +123,9 @@ public int writeAll(ByteBuffer bb, boolean canReturnOnTimeout, long deadline) selectTimeout = Math.min(timeoutMs, remainingNs / 1_000_000L); } - int ready = selector.select(selectTimeout); - if (ready == 0) { + if (selector.select(selectTimeout) == 0) { throw new IOException("Write timed out after " + selectTimeout + "ms"); } - } finally { - selector.close(); } } } @@ -160,17 +155,16 @@ private void connect() throws IOException { } long deadline = System.nanoTime() + connectionTimeout * 1_000_000L; - // Use native UDS support for compatible Java versions and jnr-unixsocket support otherwise. + // Use native JDK support for UDS on Java 16+ and jnr-unixsocket otherwise if (VersionUtils.isJavaVersionAtLeast(16) && enableJdkSocket) { try { - // Use reflection to avoid compiling Java 16+ classes in incompatible versions + // Avoid compiling Java 16+ classes in incompatible versions Class protocolFamilyClass = Class.forName("java.net.ProtocolFamily"); Class standardProtocolFamilyClass = Class.forName("java.net.StandardProtocolFamily"); Object unixProtocol = Enum.valueOf((Class) standardProtocolFamilyClass, "UNIX"); Method openMethod = SocketChannel.class.getMethod("open", protocolFamilyClass); - // Open a socketchannel with Unix Domain Socket protocol family SocketChannel channel = (SocketChannel) openMethod.invoke(null, unixProtocol); channel.configureBlocking(false); @@ -184,18 +178,16 @@ private void connect() throws IOException { Method connectMethod = SocketChannel.class.getMethod("connect", SocketAddress.class); boolean connected = (boolean) connectMethod.invoke(channel, connectAddress); + if (!connected) { - Selector selector = Selector.open(); - try { + try (Selector selector = Selector.open()) { SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT); - int ready = - selector.select( - connectionTimeout > 0 ? connectionTimeout : 1000); + int timeoutMs = connectionTimeout > 0 ? connectionTimeout : 1000; + int ready = selector.select(timeoutMs); + if (ready == 0) { throw new IOException( - "Connection timed out after " - + (connectionTimeout > 0 ? connectionTimeout : 1000) - + "ms"); + "Connection timed out after " + timeoutMs + "ms"); } if (key.isConnectable()) { @@ -204,8 +196,6 @@ private void connect() throws IOException { throw new IOException("Failed to complete connection"); } } - } finally { - selector.close(); } } } catch (Exception e) { @@ -230,8 +220,7 @@ private void connect() throws IOException { e); } } - // Default to jnr-unixsocket if Java version is less than 16 or native UDS support is - // disabled + // Default to jnr-unixsocket if Java version is < 16 or native support is disabled UnixSocketChannel channel = UnixSocketChannel.create(); if (connectionTimeout > 0) { diff --git a/src/test/java/com/timgroup/statsd/BuilderAddressTest.java b/src/test/java/com/timgroup/statsd/BuilderAddressTest.java index 1f93ea61..bf9de64d 100644 --- a/src/test/java/com/timgroup/statsd/BuilderAddressTest.java +++ b/src/test/java/com/timgroup/statsd/BuilderAddressTest.java @@ -213,7 +213,7 @@ public void address_resolution() throws Exception { if (expected instanceof UnixSocketAddressWithTransport) { UnixSocketAddressWithTransport a = (UnixSocketAddressWithTransport) actual; UnixSocketAddressWithTransport e = (UnixSocketAddressWithTransport) expected; - // native UDS support returns a SocketAddress rather than a UnixSocketAddress + // Native JDK UDS support returns a SocketAddress rather than a UnixSocketAddress assertEquals( ((FakeUnixSocketAddress) e.getAddress()).getPath(), a.getAddress() instanceof UnixSocketAddress