Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
TEST_WRITER.setFilter(dropEmptyKafkaPoll)
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer())
String clusterId = null
while (clusterId == null || clusterId.isEmpty()) {
def maxAttempts = 20 // 30 seconds total
def attemptCount = 0

while ((clusterId == null || clusterId.isEmpty()) && attemptCount < maxAttempts) {
Thread.sleep(1500)
clusterId = producer.metadata.fetch().clusterResource().clusterId()
try {
clusterId = producer.metadata.fetch().clusterResource().clusterId()
} catch (Exception e) {
// Log and continue - metadata might not be ready yet
}
attemptCount++
}

if (clusterId == null || clusterId.isEmpty()) {
throw new RuntimeException("Failed to obtain cluster ID after ${maxAttempts * 1.5} seconds")
}

// set up the Kafka consumer properties
Expand Down Expand Up @@ -194,7 +206,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {

then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)
def received = null
def maxPollingAttempts = 3
def pollingTimeout = 10 // seconds

for (int i = 0; i < maxPollingAttempts && received == null; i++) {
received = records.poll(pollingTimeout, TimeUnit.SECONDS)
if (received == null && i < maxPollingAttempts - 1) {
Thread.sleep(1000) // Brief pause between retries
}
}

if (received == null) {
throw new AssertionError("No message received after ${maxPollingAttempts * pollingTimeout} seconds")
}

received.value() == greeting
received.key() == null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
if (isDataStreamsEnabled()) {
producer.flush()
clusterId = producer.metadata.cluster.clusterResource().clusterId()
while (clusterId == null || clusterId.isEmpty()) {
def maxAttempts = 20 // 30 seconds total
def attemptCount = 0

while ((clusterId == null || clusterId.isEmpty()) && attemptCount < maxAttempts) {
Thread.sleep(1500)
clusterId = producer.metadata.cluster.clusterResource().clusterId()
try {
clusterId = producer.metadata.cluster.clusterResource().clusterId()
} catch (Exception e) {
// Log and continue - metadata might not be ready yet
}
attemptCount++
}

if (clusterId == null || clusterId.isEmpty()) {
throw new RuntimeException("Failed to obtain cluster ID after ${maxAttempts * 1.5} seconds")
}
}

Expand Down Expand Up @@ -244,7 +256,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {

then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)
def received = null
def maxPollingAttempts = 3
def pollingTimeout = 10 // seconds

for (int i = 0; i < maxPollingAttempts && received == null; i++) {
received = records.poll(pollingTimeout, TimeUnit.SECONDS)
if (received == null && i < maxPollingAttempts - 1) {
Thread.sleep(1000) // Brief pause between retries
}
}

if (received == null) {
throw new AssertionError("No message received after ${maxPollingAttempts * pollingTimeout} seconds")
}

received.value() == greeting
received.key() == null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
TEST_WRITER.setFilter(dropEmptyKafkaPoll)
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer())
String clusterId = null
while (clusterId == null || clusterId.isEmpty()) {
def maxAttempts = 20 // 30 seconds total
def attemptCount = 0

while ((clusterId == null || clusterId.isEmpty()) && attemptCount < maxAttempts) {
Thread.sleep(1500)
clusterId = producer.metadata.fetch().clusterResource().clusterId()
try {
clusterId = producer.metadata.fetch().clusterResource().clusterId()
} catch (Exception e) {
// Log and continue - metadata might not be ready yet
}
attemptCount++
}

if (clusterId == null || clusterId.isEmpty()) {
throw new RuntimeException("Failed to obtain cluster ID after ${maxAttempts * 1.5} seconds")
}

// set up the Kafka consumer properties
Expand Down Expand Up @@ -224,7 +236,21 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {

then:
// // check that the message was received
def received = records.poll(10, TimeUnit.SECONDS)
def received = null
def maxPollingAttempts = 3
def pollingTimeout = 10 // seconds

for (int i = 0; i < maxPollingAttempts && received == null; i++) {
received = records.poll(pollingTimeout, TimeUnit.SECONDS)
if (received == null && i < maxPollingAttempts - 1) {
Thread.sleep(1000) // Brief pause between retries
}
}

if (received == null) {
throw new AssertionError("No message received after ${maxPollingAttempts * pollingTimeout} seconds")
}

received.value() == greeting
received.key() == null
int nTraces = isDataStreamsEnabled() ? 3 : 2
Expand Down
Loading