diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy index 493e1af0967..474f1fe7ed5 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTestBase.groovy @@ -138,9 +138,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { TEST_WRITER.setFilter(dropEmptyKafkaPoll) KafkaProducer producer = new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer()) String clusterId = null - while (clusterId == null || clusterId.isEmpty()) { + def maxAttempts = 20 // 30 seconds total + def attemptCount = 0 + + while ((clusterId == null || clusterId.isEmpty()) && attemptCount < maxAttempts) { Thread.sleep(1500) - clusterId = producer.metadata.fetch().clusterResource().clusterId() + try { + clusterId = producer.metadata.fetch().clusterResource().clusterId() + } catch (Exception e) { + // Log and continue - metadata might not be ready yet + } + attemptCount++ + } + + if (clusterId == null || clusterId.isEmpty()) { + throw new RuntimeException("Failed to obtain cluster ID after ${maxAttempts * 1.5} seconds") } // set up the Kafka consumer properties @@ -194,7 +206,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { then: // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) + def received = null + def maxPollingAttempts = 3 + def pollingTimeout = 10 // seconds + + for (int i = 0; i < maxPollingAttempts && received == null; i++) { + received = records.poll(pollingTimeout, TimeUnit.SECONDS) + if (received == null && i < maxPollingAttempts - 1) { + Thread.sleep(1000) // Brief pause between retries + } + } + + if (received == null) { + throw new AssertionError("No message received after ${maxPollingAttempts * pollingTimeout} seconds") + } + received.value() == greeting received.key() == null diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index 569538197d4..1cbab1ef763 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -187,9 +187,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { if (isDataStreamsEnabled()) { producer.flush() clusterId = producer.metadata.cluster.clusterResource().clusterId() - while (clusterId == null || clusterId.isEmpty()) { + def maxAttempts = 20 // 30 seconds total + def attemptCount = 0 + + while ((clusterId == null || clusterId.isEmpty()) && attemptCount < maxAttempts) { Thread.sleep(1500) - clusterId = producer.metadata.cluster.clusterResource().clusterId() + try { + clusterId = producer.metadata.cluster.clusterResource().clusterId() + } catch (Exception e) { + // Log and continue - metadata might not be ready yet + } + attemptCount++ + } + + if (clusterId == null || clusterId.isEmpty()) { + throw new RuntimeException("Failed to obtain cluster ID after ${maxAttempts * 1.5} seconds") } } @@ -244,7 +256,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { then: // check that the message was received - def received = records.poll(5, TimeUnit.SECONDS) + def received = null + def maxPollingAttempts = 3 + def pollingTimeout = 10 // seconds + + for (int i = 0; i < maxPollingAttempts && received == null; i++) { + received = records.poll(pollingTimeout, TimeUnit.SECONDS) + if (received == null && i < maxPollingAttempts - 1) { + Thread.sleep(1000) // Brief pause between retries + } + } + + if (received == null) { + throw new AssertionError("No message received after ${maxPollingAttempts * pollingTimeout} seconds") + } + received.value() == greeting received.key() == null diff --git a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy index 9e71a218b7a..9ba2707c85a 100644 --- a/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy @@ -167,9 +167,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { TEST_WRITER.setFilter(dropEmptyKafkaPoll) KafkaProducer producer = new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer()) String clusterId = null - while (clusterId == null || clusterId.isEmpty()) { + def maxAttempts = 20 // 30 seconds total + def attemptCount = 0 + + while ((clusterId == null || clusterId.isEmpty()) && attemptCount < maxAttempts) { Thread.sleep(1500) - clusterId = producer.metadata.fetch().clusterResource().clusterId() + try { + clusterId = producer.metadata.fetch().clusterResource().clusterId() + } catch (Exception e) { + // Log and continue - metadata might not be ready yet + } + attemptCount++ + } + + if (clusterId == null || clusterId.isEmpty()) { + throw new RuntimeException("Failed to obtain cluster ID after ${maxAttempts * 1.5} seconds") } // set up the Kafka consumer properties @@ -224,7 +236,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { then: // // check that the message was received - def received = records.poll(10, TimeUnit.SECONDS) + def received = null + def maxPollingAttempts = 3 + def pollingTimeout = 10 // seconds + + for (int i = 0; i < maxPollingAttempts && received == null; i++) { + received = records.poll(pollingTimeout, TimeUnit.SECONDS) + if (received == null && i < maxPollingAttempts - 1) { + Thread.sleep(1000) // Brief pause between retries + } + } + + if (received == null) { + throw new AssertionError("No message received after ${maxPollingAttempts * pollingTimeout} seconds") + } + received.value() == greeting received.key() == null int nTraces = isDataStreamsEnabled() ? 3 : 2