From cddde055f2db02d830200defc63c9c2d20149d3a Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 13 Mar 2025 17:21:52 -0400 Subject: [PATCH 01/13] Add Java version checker. --- .../timgroup/statsd/ClientChannelUtils.java | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 src/main/java/com/timgroup/statsd/ClientChannelUtils.java diff --git a/src/main/java/com/timgroup/statsd/ClientChannelUtils.java b/src/main/java/com/timgroup/statsd/ClientChannelUtils.java new file mode 100644 index 00000000..697ed878 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/ClientChannelUtils.java @@ -0,0 +1,106 @@ +package com.timgroup.statsd; + +import java.util.ArrayList; +import java.util.List; + +// logic copied over from dd-trace-java Platform class. See: +// https://github.com/DataDog/dd-trace-java/blob/master/internal-api/src/main/java/datadog/trace/api/Platform.java +public class ClientChannelUtils { + private static final Version JAVA_VERSION = parseJavaVersion(System.getProperty("java.version")); + + private static Version parseJavaVersion(String javaVersion) { + // Remove pre-release part, usually -ea + final int indexOfDash = javaVersion.indexOf('-'); + if (indexOfDash >= 0) { + javaVersion = javaVersion.substring(0, indexOfDash); + } + + int major = 0; + int minor = 0; + int update = 0; + + try { + List nums = splitDigits(javaVersion); + major = nums.get(0); + + // for java 1.6/1.7/1.8 + if (major == 1) { + major = nums.get(1); + minor = nums.get(2); + update = nums.get(3); + } else { + minor = nums.get(1); + update = nums.get(2); + } + } catch (NumberFormatException | IndexOutOfBoundsException e) { + // unable to parse version string - do nothing + } + return new Version(major, minor, update); + } + + private static List splitDigits(String str) { + List results = new ArrayList<>(); + + int len = str.length(); + + int value = 0; + for (int i = 0; i < len; i++) { + char ch = str.charAt(i); + if (ch >= '0' && ch <= '9') { + value = value * 10 + (ch - '0'); + } else if (ch == '.' || ch == '_' || ch == '+') { + results.add(value); + value = 0; + } else { + throw new NumberFormatException(); + } + } + results.add(value); + return results; + } + + static final class Version { + public final int major, minor, update; + + public Version(int major, int minor, int update) { + this.major = major; + this.minor = minor; + this.update = update; + } + + public boolean is(int major) { + return this.major == major; + } + + public boolean is(int major, int minor) { + return this.major == major && this.minor == minor; + } + + public boolean is(int major, int minor, int update) { + return this.major == major && this.minor == minor && this.update == update; + } + + public boolean isAtLeast(int major, int minor, int update) { + return isAtLeast(this.major, this.minor, this.update, major, minor, update); + } + + private static boolean isAtLeast( + int major, int minor, int update, int atLeastMajor, int atLeastMinor, int atLeastUpdate) { + return (major > atLeastMajor) + || (major == atLeastMajor && minor > atLeastMinor) + || (major == atLeastMajor && minor == atLeastMinor && update >= atLeastUpdate); + } + } + + public static boolean isJavaVersionAtLeast(int major) { + return isJavaVersionAtLeast(major, 0, 0); + } + + public static boolean isJavaVersionAtLeast(int major, int minor) { + return isJavaVersionAtLeast(major, minor, 0); + } + + public static boolean isJavaVersionAtLeast(int major, int minor, int update) { + return JAVA_VERSION.isAtLeast(major, minor, update); + } +} From 0a414f7c600ab577f8093a6e4e9ff217cddfb55f Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 13 Mar 2025 17:22:49 -0400 Subject: [PATCH 02/13] Refactor UnixDatagramClientChannel. --- .../statsd/UnixDatagramClientChannel.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index 7d996963..fc1f9204 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.net.SocketAddress; +import java.nio.channels.DatagramChannel; class UnixDatagramClientChannel extends DatagramClientChannel { /** @@ -16,14 +17,23 @@ class UnixDatagramClientChannel extends DatagramClientChannel { * @throws IOException if socket options cannot be set */ UnixDatagramClientChannel(SocketAddress address, int timeout, int bufferSize) throws IOException { - super(UnixDatagramChannel.open(), address); - // Set send timeout, to handle the case where the transmission buffer is full - // If no timeout is set, the send becomes blocking - if (timeout > 0) { - delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); - } - if (bufferSize > 0) { - delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + super(ClientChannelUtils.isJavaVersionAtLeast(16) ? DatagramChannel.open() : UnixDatagramChannel.open(), address); + if (ClientChannelUtils.isJavaVersionAtLeast(16)) { + if (timeout > 0) { + delegate.socket().setSoTimeout(timeout); + } + if (bufferSize > 0) { + delegate.socket().setSendBufferSize(bufferSize); + } + } else { + // Set send timeout, to handle the case where the transmission buffer is full + // If no timeout is set, the send becomes blocking + if (timeout > 0) { + delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); + } + if (bufferSize > 0) { + delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + } } } From b61c5d0549853eb46862a624e1cbc6bb1469dfb0 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Thu, 13 Mar 2025 17:23:10 -0400 Subject: [PATCH 03/13] Refactor UnixStreamClientChannel. --- .../statsd/UnixStreamClientChannel.java | 81 ++++++++++++------- 1 file changed, 54 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index d910d786..d5fa90c1 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -14,7 +14,7 @@ * A ClientChannel for Unix domain sockets. */ public class UnixStreamClientChannel implements ClientChannel { - private final UnixSocketAddress address; + private final SocketAddress address; private final int timeout; private final int connectionTimeout; private final int bufferSize; @@ -30,7 +30,7 @@ public class UnixStreamClientChannel implements ClientChannel { */ UnixStreamClientChannel(SocketAddress address, int timeout, int connectionTimeout, int bufferSize) throws IOException { this.delegate = null; - this.address = (UnixSocketAddress) address; + this.address = address; this.timeout = timeout; this.connectionTimeout = connectionTimeout; this.bufferSize = bufferSize; @@ -125,39 +125,66 @@ private void connect() throws IOException { } } - UnixSocketChannel delegate = UnixSocketChannel.create(); - long deadline = System.nanoTime() + connectionTimeout * 1_000_000L; - if (connectionTimeout > 0) { - // Set connect timeout, this should work at least on linux - // https://elixir.bootlin.com/linux/v5.7.4/source/net/unix/af_unix.c#L1696 - // We'd have better timeout support if we used Java 16's native Unix domain socket support (JEP 380) - delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, connectionTimeout); - } - try { - if (!delegate.connect(address)) { - if (connectionTimeout > 0 && System.nanoTime() > deadline) { - throw new IOException("Connection timed out"); + // use Java 16's native Unix domain socket support for compatible versions + if (ClientChannelUtils.isJavaVersionAtLeast(16)) { + SocketChannel delegate = SocketChannel.open(); + if (connectionTimeout > 0) { + delegate.socket().setSoTimeout(connectionTimeout); + } + try { + delegate.configureBlocking(false); + if (!delegate.connect(address)) { + if (connectionTimeout > 0 && System.nanoTime() > deadline) { + throw new IOException("Connection timed out"); + } + if (!delegate.finishConnect()) { + throw new IOException("Connection failed"); + } + } + delegate.configureBlocking(true); + delegate.socket().setSoTimeout(Math.max(timeout, 0)); + if (bufferSize > 0) { + delegate.socket().setSendBufferSize(bufferSize); } - if (!delegate.finishConnect()) { - throw new IOException("Connection failed"); + } catch (Exception e) { + try { + delegate.close(); + } catch (IOException __) { + // ignore } + throw e; } - - delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, Math.max(timeout, 0)); - if (bufferSize > 0) { - delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + } else { + UnixSocketChannel delegate = UnixSocketChannel.create(); + if (connectionTimeout > 0) { + // Set connect timeout, this should work at least on linux + // https://elixir.bootlin.com/linux/v5.7.4/source/net/unix/af_unix.c#L1696 + // We'd have better timeout support if we used Java 16's native Unix domain socket support (JEP 380) + delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, connectionTimeout); } - } catch (Exception e) { try { - delegate.close(); - } catch (IOException __) { - // ignore + if (!delegate.connect((UnixSocketAddress) address)) { + if (connectionTimeout > 0 && System.nanoTime() > deadline) { + throw new IOException("Connection timed out"); + } + if (!delegate.finishConnect()) { + throw new IOException("Connection failed"); + } + } + delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, Math.max(timeout, 0)); + if (bufferSize > 0) { + delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + } + } catch (Exception e) { + try { + delegate.close(); + } catch (IOException __) { + // ignore + } + throw e; } - throw e; } - - this.delegate = delegate; } From 495ef63a4844c110de2d2018d6b50bc885d3f1ad Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 14 Mar 2025 13:49:48 -0400 Subject: [PATCH 04/13] Add check for native UDS support. --- src/main/java/com/timgroup/statsd/ClientChannelUtils.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/com/timgroup/statsd/ClientChannelUtils.java b/src/main/java/com/timgroup/statsd/ClientChannelUtils.java index 697ed878..efa7373e 100644 --- a/src/main/java/com/timgroup/statsd/ClientChannelUtils.java +++ b/src/main/java/com/timgroup/statsd/ClientChannelUtils.java @@ -7,6 +7,7 @@ // https://github.com/DataDog/dd-trace-java/blob/master/internal-api/src/main/java/datadog/trace/api/Platform.java public class ClientChannelUtils { private static final Version JAVA_VERSION = parseJavaVersion(System.getProperty("java.version")); + private static final int NATIVE_UDS_MIN_VERSION = 16; // Java 16+ has native Unix Domain Socket support private static Version parseJavaVersion(String javaVersion) { // Remove pre-release part, usually -ea @@ -103,4 +104,8 @@ public static boolean isJavaVersionAtLeast(int major, int minor) { public static boolean isJavaVersionAtLeast(int major, int minor, int update) { return JAVA_VERSION.isAtLeast(major, minor, update); } + + public static boolean hasNativeUDSSupport() { + return isJavaVersionAtLeast(NATIVE_UDS_MIN_VERSION); + } } From 6dd405c8874e4db7d3be376113a520b87a1fed5c Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 14 Mar 2025 13:50:37 -0400 Subject: [PATCH 05/13] Refactor UnixDatagramClientChannel again. --- .../statsd/UnixDatagramClientChannel.java | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index fc1f9204..3398338d 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -17,8 +17,20 @@ class UnixDatagramClientChannel extends DatagramClientChannel { * @throws IOException if socket options cannot be set */ UnixDatagramClientChannel(SocketAddress address, int timeout, int bufferSize) throws IOException { - super(ClientChannelUtils.isJavaVersionAtLeast(16) ? DatagramChannel.open() : UnixDatagramChannel.open(), address); - if (ClientChannelUtils.isJavaVersionAtLeast(16)) { + super(createChannel(address), address); + configureChannel(timeout, bufferSize); + } + + private static DatagramChannel createChannel(SocketAddress address) throws IOException { + if (ClientChannelUtils.hasNativeUDSSupport()) { + return DatagramChannel.open(); + } else { + return UnixDatagramChannel.open(); + } + } + + private void configureChannel(int timeout, int bufferSize) throws IOException { + if (ClientChannelUtils.hasNativeUDSSupport()) { if (timeout > 0) { delegate.socket().setSoTimeout(timeout); } @@ -26,13 +38,12 @@ class UnixDatagramClientChannel extends DatagramClientChannel { delegate.socket().setSendBufferSize(bufferSize); } } else { - // Set send timeout, to handle the case where the transmission buffer is full - // If no timeout is set, the send becomes blocking + UnixDatagramChannel unixChannel = (UnixDatagramChannel) delegate; if (timeout > 0) { - delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); + unixChannel.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); } if (bufferSize > 0) { - delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + unixChannel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); } } } From 03b89c62127ee7f1227ac56a2edf68295c67b29d Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 14 Mar 2025 13:51:54 -0400 Subject: [PATCH 06/13] Refactor UnixStreamClientChannel again. --- .../statsd/UnixStreamClientChannel.java | 141 ++++++++++-------- 1 file changed, 81 insertions(+), 60 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index d5fa90c1..303db8bd 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -19,7 +19,6 @@ public class UnixStreamClientChannel implements ClientChannel { private final int connectionTimeout; private final int bufferSize; - private SocketChannel delegate; private final ByteBuffer delimiterBuffer = ByteBuffer.allocateDirect(Integer.SIZE / Byte.SIZE).order(ByteOrder.LITTLE_ENDIAN); @@ -74,35 +73,6 @@ public synchronized int write(ByteBuffer src) throws IOException { return size; } - /** - * Writes all bytes from the given buffer to the channel. - * @param bb buffer to write - * @param canReturnOnTimeout if true, we return if the channel is blocking and we haven't written anything yet - * @param deadline deadline for the write - * @return number of bytes written - * @throws IOException if the channel is closed or an error occurs - */ - public int writeAll(ByteBuffer bb, boolean canReturnOnTimeout, long deadline) throws IOException { - int remaining = bb.remaining(); - int written = 0; - while (remaining > 0) { - int read = delegate.write(bb); - - // If we haven't written anything yet, we can still return - if (read == 0 && canReturnOnTimeout && written == 0) { - return written; - } - - remaining -= read; - written += read; - - if (deadline > 0 && System.nanoTime() > deadline) { - throw new IOException("Write timed out"); - } - } - return written; - } - private void connectIfNeeded() throws IOException { if (delegate == null) { connect(); @@ -127,14 +97,32 @@ private void connect() throws IOException { long deadline = System.nanoTime() + connectionTimeout * 1_000_000L; // use Java 16's native Unix domain socket support for compatible versions - if (ClientChannelUtils.isJavaVersionAtLeast(16)) { + if (ClientChannelUtils.hasNativeUDSSupport()) { + connectJdkSocket(deadline); + } else { + connectJnrSocket(deadline); + } + } + + private void connectJdkSocket(long deadline) throws IOException { + String socketPath = address.toString(); + if (socketPath.startsWith("file://") || socketPath.startsWith("unix://")) { + socketPath = socketPath.substring(7); + } + + try { + // Use reflection to avoid compile-time dependency on Java 16+ classes + Class udsAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); + Object udsAddress = udsAddressClass.getMethod("of", String.class).invoke(null, socketPath); + SocketChannel delegate = SocketChannel.open(); if (connectionTimeout > 0) { delegate.socket().setSoTimeout(connectionTimeout); } + try { delegate.configureBlocking(false); - if (!delegate.connect(address)) { + if (!delegate.connect((SocketAddress) udsAddress)) { if (connectionTimeout > 0 && System.nanoTime() > deadline) { throw new IOException("Connection timed out"); } @@ -147,45 +135,78 @@ private void connect() throws IOException { if (bufferSize > 0) { delegate.socket().setSendBufferSize(bufferSize); } + this.delegate = delegate; } catch (Exception e) { try { delegate.close(); } catch (IOException __) { // ignore } - throw e; - } - } else { - UnixSocketChannel delegate = UnixSocketChannel.create(); - if (connectionTimeout > 0) { - // Set connect timeout, this should work at least on linux - // https://elixir.bootlin.com/linux/v5.7.4/source/net/unix/af_unix.c#L1696 - // We'd have better timeout support if we used Java 16's native Unix domain socket support (JEP 380) - delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, connectionTimeout); + throw new IOException("Failed to connect to Unix Domain Socket: " + socketPath, e); } - try { - if (!delegate.connect((UnixSocketAddress) address)) { - if (connectionTimeout > 0 && System.nanoTime() > deadline) { - throw new IOException("Connection timed out"); - } - if (!delegate.finishConnect()) { - throw new IOException("Connection failed"); - } - } - delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, Math.max(timeout, 0)); - if (bufferSize > 0) { - delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + } catch (ReflectiveOperationException e) { + throw new IOException("Failed to create UnixDomainSocketAddress: Java 16+ required", e); + } + } + + private void connectJnrSocket(long deadline) throws IOException { + UnixSocketChannel delegate = UnixSocketChannel.create(); + if (connectionTimeout > 0) { + // Set connect timeout, this should work at least on linux + // https://elixir.bootlin.com/linux/v5.7.4/source/net/unix/af_unix.c#L1696 + delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, connectionTimeout); + } + try { + if (!delegate.connect((UnixSocketAddress) address)) { + if (connectionTimeout > 0 && System.nanoTime() > deadline) { + throw new IOException("Connection timed out"); } - } catch (Exception e) { - try { - delegate.close(); - } catch (IOException __) { - // ignore + if (!delegate.finishConnect()) { + throw new IOException("Connection failed"); } - throw e; } + delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, Math.max(timeout, 0)); + if (bufferSize > 0) { + delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + } + this.delegate = delegate; + } catch (Exception e) { + try { + delegate.close(); + } catch (IOException __) { + // ignore + } + throw e; } - this.delegate = delegate; + } + + /** + * Writes all bytes from the given buffer to the channel. + * @param bb buffer to write + * @param canReturnOnTimeout if true, we return if the channel is blocking and we haven't written anything yet + * @param deadline deadline for the write + * @return number of bytes written + * @throws IOException if the channel is closed or an error occurs + */ + public int writeAll(ByteBuffer bb, boolean canReturnOnTimeout, long deadline) throws IOException { + int remaining = bb.remaining(); + int written = 0; + while (remaining > 0) { + int read = delegate.write(bb); + + // If we haven't written anything yet, we can still return + if (read == 0 && canReturnOnTimeout && written == 0) { + return written; + } + + remaining -= read; + written += read; + + if (deadline > 0 && System.nanoTime() > deadline) { + throw new IOException("Write timed out"); + } + } + return written; } @Override From 3c8504b78bb397dca1dc38cff8f37c4e955e0820 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Fri, 14 Mar 2025 14:21:22 -0400 Subject: [PATCH 07/13] Try addressing test errors. --- .../com/timgroup/statsd/ClientChannelUtils.java | 8 +++++--- .../statsd/UnixDatagramClientChannel.java | 9 ++++----- .../timgroup/statsd/UnixStreamClientChannel.java | 16 +++++++++++----- .../java/com/timgroup/statsd/TestHelpers.java | 11 ++++++++++- .../UnixDatagramSocketDummyStatsDServer.java | 11 ++++++++++- 5 files changed, 40 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/ClientChannelUtils.java b/src/main/java/com/timgroup/statsd/ClientChannelUtils.java index efa7373e..4ecfe2fc 100644 --- a/src/main/java/com/timgroup/statsd/ClientChannelUtils.java +++ b/src/main/java/com/timgroup/statsd/ClientChannelUtils.java @@ -3,7 +3,7 @@ import java.util.ArrayList; import java.util.List; -// logic copied over from dd-trace-java Platform class. See: +// Logic copied from dd-trace-java Platform class. See: // https://github.com/DataDog/dd-trace-java/blob/master/internal-api/src/main/java/datadog/trace/api/Platform.java public class ClientChannelUtils { private static final Version JAVA_VERSION = parseJavaVersion(System.getProperty("java.version")); @@ -61,7 +61,9 @@ private static List splitDigits(String str) { } static final class Version { - public final int major, minor, update; + public final int major; + public final int minor; + public final int update; public Version(int major, int minor, int update) { this.major = major; @@ -105,7 +107,7 @@ public static boolean isJavaVersionAtLeast(int major, int minor, int update) { return JAVA_VERSION.isAtLeast(major, minor, update); } - public static boolean hasNativeUDSSupport() { + public static boolean hasNativeUdsSupport() { return isJavaVersionAtLeast(NATIVE_UDS_MIN_VERSION); } } diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index 3398338d..b1f75c21 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -22,7 +22,7 @@ class UnixDatagramClientChannel extends DatagramClientChannel { } private static DatagramChannel createChannel(SocketAddress address) throws IOException { - if (ClientChannelUtils.hasNativeUDSSupport()) { + if (ClientChannelUtils.hasNativeUdsSupport()) { return DatagramChannel.open(); } else { return UnixDatagramChannel.open(); @@ -30,7 +30,7 @@ private static DatagramChannel createChannel(SocketAddress address) throws IOExc } private void configureChannel(int timeout, int bufferSize) throws IOException { - if (ClientChannelUtils.hasNativeUDSSupport()) { + if (ClientChannelUtils.hasNativeUdsSupport()) { if (timeout > 0) { delegate.socket().setSoTimeout(timeout); } @@ -38,12 +38,11 @@ private void configureChannel(int timeout, int bufferSize) throws IOException { delegate.socket().setSendBufferSize(bufferSize); } } else { - UnixDatagramChannel unixChannel = (UnixDatagramChannel) delegate; if (timeout > 0) { - unixChannel.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); + delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout); } if (bufferSize > 0) { - unixChannel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); + delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); } } } diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 303db8bd..cded1971 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -96,8 +96,8 @@ private void connect() throws IOException { } long deadline = System.nanoTime() + connectionTimeout * 1_000_000L; - // use Java 16's native Unix domain socket support for compatible versions - if (ClientChannelUtils.hasNativeUDSSupport()) { + // Use native JDK Unix domain socket support for compatible versions (Java 16+). Fall back to JNR support otherwise. + if (ClientChannelUtils.hasNativeUdsSupport()) { connectJdkSocket(deadline); } else { connectJnrSocket(deadline); @@ -105,9 +105,15 @@ private void connect() throws IOException { } private void connectJdkSocket(long deadline) throws IOException { - String socketPath = address.toString(); - if (socketPath.startsWith("file://") || socketPath.startsWith("unix://")) { - socketPath = socketPath.substring(7); + String socketPath; + if (address instanceof UnixSocketAddress) { + UnixSocketAddress unixAddr = (UnixSocketAddress) address; + socketPath = unixAddr.path(); + } else { + socketPath = address.toString(); + if (socketPath.startsWith("file://") || socketPath.startsWith("unix://")) { + socketPath = socketPath.substring(7); + } } try { diff --git a/src/test/java/com/timgroup/statsd/TestHelpers.java b/src/test/java/com/timgroup/statsd/TestHelpers.java index 1eebd9e8..7018f440 100644 --- a/src/test/java/com/timgroup/statsd/TestHelpers.java +++ b/src/test/java/com/timgroup/statsd/TestHelpers.java @@ -20,7 +20,16 @@ static boolean isJnrAvailable() { } } + static boolean isJdkUdsAvailable() { + try { + Class.forName("java.nio.channels.DatagramChannel"); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } + static boolean isUdsAvailable() { - return (isLinux() || isMac()) && isJnrAvailable(); + return (isLinux() || isMac()) && (isJdkUdsAvailable() || isJnrAvailable()); } } diff --git a/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java index 753411e0..6d7d9874 100644 --- a/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java @@ -10,6 +10,7 @@ public class UnixDatagramSocketDummyStatsDServer extends DummyStatsDServer { private final DatagramChannel server; + private volatile boolean running = true; public UnixDatagramSocketDummyStatsDServer(String socketPath) throws IOException { server = UnixDatagramChannel.open(); @@ -19,14 +20,22 @@ public UnixDatagramSocketDummyStatsDServer(String socketPath) throws IOException @Override protected boolean isOpen() { - return server.isOpen(); + return running && server.isOpen(); } protected void receive(ByteBuffer packet) throws IOException { server.receive(packet); } + @Override public void close() throws IOException { + running = false; // Signal the listening thread to stop + try { + // Give the listening thread a chance to stop + Thread.sleep(50); + } catch (InterruptedException e) { + // Ignore + } try { server.close(); } catch (Exception e) { From ac1935a98fa7c34cbe8b11c1b3cd9613a1e57ccb Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 18 Mar 2025 17:43:03 -0400 Subject: [PATCH 08/13] Fix string parsing and TestHelper --- .../timgroup/statsd/UnixStreamClientChannel.java | 15 ++++----------- .../java/com/timgroup/statsd/TestHelpers.java | 11 +---------- 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index cded1971..0f7da450 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -9,6 +9,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.SocketChannel; +import java.lang.reflect.Method; /** * A ClientChannel for Unix domain sockets. @@ -105,16 +106,7 @@ private void connect() throws IOException { } private void connectJdkSocket(long deadline) throws IOException { - String socketPath; - if (address instanceof UnixSocketAddress) { - UnixSocketAddress unixAddr = (UnixSocketAddress) address; - socketPath = unixAddr.path(); - } else { - socketPath = address.toString(); - if (socketPath.startsWith("file://") || socketPath.startsWith("unix://")) { - socketPath = socketPath.substring(7); - } - } + String socketPath = address.toString(); try { // Use reflection to avoid compile-time dependency on Java 16+ classes @@ -128,7 +120,8 @@ private void connectJdkSocket(long deadline) throws IOException { try { delegate.configureBlocking(false); - if (!delegate.connect((SocketAddress) udsAddress)) { + // Use reflection to call the UDS specific connect method + if (!delegate.connect(udsAddress)) { if (connectionTimeout > 0 && System.nanoTime() > deadline) { throw new IOException("Connection timed out"); } diff --git a/src/test/java/com/timgroup/statsd/TestHelpers.java b/src/test/java/com/timgroup/statsd/TestHelpers.java index 7018f440..1eebd9e8 100644 --- a/src/test/java/com/timgroup/statsd/TestHelpers.java +++ b/src/test/java/com/timgroup/statsd/TestHelpers.java @@ -20,16 +20,7 @@ static boolean isJnrAvailable() { } } - static boolean isJdkUdsAvailable() { - try { - Class.forName("java.nio.channels.DatagramChannel"); - return true; - } catch (ClassNotFoundException e) { - return false; - } - } - static boolean isUdsAvailable() { - return (isLinux() || isMac()) && (isJdkUdsAvailable() || isJnrAvailable()); + return (isLinux() || isMac()) && isJnrAvailable(); } } From 628c9802e580807e764b6f4cf4efccf794c55206 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 18 Mar 2025 22:49:30 -0400 Subject: [PATCH 09/13] Adjust DummyStatsDServers --- .../NonBlockingStatsDClientBuilder.java | 9 +++ .../statsd/UnixStreamClientChannel.java | 3 +- .../UnixDatagramSocketDummyStatsDServer.java | 19 ++++- .../UnixStreamSocketDummyStatsDServer.java | 80 ++++++++++++------- 4 files changed, 79 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 2ceedccd..9d431fc6 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -376,6 +376,15 @@ protected static Callable staticUnixResolution( final UnixSocketAddressWithTransport.TransportType transportType) { return new Callable() { @Override public SocketAddress call() { + if (ClientChannelUtils.hasNativeUdsSupport()) { + try { + Class udsAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); + Object udsAddress = udsAddressClass.getMethod("of", String.class).invoke(null, path); + return new UnixSocketAddressWithTransport((SocketAddress) udsAddress, transportType); + } catch (ReflectiveOperationException e) { + // Fall back to JNR implementation + } + } final UnixSocketAddress socketAddress = new UnixSocketAddress(path); return new UnixSocketAddressWithTransport(socketAddress, transportType); } diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 0f7da450..3511c71d 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -120,8 +120,7 @@ private void connectJdkSocket(long deadline) throws IOException { try { delegate.configureBlocking(false); - // Use reflection to call the UDS specific connect method - if (!delegate.connect(udsAddress)) { + if (!delegate.connect((SocketAddress) udsAddress)) { if (connectionTimeout > 0 && System.nanoTime() > deadline) { throw new IOException("Connection timed out"); } diff --git a/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java index 6d7d9874..791c5ee5 100644 --- a/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; +import java.net.SocketAddress; import jnr.unixsocket.UnixDatagramChannel; import jnr.unixsocket.UnixSocketAddress; @@ -13,8 +14,22 @@ public class UnixDatagramSocketDummyStatsDServer extends DummyStatsDServer { private volatile boolean running = true; public UnixDatagramSocketDummyStatsDServer(String socketPath) throws IOException { - server = UnixDatagramChannel.open(); - server.bind(new UnixSocketAddress(socketPath)); + if (ClientChannelUtils.hasNativeUdsSupport()) { + try { + Class udsAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); + Object udsAddress = udsAddressClass.getMethod("of", String.class).invoke(null, socketPath); + + DatagramChannel nativeServer = DatagramChannel.open(); + nativeServer.bind((SocketAddress) udsAddress); + this.server = nativeServer; + } catch (ReflectiveOperationException e) { + throw new IOException(e); + } + } else { + UnixDatagramChannel jnrServer = UnixDatagramChannel.open(); + jnrServer.bind(new UnixSocketAddress(socketPath)); + this.server = jnrServer; + } this.listen(); } diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java index ea743c8e..b2e5d425 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java @@ -4,7 +4,9 @@ import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.net.SocketAddress; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Logger; import jnr.unixsocket.UnixServerSocketChannel; @@ -14,21 +16,37 @@ import static com.timgroup.statsd.NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES; public class UnixStreamSocketDummyStatsDServer extends DummyStatsDServer { - private final UnixServerSocketChannel server; - private final ConcurrentLinkedQueue channels = new ConcurrentLinkedQueue<>(); + private final Object server; // Object is either ServerSocketChannel or UnixServerSocketChannel + private final ConcurrentLinkedQueue channels = new ConcurrentLinkedQueue<>(); + private final boolean useNativeUds; private final Logger logger = Logger.getLogger(UnixStreamSocketDummyStatsDServer.class.getName()); public UnixStreamSocketDummyStatsDServer(String socketPath) throws IOException { - server = UnixServerSocketChannel.open(); - server.configureBlocking(true); - server.socket().bind(new UnixSocketAddress(socketPath)); + this.useNativeUds = ClientChannelUtils.hasNativeUdsSupport(); + if (useNativeUds) { + try { + Class udsAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); + Object udsAddress = udsAddressClass.getMethod("of", String.class).invoke(null, socketPath); + + ServerSocketChannel nativeServer = ServerSocketChannel.open(); + nativeServer.bind((SocketAddress) udsAddress); + this.server = nativeServer; + } catch (ReflectiveOperationException e) { + throw new IOException(e); + } + } else { + UnixServerSocketChannel jnrServer = UnixServerSocketChannel.open(); + jnrServer.configureBlocking(true); + jnrServer.socket().bind(new UnixSocketAddress(socketPath)); + this.server = jnrServer; + } this.listen(); } @Override protected boolean isOpen() { - return server.isOpen(); + return useNativeUds ? ((ServerSocketChannel)server).isOpen() : ((UnixServerSocketChannel)server).isOpen(); } @Override @@ -38,7 +56,12 @@ protected void receive(ByteBuffer packet) throws IOException { @Override protected void listen() { - logger.info("Listening on " + server.getLocalSocketAddress()); + try { + String localAddressMessage = useNativeUds ? "Listening on " + ((ServerSocketChannel)server).getLocalAddress() : "Listening on " + ((UnixServerSocketChannel)server).getLocalSocketAddress(); + logger.info(localAddressMessage); + } catch (Exception e) { + logger.warning("Failed to get local address: " + e); + } Thread thread = new Thread(new Runnable() { @Override public void run() { @@ -46,21 +69,20 @@ public void run() { if (sleepIfFrozen()) { continue; } - try { - logger.info("Waiting for connection"); - UnixSocketChannel clientChannel = server.accept(); - if (clientChannel != null) { - clientChannel.configureBlocking(true); - try { - logger.info("Accepted connection from " + clientChannel.getRemoteSocketAddress()); - } catch (Exception e) { - logger.warning("Failed to get remote socket address"); - } - channels.add(clientChannel); - readChannel(clientChannel); - } - } catch (IOException e) { + try { + logger.info("Waiting for connection"); + SocketChannel clientChannel = null; + clientChannel = useNativeUds ? ((ServerSocketChannel)server).accept() : ((UnixServerSocketChannel)server).accept(); + if (clientChannel != null) { + clientChannel.configureBlocking(true); + String connectionMessage = useNativeUds ? "Accepted connection from " + clientChannel.getRemoteAddress() : "Accepted connection from " + ((UnixSocketChannel)clientChannel).getRemoteSocketAddress(); + logger.info(connectionMessage); + channels.add(clientChannel); + readChannel(clientChannel); } + } catch (Exception e) { + // ignore + } } } }); @@ -68,9 +90,9 @@ public void run() { thread.start(); } - public void readChannel(final UnixSocketChannel clientChannel) { - logger.info("Reading from " + clientChannel); - Thread thread = new Thread(new Runnable() { + public void readChannel(final SocketChannel clientChannel) { + logger.info("Reading from " + clientChannel); + Thread thread = new Thread(new Runnable() { @Override public void run() { final ByteBuffer packet = ByteBuffer.allocate(DEFAULT_UDS_MAX_PACKET_SIZE_BYTES); @@ -90,7 +112,6 @@ public void run() { logger.warning("Failed to close channel: " + e); } } - } logger.info("Disconnected from " + clientChannel); } @@ -128,13 +149,16 @@ private boolean readPacket(SocketChannel channel, ByteBuffer packet) { public void close() throws IOException { try { - server.close(); - for (UnixSocketChannel channel : channels) { + if (useNativeUds) { + ((ServerSocketChannel)server).close(); + } else { + ((UnixServerSocketChannel)server).close(); + } + for (SocketChannel channel : channels) { channel.close(); } } catch (Exception e) { //ignore } } - } From 4594c675a0d8602c5e1ffec10d60fb30eb68f2bf Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 18 Mar 2025 23:04:29 -0400 Subject: [PATCH 10/13] Undo NonBlockingStatsDClientBuilder changes --- .../timgroup/statsd/NonBlockingStatsDClientBuilder.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 9d431fc6..2ceedccd 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -376,15 +376,6 @@ protected static Callable staticUnixResolution( final UnixSocketAddressWithTransport.TransportType transportType) { return new Callable() { @Override public SocketAddress call() { - if (ClientChannelUtils.hasNativeUdsSupport()) { - try { - Class udsAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); - Object udsAddress = udsAddressClass.getMethod("of", String.class).invoke(null, path); - return new UnixSocketAddressWithTransport((SocketAddress) udsAddress, transportType); - } catch (ReflectiveOperationException e) { - // Fall back to JNR implementation - } - } final UnixSocketAddress socketAddress = new UnixSocketAddress(path); return new UnixSocketAddressWithTransport(socketAddress, transportType); } From 0e271e4764b0ac5af87ce984a02c644fa14021ef Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Tue, 18 Mar 2025 23:33:59 -0400 Subject: [PATCH 11/13] Debug --- .../com/timgroup/statsd/UnixStreamClientChannel.java | 2 +- .../statsd/UnixDatagramSocketDummyStatsDServer.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 3511c71d..4ad5203a 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -5,11 +5,11 @@ import jnr.unixsocket.UnixSocketOptions; import java.io.IOException; +import java.lang.reflect.Method; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.SocketChannel; -import java.lang.reflect.Method; /** * A ClientChannel for Unix domain sockets. diff --git a/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java index 791c5ee5..b377c877 100644 --- a/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java @@ -11,7 +11,7 @@ public class UnixDatagramSocketDummyStatsDServer extends DummyStatsDServer { private final DatagramChannel server; - private volatile boolean running = true; + private volatile boolean serverRunning = true; public UnixDatagramSocketDummyStatsDServer(String socketPath) throws IOException { if (ClientChannelUtils.hasNativeUdsSupport()) { @@ -35,7 +35,7 @@ public UnixDatagramSocketDummyStatsDServer(String socketPath) throws IOException @Override protected boolean isOpen() { - return running && server.isOpen(); + return serverRunning && server.isOpen(); } protected void receive(ByteBuffer packet) throws IOException { @@ -44,12 +44,12 @@ protected void receive(ByteBuffer packet) throws IOException { @Override public void close() throws IOException { - running = false; // Signal the listening thread to stop + serverRunning = false; try { // Give the listening thread a chance to stop - Thread.sleep(50); + Thread.sleep(1000); } catch (InterruptedException e) { - // Ignore + // ignore } try { server.close(); From b0062ccbf72142f57413eaf456e4d0f4f69b9331 Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Wed, 19 Mar 2025 10:46:11 -0400 Subject: [PATCH 12/13] Revert changes made to DummyStatsDServer files --- .../UnixDatagramSocketDummyStatsDServer.java | 30 +------ .../UnixStreamSocketDummyStatsDServer.java | 80 +++++++------------ 2 files changed, 31 insertions(+), 79 deletions(-) diff --git a/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java index b377c877..753411e0 100644 --- a/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/UnixDatagramSocketDummyStatsDServer.java @@ -3,7 +3,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; -import java.net.SocketAddress; import jnr.unixsocket.UnixDatagramChannel; import jnr.unixsocket.UnixSocketAddress; @@ -11,46 +10,23 @@ public class UnixDatagramSocketDummyStatsDServer extends DummyStatsDServer { private final DatagramChannel server; - private volatile boolean serverRunning = true; public UnixDatagramSocketDummyStatsDServer(String socketPath) throws IOException { - if (ClientChannelUtils.hasNativeUdsSupport()) { - try { - Class udsAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); - Object udsAddress = udsAddressClass.getMethod("of", String.class).invoke(null, socketPath); - - DatagramChannel nativeServer = DatagramChannel.open(); - nativeServer.bind((SocketAddress) udsAddress); - this.server = nativeServer; - } catch (ReflectiveOperationException e) { - throw new IOException(e); - } - } else { - UnixDatagramChannel jnrServer = UnixDatagramChannel.open(); - jnrServer.bind(new UnixSocketAddress(socketPath)); - this.server = jnrServer; - } + server = UnixDatagramChannel.open(); + server.bind(new UnixSocketAddress(socketPath)); this.listen(); } @Override protected boolean isOpen() { - return serverRunning && server.isOpen(); + return server.isOpen(); } protected void receive(ByteBuffer packet) throws IOException { server.receive(packet); } - @Override public void close() throws IOException { - serverRunning = false; - try { - // Give the listening thread a chance to stop - Thread.sleep(1000); - } catch (InterruptedException e) { - // ignore - } try { server.close(); } catch (Exception e) { diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java index b2e5d425..ea743c8e 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java @@ -4,9 +4,7 @@ import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.net.SocketAddress; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Logger; import jnr.unixsocket.UnixServerSocketChannel; @@ -16,37 +14,21 @@ import static com.timgroup.statsd.NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES; public class UnixStreamSocketDummyStatsDServer extends DummyStatsDServer { - private final Object server; // Object is either ServerSocketChannel or UnixServerSocketChannel - private final ConcurrentLinkedQueue channels = new ConcurrentLinkedQueue<>(); - private final boolean useNativeUds; + private final UnixServerSocketChannel server; + private final ConcurrentLinkedQueue channels = new ConcurrentLinkedQueue<>(); private final Logger logger = Logger.getLogger(UnixStreamSocketDummyStatsDServer.class.getName()); public UnixStreamSocketDummyStatsDServer(String socketPath) throws IOException { - this.useNativeUds = ClientChannelUtils.hasNativeUdsSupport(); - if (useNativeUds) { - try { - Class udsAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); - Object udsAddress = udsAddressClass.getMethod("of", String.class).invoke(null, socketPath); - - ServerSocketChannel nativeServer = ServerSocketChannel.open(); - nativeServer.bind((SocketAddress) udsAddress); - this.server = nativeServer; - } catch (ReflectiveOperationException e) { - throw new IOException(e); - } - } else { - UnixServerSocketChannel jnrServer = UnixServerSocketChannel.open(); - jnrServer.configureBlocking(true); - jnrServer.socket().bind(new UnixSocketAddress(socketPath)); - this.server = jnrServer; - } + server = UnixServerSocketChannel.open(); + server.configureBlocking(true); + server.socket().bind(new UnixSocketAddress(socketPath)); this.listen(); } @Override protected boolean isOpen() { - return useNativeUds ? ((ServerSocketChannel)server).isOpen() : ((UnixServerSocketChannel)server).isOpen(); + return server.isOpen(); } @Override @@ -56,12 +38,7 @@ protected void receive(ByteBuffer packet) throws IOException { @Override protected void listen() { - try { - String localAddressMessage = useNativeUds ? "Listening on " + ((ServerSocketChannel)server).getLocalAddress() : "Listening on " + ((UnixServerSocketChannel)server).getLocalSocketAddress(); - logger.info(localAddressMessage); - } catch (Exception e) { - logger.warning("Failed to get local address: " + e); - } + logger.info("Listening on " + server.getLocalSocketAddress()); Thread thread = new Thread(new Runnable() { @Override public void run() { @@ -69,20 +46,21 @@ public void run() { if (sleepIfFrozen()) { continue; } - try { - logger.info("Waiting for connection"); - SocketChannel clientChannel = null; - clientChannel = useNativeUds ? ((ServerSocketChannel)server).accept() : ((UnixServerSocketChannel)server).accept(); - if (clientChannel != null) { - clientChannel.configureBlocking(true); - String connectionMessage = useNativeUds ? "Accepted connection from " + clientChannel.getRemoteAddress() : "Accepted connection from " + ((UnixSocketChannel)clientChannel).getRemoteSocketAddress(); - logger.info(connectionMessage); - channels.add(clientChannel); - readChannel(clientChannel); + try { + logger.info("Waiting for connection"); + UnixSocketChannel clientChannel = server.accept(); + if (clientChannel != null) { + clientChannel.configureBlocking(true); + try { + logger.info("Accepted connection from " + clientChannel.getRemoteSocketAddress()); + } catch (Exception e) { + logger.warning("Failed to get remote socket address"); + } + channels.add(clientChannel); + readChannel(clientChannel); + } + } catch (IOException e) { } - } catch (Exception e) { - // ignore - } } } }); @@ -90,9 +68,9 @@ public void run() { thread.start(); } - public void readChannel(final SocketChannel clientChannel) { - logger.info("Reading from " + clientChannel); - Thread thread = new Thread(new Runnable() { + public void readChannel(final UnixSocketChannel clientChannel) { + logger.info("Reading from " + clientChannel); + Thread thread = new Thread(new Runnable() { @Override public void run() { final ByteBuffer packet = ByteBuffer.allocate(DEFAULT_UDS_MAX_PACKET_SIZE_BYTES); @@ -112,6 +90,7 @@ public void run() { logger.warning("Failed to close channel: " + e); } } + } logger.info("Disconnected from " + clientChannel); } @@ -149,16 +128,13 @@ private boolean readPacket(SocketChannel channel, ByteBuffer packet) { public void close() throws IOException { try { - if (useNativeUds) { - ((ServerSocketChannel)server).close(); - } else { - ((UnixServerSocketChannel)server).close(); - } - for (SocketChannel channel : channels) { + server.close(); + for (UnixSocketChannel channel : channels) { channel.close(); } } catch (Exception e) { //ignore } } + } From 4bb848d7e66f1c8a801b44e95204b55b06195fba Mon Sep 17 00:00:00 2001 From: Sarah Chen Date: Wed, 19 Mar 2025 17:06:24 -0400 Subject: [PATCH 13/13] Adjust ClientBuilder and add debugging print statements... --- .../NonBlockingStatsDClientBuilder.java | 52 ++++++++++++++--- .../statsd/UnixDatagramClientChannel.java | 4 +- .../statsd/UnixStreamClientChannel.java | 58 ++++++++++--------- ...entChannelUtils.java => VersionUtils.java} | 2 +- .../UnixStreamSocketDummyStatsDServer.java | 5 ++ 5 files changed, 82 insertions(+), 39 deletions(-) rename src/main/java/com/timgroup/statsd/{ClientChannelUtils.java => VersionUtils.java} (99%) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 2ceedccd..c9e0b694 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -329,10 +329,27 @@ public static Callable volatileAddressResolution(final String hos if (port == 0) { return new Callable() { @Override public SocketAddress call() throws UnknownHostException { - return new UnixSocketAddressWithTransport( - new UnixSocketAddress(hostname), - UnixSocketAddressWithTransport.TransportType.UDS - ); + if (VersionUtils.hasNativeUdsSupport()) { + try { + Class udsAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); + SocketAddress udsAddress = (SocketAddress) + udsAddressClass.getMethod("of", String.class).invoke(null, hostname); + System.out.println("================UnixSocketAddressWithTransport returned with: " + udsAddress); + return new UnixSocketAddressWithTransport( + udsAddress, + UnixSocketAddressWithTransport.TransportType.UDS + ); + } catch (Exception e) { + throw new UnknownHostException("Failed to create UnixDomainSocketAddress: " + e.getMessage()); + } + } else { + SocketAddress jnrAddress = new UnixSocketAddress(hostname); + System.out.println("================UnixSocketAddressWithTransport returned with: " + jnrAddress); + return new UnixSocketAddressWithTransport( + jnrAddress, + UnixSocketAddressWithTransport.TransportType.UDS + ); + } } }; } else { @@ -374,12 +391,29 @@ protected static Callable staticNamedPipeResolution(String namedP protected static Callable staticUnixResolution( final String path, final UnixSocketAddressWithTransport.TransportType transportType) { - return new Callable() { - @Override public SocketAddress call() { - final UnixSocketAddress socketAddress = new UnixSocketAddress(path); - return new UnixSocketAddressWithTransport(socketAddress, transportType); + if (VersionUtils.hasNativeUdsSupport()) { + try { + Class udsAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); + final SocketAddress udsAddress = (SocketAddress) udsAddressClass.getMethod("of", String.class).invoke(null, path); + System.out.println("================new Callable returned with udsAddress: " + udsAddress); + return new Callable() { + @Override public SocketAddress call() { + System.out.println("================UnixSocketAddressWithTransport returned with: " + udsAddress); + return new UnixSocketAddressWithTransport(udsAddress, transportType); + } + }; + } catch (Exception e) { + throw new RuntimeException("Failed to create UnixDomainSocketAddress: " + e.getMessage(), e); } - }; + } else { + return new Callable() { + @Override public SocketAddress call() { + final UnixSocketAddress jnrAddress = new UnixSocketAddress(path); + System.out.println("================UnixSocketAddressWithTransport returned with: " + jnrAddress); + return new UnixSocketAddressWithTransport(jnrAddress, transportType); + } + }; + } } private static Callable staticAddress(final String hostname, final int port) { diff --git a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java index b1f75c21..b3dc5a5b 100644 --- a/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java @@ -22,7 +22,7 @@ class UnixDatagramClientChannel extends DatagramClientChannel { } private static DatagramChannel createChannel(SocketAddress address) throws IOException { - if (ClientChannelUtils.hasNativeUdsSupport()) { + if (VersionUtils.hasNativeUdsSupport()) { return DatagramChannel.open(); } else { return UnixDatagramChannel.open(); @@ -30,7 +30,7 @@ private static DatagramChannel createChannel(SocketAddress address) throws IOExc } private void configureChannel(int timeout, int bufferSize) throws IOException { - if (ClientChannelUtils.hasNativeUdsSupport()) { + if (VersionUtils.hasNativeUdsSupport()) { if (timeout > 0) { delegate.socket().setSoTimeout(timeout); } diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index 4ad5203a..b3a163f4 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -34,6 +34,7 @@ public class UnixStreamClientChannel implements ClientChannel { this.timeout = timeout; this.connectionTimeout = connectionTimeout; this.bufferSize = bufferSize; + System.out.println("================Created UnixStreamClientChannel with address: " + address); } @Override @@ -98,7 +99,7 @@ private void connect() throws IOException { long deadline = System.nanoTime() + connectionTimeout * 1_000_000L; // Use native JDK Unix domain socket support for compatible versions (Java 16+). Fall back to JNR support otherwise. - if (ClientChannelUtils.hasNativeUdsSupport()) { + if (VersionUtils.hasNativeUdsSupport()) { connectJdkSocket(deadline); } else { connectJnrSocket(deadline); @@ -106,10 +107,8 @@ private void connect() throws IOException { } private void connectJdkSocket(long deadline) throws IOException { - String socketPath = address.toString(); - + String socketPath = address.toString(); try { - // Use reflection to avoid compile-time dependency on Java 16+ classes Class udsAddressClass = Class.forName("java.net.UnixDomainSocketAddress"); Object udsAddress = udsAddressClass.getMethod("of", String.class).invoke(null, socketPath); @@ -118,32 +117,34 @@ private void connectJdkSocket(long deadline) throws IOException { delegate.socket().setSoTimeout(connectionTimeout); } - try { - delegate.configureBlocking(false); - if (!delegate.connect((SocketAddress) udsAddress)) { - if (connectionTimeout > 0 && System.nanoTime() > deadline) { - throw new IOException("Connection timed out"); - } - if (!delegate.finishConnect()) { - throw new IOException("Connection failed"); - } - } - delegate.configureBlocking(true); - delegate.socket().setSoTimeout(Math.max(timeout, 0)); - if (bufferSize > 0) { - delegate.socket().setSendBufferSize(bufferSize); + delegate.configureBlocking(false); + System.out.println("================Attempting to connect delegate to: " + udsAddress); + if (!delegate.connect((SocketAddress) udsAddress)) { + System.out.println("================Initial connect returned false, checking deadline"); + if (connectionTimeout > 0 && System.nanoTime() > deadline) { + throw new IOException("Connection timed out"); } - this.delegate = delegate; - } catch (Exception e) { - try { - delegate.close(); - } catch (IOException __) { - // ignore + System.out.println("================Finishing connection"); + if (!delegate.finishConnect()) { + throw new IOException("Connection failed"); } - throw new IOException("Failed to connect to Unix Domain Socket: " + socketPath, e); } - } catch (ReflectiveOperationException e) { - throw new IOException("Failed to create UnixDomainSocketAddress: Java 16+ required", e); + System.out.println("================Connection successful"); + delegate.configureBlocking(true); + delegate.socket().setSoTimeout(Math.max(timeout, 0)); + if (bufferSize > 0) { + delegate.socket().setSendBufferSize(bufferSize); + } + this.delegate = delegate; + System.out.println("================Set up complete."); + } catch (Exception e) { + System.out.println("================Failed to connect to UDS at: " + socketPath); + try { + delegate.close(); + } catch (IOException __) { + // ignore + } + throw new IOException("Failed to connect to Unix Domain Socket: " + socketPath, e); } } @@ -155,6 +156,7 @@ private void connectJnrSocket(long deadline) throws IOException { delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, connectionTimeout); } try { + System.out.println("================Attempting to connect delegate to: " + address); if (!delegate.connect((UnixSocketAddress) address)) { if (connectionTimeout > 0 && System.nanoTime() > deadline) { throw new IOException("Connection timed out"); @@ -163,11 +165,13 @@ private void connectJnrSocket(long deadline) throws IOException { throw new IOException("Connection failed"); } } + System.out.println("================Connection successful"); delegate.setOption(UnixSocketOptions.SO_SNDTIMEO, Math.max(timeout, 0)); if (bufferSize > 0) { delegate.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize); } this.delegate = delegate; + System.out.println("================Set up complete."); } catch (Exception e) { try { delegate.close(); diff --git a/src/main/java/com/timgroup/statsd/ClientChannelUtils.java b/src/main/java/com/timgroup/statsd/VersionUtils.java similarity index 99% rename from src/main/java/com/timgroup/statsd/ClientChannelUtils.java rename to src/main/java/com/timgroup/statsd/VersionUtils.java index 4ecfe2fc..f7c082c2 100644 --- a/src/main/java/com/timgroup/statsd/ClientChannelUtils.java +++ b/src/main/java/com/timgroup/statsd/VersionUtils.java @@ -5,7 +5,7 @@ // Logic copied from dd-trace-java Platform class. See: // https://github.com/DataDog/dd-trace-java/blob/master/internal-api/src/main/java/datadog/trace/api/Platform.java -public class ClientChannelUtils { +public class VersionUtils { private static final Version JAVA_VERSION = parseJavaVersion(System.getProperty("java.version")); private static final int NATIVE_UDS_MIN_VERSION = 16; // Java 16+ has native Unix Domain Socket support diff --git a/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java index ea743c8e..6ba45555 100644 --- a/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/UnixStreamSocketDummyStatsDServer.java @@ -23,6 +23,7 @@ public UnixStreamSocketDummyStatsDServer(String socketPath) throws IOException { server = UnixServerSocketChannel.open(); server.configureBlocking(true); server.socket().bind(new UnixSocketAddress(socketPath)); + System.out.println("================Server bound to " + socketPath); this.listen(); } @@ -39,6 +40,7 @@ protected void receive(ByteBuffer packet) throws IOException { @Override protected void listen() { logger.info("Listening on " + server.getLocalSocketAddress()); + System.out.println("================Server listening on " + server.getLocalSocketAddress()); Thread thread = new Thread(new Runnable() { @Override public void run() { @@ -48,7 +50,9 @@ public void run() { } try { logger.info("Waiting for connection"); + System.out.println("================Server waiting for connection"); UnixSocketChannel clientChannel = server.accept(); + System.out.println("================Server accepted connection"); if (clientChannel != null) { clientChannel.configureBlocking(true); try { @@ -60,6 +64,7 @@ public void run() { readChannel(clientChannel); } } catch (IOException e) { + System.out.println("================Server caught IOException: " + e); } } }