@@ -33,7 +33,7 @@
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
-import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
@@ -66,7 +66,7 @@ public class AutodetectCommunicator implements Closeable {
private final AutodetectProcess autodetectProcess;
private final StateStreamer stateStreamer;
private final DataCountsReporter dataCountsReporter;
-private final AutoDetectResultProcessor autoDetectResultProcessor;
+private final AutodetectResultProcessor autodetectResultProcessor;
private final BiConsumer<Exception, Boolean> onFinishHandler;
private final ExecutorService autodetectWorkerExecutor;
private final NamedXContentRegistry xContentRegistry;
@@ -75,15 +75,15 @@ public class AutodetectCommunicator implements Closeable {
private volatile boolean processKilled;

AutodetectCommunicator(Job job, Environment environment, AutodetectProcess process, StateStreamer stateStreamer,
-DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor,
+DataCountsReporter dataCountsReporter, AutodetectResultProcessor autodetectResultProcessor,
BiConsumer<Exception, Boolean> onFinishHandler, NamedXContentRegistry xContentRegistry,
ExecutorService autodetectWorkerExecutor) {
this.job = job;
this.environment = environment;
this.autodetectProcess = process;
this.stateStreamer = stateStreamer;
this.dataCountsReporter = dataCountsReporter;
-this.autoDetectResultProcessor = autoDetectResultProcessor;
+this.autodetectResultProcessor = autodetectResultProcessor;
this.onFinishHandler = onFinishHandler;
this.xContentRegistry = xContentRegistry;
this.autodetectWorkerExecutor = autodetectWorkerExecutor;
@@ -120,7 +120,7 @@ public void writeToJob(InputStream inputStream, AnalysisRegistr
}

CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
-DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription());
+DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription());

if (includeTokensField && categorizationAnalyzer == null) {
createCategorizationAnalyzer(analysisRegistry);
@@ -129,14 +129,14 @@ public void writeToJob(InputStream inputStream, AnalysisRegistr
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<DataCounts> dataCountsAtomicReference = new AtomicReference<>();
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
-autoDetectWriter.write(countingStream, categorizationAnalyzer, xContentType, (dataCounts, e) -> {
+autodetectWriter.write(countingStream, categorizationAnalyzer, xContentType, (dataCounts, e) -> {
dataCountsAtomicReference.set(dataCounts);
exceptionAtomicReference.set(e);
latch.countDown();
});

latch.await();
-autoDetectWriter.flushStream();
+autodetectWriter.flushStream();

if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
@@ -168,7 +168,7 @@ public void close(boolean restart, String reason) {
killProcess(false, false);
stateStreamer.cancel();
}
-autoDetectResultProcessor.awaitCompletion();
+autodetectResultProcessor.awaitCompletion();
} finally {
onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null, true);
}
@@ -199,13 +199,13 @@ public void killProcess(boolean awaitCompletion, boolean finish) throws IOExcept
public void killProcess(boolean awaitCompletion, boolean finish, boolean finalizeJob) throws IOException {
try {
processKilled = true;
-autoDetectResultProcessor.setProcessKilled();
+autodetectResultProcessor.setProcessKilled();
autodetectWorkerExecutor.shutdown();
autodetectProcess.kill();

if (awaitCompletion) {
try {
-autoDetectResultProcessor.awaitCompletion();
+autodetectResultProcessor.awaitCompletion();
} catch (TimeoutException e) {
LOGGER.warn(new ParameterizedMessage("[{}] Timed out waiting for killed job", job.getId()), e);
}
@@ -289,20 +289,20 @@ FlushAcknowledgement waitFlushToCompletion(String flushId) throws InterruptedExc

FlushAcknowledgement flushAcknowledgement;
try {
-flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
+flushAcknowledgement = autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
while (flushAcknowledgement == null) {
checkProcessIsAlive();
checkResultsProcessorIsAlive();
-flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
+flushAcknowledgement = autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
}
} finally {
-autoDetectResultProcessor.clearAwaitingFlush(flushId);
+autodetectResultProcessor.clearAwaitingFlush(flushId);
}

if (processKilled == false) {
// We also have to wait for the normalizer to become idle so that we block
// clients from querying results in the middle of normalization.
-autoDetectResultProcessor.waitUntilRenormalizerIsIdle();
+autodetectResultProcessor.waitUntilRenormalizerIsIdle();

LOGGER.debug("[{}] Flush completed", job.getId());
}
@@ -321,7 +321,7 @@ private void checkProcessIsAlive() {
}

private void checkResultsProcessorIsAlive() {
-if (autoDetectResultProcessor.isFailed()) {
+if (autodetectResultProcessor.isFailed()) {
// Don't log here - it just causes double logging when the exception gets logged
throw new ElasticsearchException("[{}] Unexpected death of the result processor", job.getId());
}
@@ -332,11 +332,11 @@ public ZonedDateTime getProcessStartTime() {
}

public ModelSizeStats getModelSizeStats() {
-return autoDetectResultProcessor.modelSizeStats();
+return autodetectResultProcessor.modelSizeStats();
}

public TimingStats getTimingStats() {
-return autoDetectResultProcessor.timingStats();
+return autodetectResultProcessor.timingStats();
}

public DataCounts getDataCounts() {
@@ -57,7 +57,7 @@
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
-import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
@@ -500,18 +500,18 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
}

// A TP with no queue, so that we fail immediately if there are no threads available
-ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
+ExecutorService autodetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
DataCountsReporter dataCountsReporter = new DataCountsReporter(job, autodetectParams.dataCounts(), jobDataCountsPersister);
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider,
new JobRenormalizedResultsPersister(job.getId(), client), normalizerFactory);
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
renormalizerExecutorService);

-AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService,
+AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autodetectExecutorService,
onProcessCrash(jobTask));
-AutoDetectResultProcessor processor =
-new AutoDetectResultProcessor(
+AutodetectResultProcessor processor =
+new AutodetectResultProcessor(
client,
auditor,
jobId,
@@ -521,8 +521,8 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
autodetectParams.timingStats());
ExecutorService autodetectWorkerExecutor;
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
-autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);
-autoDetectExecutorService.submit(() -> processor.process(process));
+autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService);
+autodetectExecutorService.submit(() -> processor.process(process));
} catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped.
@@ -734,9 +734,9 @@ public Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> getStatis
}

ExecutorService createAutodetectExecutorService(ExecutorService executorService) {
-AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
-executorService.submit(autoDetectWorkerExecutor::start);
-return autoDetectWorkerExecutor;
+AutodetectWorkerExecutorService autodetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
+executorService.submit(autodetectWorkerExecutor::start);
+return autodetectWorkerExecutor;
}

public ByteSizeValue getMinLocalStorageAvailable() {
@@ -68,9 +68,9 @@
* interim results and the old interim results have to be cleared out
* before the new ones are written.
*/
-public class AutoDetectResultProcessor {
+public class AutodetectResultProcessor {

-private static final Logger LOGGER = LogManager.getLogger(AutoDetectResultProcessor.class);
+private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);

private final Client client;
private final Auditor auditor;
@@ -100,14 +100,14 @@ public class AutoDetectResultProcessor {
*/
private TimingStats persistedTimingStats; // only used from the process() thread, so doesn't need to be volatile

-public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
+public AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
JobResultsPersister persister,
ModelSizeStats latestModelSizeStats,
TimingStats timingStats) {
this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, timingStats, new FlushListener());
}

-AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
+AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats,
FlushListener flushListener) {
this.client = Objects.requireNonNull(client);
@@ -39,7 +39,7 @@
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
-import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
+import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.job.results.BucketTests;
@@ -75,7 +75,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {

private JobResultsProvider jobResultsProvider;
private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests;
-private AutoDetectResultProcessor resultProcessor;
+private AutodetectResultProcessor resultProcessor;
private Renormalizer renormalizer;

@Override
@@ -91,7 +91,7 @@ public void createComponents() throws Exception {
jobResultsProvider = new JobResultsProvider(client(), builder.build());
renormalizer = mock(Renormalizer.class);
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
-resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
+resultProcessor = new AutodetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) {
@Override
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {