Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
import org.gradle.api.tasks.options.Option;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class RunTask extends DefaultTestClustersTask {
Expand Down Expand Up @@ -66,17 +67,54 @@ public void beforeStart() {

@TaskAction
public void runAndWait() throws IOException {
Set<BufferedReader> toRead = new HashSet<>();
for (ElasticsearchCluster cluster : getClusters()) {
for (ElasticsearchNode node : cluster.getNodes()) {
toRead.add(Files.newBufferedReader(node.getEsStdoutFile()));
List<BufferedReader> toRead = new ArrayList<>();
try {
for (ElasticsearchCluster cluster : getClusters()) {
for (ElasticsearchNode node : cluster.getNodes()) {
BufferedReader reader = Files.newBufferedReader(node.getEsStdoutFile());
toRead.add(reader);
}
}
}
while (Thread.currentThread().isInterrupted() == false) {
for (BufferedReader bufferedReader : toRead) {
if (bufferedReader.ready()) {
logger.lifecycle(bufferedReader.readLine());

while (Thread.currentThread().isInterrupted() == false) {
boolean readData = false;
for (BufferedReader bufferedReader : toRead) {
if (bufferedReader.ready()) {
readData = true;
logger.lifecycle(bufferedReader.readLine());
}
}

if (readData == false) {
// no data was ready to be consumed and rather than continuously spinning, pause
// for some time to avoid excessive CPU usage. Ideally we would use the JDK
// WatchService to receive change notifications but the WatchService does not have
// a native MacOS implementation and instead relies upon polling with possible
// delays up to 10s before a notification is received. See JDK-7133447.
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
} finally {
Exception thrown = null;
for (Closeable closeable : toRead) {
try {
closeable.close();
} catch (Exception e) {
if (thrown == null) {
thrown = e;
} else {
thrown.addSuppressed(e);
}
}
}

if (thrown != null) {
logger.debug("exception occurred during close of stdout file readers", thrown);
}
}
}
Expand Down