From 2f19df6d355b0022b6111d3a1f946c3453ca4c82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Tue, 17 Dec 2019 17:30:17 +0100 Subject: [PATCH] Pass processConnectTimeout to the method that fetches C++ process' PID (#50276) --- .../process/AbstractNativeAnalyticsProcess.java | 6 ++++-- .../dataframe/process/NativeAnalyticsProcess.java | 5 +++-- .../process/NativeAnalyticsProcessFactory.java | 4 ++-- .../NativeMemoryUsageEstimationProcess.java | 5 +++-- ...NativeMemoryUsageEstimationProcessFactory.java | 3 ++- .../autodetect/NativeAutodetectProcess.java | 7 +++++-- .../NativeAutodetectProcessFactory.java | 2 +- .../normalizer/NativeNormalizerProcess.java | 6 ++++-- .../NativeNormalizerProcessFactory.java | 2 +- .../xpack/ml/process/AbstractNativeProcess.java | 15 ++++++++------- .../autodetect/NativeAutodetectProcessTests.java | 12 +++++++----- .../ml/process/AbstractNativeProcessTests.java | 3 ++- 12 files changed, 42 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java index 6ceaa4ce9cda4..e3d6be120c0ff 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AbstractNativeAnalyticsProcess.java @@ -14,6 +14,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; +import java.time.Duration; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -27,9 +28,10 @@ abstract class AbstractNativeAnalyticsProcess extends AbstractNativeProc protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser resultParser, String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, - List filesToDelete, Consumer onProcessCrash, + List filesToDelete, Consumer onProcessCrash, Duration processConnectTimeout, NamedXContentRegistry namedXContentRegistry) { - super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash); + super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash, + processConnectTimeout); this.name = Objects.requireNonNull(name); this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java index e606f533ce290..1036ee08fb1f0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java @@ -14,6 +14,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; +import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.function.Consumer; @@ -26,10 +27,10 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess filesToDelete, - Consumer onProcessCrash, AnalyticsProcessConfig config, + Consumer onProcessCrash, Duration processConnectTimeout, AnalyticsProcessConfig config, NamedXContentRegistry namedXContentRegistry) { super(NAME, AnalyticsResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, - filesToDelete, onProcessCrash, namedXContentRegistry); + filesToDelete, onProcessCrash, processConnectTimeout, namedXContentRegistry); this.config = Objects.requireNonNull(config); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 1c1159b51825d..80e204f590712 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -84,8 +84,8 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), - processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, analyticsProcessConfig, - namedXContentRegistry); + processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, + analyticsProcessConfig, namedXContentRegistry); try { startProcess(config, executorService, processPipes, analyticsProcess); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java index 7244859cba59c..665f4d7e5ec8b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcess.java @@ -12,6 +12,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; +import java.time.Duration; import java.util.List; import java.util.function.Consumer; @@ -22,9 +23,9 @@ public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsP protected NativeMemoryUsageEstimationProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List filesToDelete, - Consumer onProcessCrash) { + Consumer onProcessCrash, Duration processConnectTimeout) { super(NAME, MemoryUsageEstimationResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream, - numberOfFields, filesToDelete, onProcessCrash, NamedXContentRegistry.EMPTY); + numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java index 79c284bf944cb..e49244d55a354 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java @@ -75,7 +75,8 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess( null, 0, filesToDelete, - onProcessCrash); + onProcessCrash, + processConnectTimeout); try { process.start(executorService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index be016c32f84ef..b05df4aff92db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Path; +import java.time.Duration; import java.util.Iterator; import java.util.List; import java.util.function.Consumer; @@ -43,8 +44,10 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List filesToDelete, - ProcessResultsParser resultsParser, Consumer onProcessCrash) { - super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash); + ProcessResultsParser resultsParser, Consumer onProcessCrash, + Duration processConnectTimeout) { + super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash, + processConnectTimeout); this.resultsParser = resultsParser; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 8daddfdd03d27..c979268d8f6e3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -91,7 +91,7 @@ public AutodetectProcess createAutodetectProcess(Job job, NativeAutodetectProcess autodetect = new NativeAutodetectProcess( job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields, - filesToDelete, resultsParser, onProcessCrash); + filesToDelete, resultsParser, onProcessCrash, processConnectTimeout); try { autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get()); return autodetect; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java index ec22d35f16872..117d3c60fece3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java @@ -10,6 +10,7 @@ import java.io.InputStream; import java.io.OutputStream; +import java.time.Duration; import java.util.Collections; /** @@ -19,8 +20,9 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize private static final String NAME = "normalizer"; - NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream) { - super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {}); + NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, + Duration processConnectTimeout) { + super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {}, processConnectTimeout); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index 6900bcba1176a..7985799a696bd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -53,7 +53,7 @@ public NormalizerProcess createNormalizerProcess(String jobId, String quantilesS createNativeProcess(jobId, quantilesState, processPipes, bucketSpan); NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(), - processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get()); + processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processConnectTimeout); try { normalizerProcess.start(executorService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index e0117109691e8..62c060de03a50 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -51,6 +51,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { private final int numberOfFields; private final List filesToDelete; private final Consumer onProcessCrash; + private final Duration processConnectTimeout; private volatile Future logTailFuture; private volatile Future stateProcessorFuture; private volatile boolean processCloseInitiated; @@ -59,17 +60,18 @@ public abstract class AbstractNativeProcess implements NativeProcess { protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List filesToDelete, - Consumer onProcessCrash) { + Consumer onProcessCrash, Duration processConnectTimeout) { this.jobId = jobId; - cppLogHandler = new CppLogMessageHandler(jobId, logStream); + this.cppLogHandler = new CppLogMessageHandler(jobId, logStream); this.processInStream = processInStream != null ? new BufferedOutputStream(processInStream) : null; this.processOutStream = processOutStream; this.processRestoreStream = processRestoreStream; this.recordWriter = new LengthEncodedWriter(this.processInStream); - startTime = ZonedDateTime.now(); + this.startTime = ZonedDateTime.now(); this.numberOfFields = numberOfFields; this.filesToDelete = filesToDelete; this.onProcessCrash = Objects.requireNonNull(onProcessCrash); + this.processConnectTimeout = Objects.requireNonNull(processConnectTimeout); } public abstract String getName(); @@ -195,10 +197,9 @@ public void kill() throws IOException { LOGGER.debug("[{}] Killing {} process", jobId, getName()); processKilled = true; try { - // The PID comes via the processes log stream. We don't wait for it to arrive here, - // but if the wait times out it implies the process has only just started, in which - // case it should die very quickly when we close its input stream. - NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO)); + // The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID. + // Without the PID we cannot kill the process. + NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(processConnectTimeout)); // Wait for the process to die before closing processInStream as if the process // is still alive when processInStream is closed it may start persisting state diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 07a191a422372..adcf4720c78a9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -25,6 +25,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; import java.util.Collections; @@ -62,7 +63,7 @@ public void testProcessStartTime() throws Exception { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, mock(OutputStream.class), outputStream, mock(OutputStream.class), NUMBER_FIELDS, null, - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); ZonedDateTime startTime = process.getProcessStartTime(); @@ -85,7 +86,7 @@ public void testWriteRecord() throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); process.writeRecord(record); @@ -120,7 +121,7 @@ public void testFlush() throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); FlushJobParams params = FlushJobParams.builder().build(); @@ -154,7 +155,8 @@ public void testConsumeAndCloseOutputStream() throws IOException { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { + new ProcessResultsParser(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), + Duration.ZERO)) { process.consumeAndCloseOutputStream(); assertThat(processOutStream.available(), equalTo(0)); @@ -170,7 +172,7 @@ private void testWriteMessage(CheckedConsumer writeFunc ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), - new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { + new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) { process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); writeFunction.accept(process); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java index 525a9d7183da0..e6454e264b11f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -140,7 +141,7 @@ public void testIsReady() throws Exception { private class TestNativeProcess extends AbstractNativeProcess { TestNativeProcess(OutputStream inputStream) { - super("foo", logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash); + super("foo", logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash, Duration.ZERO); } @Override