Skip to content

Commit 86b58d9

Browse files
authored
Rename AutoDetectResultProcessor* to AutodetectResultProcessor* for consistency with other classes where the spelling is "Autodetect" (#43359) (#43366)
1 parent 8578aba commit 86b58d9

File tree

6 files changed

+67
-67
lines changed

6 files changed

+67
-67
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
3434
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
3535
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
36-
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
36+
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
3737
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
3838
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
3939
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
@@ -66,7 +66,7 @@ public class AutodetectCommunicator implements Closeable {
6666
private final AutodetectProcess autodetectProcess;
6767
private final StateStreamer stateStreamer;
6868
private final DataCountsReporter dataCountsReporter;
69-
private final AutoDetectResultProcessor autoDetectResultProcessor;
69+
private final AutodetectResultProcessor autodetectResultProcessor;
7070
private final BiConsumer<Exception, Boolean> onFinishHandler;
7171
private final ExecutorService autodetectWorkerExecutor;
7272
private final NamedXContentRegistry xContentRegistry;
@@ -75,15 +75,15 @@ public class AutodetectCommunicator implements Closeable {
7575
private volatile boolean processKilled;
7676

7777
AutodetectCommunicator(Job job, Environment environment, AutodetectProcess process, StateStreamer stateStreamer,
78-
DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor,
78+
DataCountsReporter dataCountsReporter, AutodetectResultProcessor autodetectResultProcessor,
7979
BiConsumer<Exception, Boolean> onFinishHandler, NamedXContentRegistry xContentRegistry,
8080
ExecutorService autodetectWorkerExecutor) {
8181
this.job = job;
8282
this.environment = environment;
8383
this.autodetectProcess = process;
8484
this.stateStreamer = stateStreamer;
8585
this.dataCountsReporter = dataCountsReporter;
86-
this.autoDetectResultProcessor = autoDetectResultProcessor;
86+
this.autodetectResultProcessor = autodetectResultProcessor;
8787
this.onFinishHandler = onFinishHandler;
8888
this.xContentRegistry = xContentRegistry;
8989
this.autodetectWorkerExecutor = autodetectWorkerExecutor;
@@ -120,7 +120,7 @@ public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistr
120120
}
121121

122122
CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
123-
DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription());
123+
DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription());
124124

125125
if (includeTokensField && categorizationAnalyzer == null) {
126126
createCategorizationAnalyzer(analysisRegistry);
@@ -129,14 +129,14 @@ public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistr
129129
CountDownLatch latch = new CountDownLatch(1);
130130
AtomicReference<DataCounts> dataCountsAtomicReference = new AtomicReference<>();
131131
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
132-
autoDetectWriter.write(countingStream, categorizationAnalyzer, xContentType, (dataCounts, e) -> {
132+
autodetectWriter.write(countingStream, categorizationAnalyzer, xContentType, (dataCounts, e) -> {
133133
dataCountsAtomicReference.set(dataCounts);
134134
exceptionAtomicReference.set(e);
135135
latch.countDown();
136136
});
137137

138138
latch.await();
139-
autoDetectWriter.flushStream();
139+
autodetectWriter.flushStream();
140140

141141
if (exceptionAtomicReference.get() != null) {
142142
throw exceptionAtomicReference.get();
@@ -168,7 +168,7 @@ public void close(boolean restart, String reason) {
168168
killProcess(false, false);
169169
stateStreamer.cancel();
170170
}
171-
autoDetectResultProcessor.awaitCompletion();
171+
autodetectResultProcessor.awaitCompletion();
172172
} finally {
173173
onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null, true);
174174
}
@@ -199,13 +199,13 @@ public void killProcess(boolean awaitCompletion, boolean finish) throws IOExcept
199199
public void killProcess(boolean awaitCompletion, boolean finish, boolean finalizeJob) throws IOException {
200200
try {
201201
processKilled = true;
202-
autoDetectResultProcessor.setProcessKilled();
202+
autodetectResultProcessor.setProcessKilled();
203203
autodetectWorkerExecutor.shutdown();
204204
autodetectProcess.kill();
205205

206206
if (awaitCompletion) {
207207
try {
208-
autoDetectResultProcessor.awaitCompletion();
208+
autodetectResultProcessor.awaitCompletion();
209209
} catch (TimeoutException e) {
210210
LOGGER.warn(new ParameterizedMessage("[{}] Timed out waiting for killed job", job.getId()), e);
211211
}
@@ -289,20 +289,20 @@ FlushAcknowledgement waitFlushToCompletion(String flushId) throws InterruptedExc
289289

290290
FlushAcknowledgement flushAcknowledgement;
291291
try {
292-
flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
292+
flushAcknowledgement = autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
293293
while (flushAcknowledgement == null) {
294294
checkProcessIsAlive();
295295
checkResultsProcessorIsAlive();
296-
flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
296+
flushAcknowledgement = autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
297297
}
298298
} finally {
299-
autoDetectResultProcessor.clearAwaitingFlush(flushId);
299+
autodetectResultProcessor.clearAwaitingFlush(flushId);
300300
}
301301

302302
if (processKilled == false) {
303303
// We also have to wait for the normalizer to become idle so that we block
304304
// clients from querying results in the middle of normalization.
305-
autoDetectResultProcessor.waitUntilRenormalizerIsIdle();
305+
autodetectResultProcessor.waitUntilRenormalizerIsIdle();
306306

307307
LOGGER.debug("[{}] Flush completed", job.getId());
308308
}
@@ -321,7 +321,7 @@ private void checkProcessIsAlive() {
321321
}
322322

323323
private void checkResultsProcessorIsAlive() {
324-
if (autoDetectResultProcessor.isFailed()) {
324+
if (autodetectResultProcessor.isFailed()) {
325325
// Don't log here - it just causes double logging when the exception gets logged
326326
throw new ElasticsearchException("[{}] Unexpected death of the result processor", job.getId());
327327
}
@@ -332,11 +332,11 @@ public ZonedDateTime getProcessStartTime() {
332332
}
333333

334334
public ModelSizeStats getModelSizeStats() {
335-
return autoDetectResultProcessor.modelSizeStats();
335+
return autodetectResultProcessor.modelSizeStats();
336336
}
337337

338338
public TimingStats getTimingStats() {
339-
return autoDetectResultProcessor.timingStats();
339+
return autodetectResultProcessor.timingStats();
340340
}
341341

342342
public DataCounts getDataCounts() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
5858
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
5959
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
60-
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
60+
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
6161
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
6262
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
6363
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
@@ -500,18 +500,18 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
500500
}
501501

502502
// A TP with no queue, so that we fail immediately if there are no threads available
503-
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
503+
ExecutorService autodetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
504504
DataCountsReporter dataCountsReporter = new DataCountsReporter(job, autodetectParams.dataCounts(), jobDataCountsPersister);
505505
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider,
506506
new JobRenormalizedResultsPersister(job.getId(), client), normalizerFactory);
507507
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);
508508
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
509509
renormalizerExecutorService);
510510

511-
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService,
511+
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autodetectExecutorService,
512512
onProcessCrash(jobTask));
513-
AutoDetectResultProcessor processor =
514-
new AutoDetectResultProcessor(
513+
AutodetectResultProcessor processor =
514+
new AutodetectResultProcessor(
515515
client,
516516
auditor,
517517
jobId,
@@ -521,8 +521,8 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
521521
autodetectParams.timingStats());
522522
ExecutorService autodetectWorkerExecutor;
523523
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
524-
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);
525-
autoDetectExecutorService.submit(() -> processor.process(process));
524+
autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService);
525+
autodetectExecutorService.submit(() -> processor.process(process));
526526
} catch (EsRejectedExecutionException e) {
527527
// If submitting the operation to read the results from the process fails we need to close
528528
// the process too, so that other submitted operations to threadpool are stopped.
@@ -734,9 +734,9 @@ public Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> getStatis
734734
}
735735

736736
ExecutorService createAutodetectExecutorService(ExecutorService executorService) {
737-
AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
738-
executorService.submit(autoDetectWorkerExecutor::start);
739-
return autoDetectWorkerExecutor;
737+
AutodetectWorkerExecutorService autodetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
738+
executorService.submit(autodetectWorkerExecutor::start);
739+
return autodetectWorkerExecutor;
740740
}
741741

742742
public ByteSizeValue getMinLocalStorageAvailable() {
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@
6868
* interim results and the old interim results have to be cleared out
6969
* before the new ones are written.
7070
*/
71-
public class AutoDetectResultProcessor {
71+
public class AutodetectResultProcessor {
7272

73-
private static final Logger LOGGER = LogManager.getLogger(AutoDetectResultProcessor.class);
73+
private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);
7474

7575
private final Client client;
7676
private final Auditor auditor;
@@ -100,14 +100,14 @@ public class AutoDetectResultProcessor {
100100
*/
101101
private TimingStats persistedTimingStats; // only used from the process() thread, so doesn't need to be volatile
102102

103-
public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
103+
public AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
104104
JobResultsPersister persister,
105105
ModelSizeStats latestModelSizeStats,
106106
TimingStats timingStats) {
107107
this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, timingStats, new FlushListener());
108108
}
109109

110-
AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
110+
AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
111111
JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats,
112112
FlushListener flushListener) {
113113
this.client = Objects.requireNonNull(client);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
4040
import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
4141
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
42-
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
42+
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
4343
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
4444
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
4545
import org.elasticsearch.xpack.ml.job.results.BucketTests;
@@ -75,7 +75,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
7575

7676
private JobResultsProvider jobResultsProvider;
7777
private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests;
78-
private AutoDetectResultProcessor resultProcessor;
78+
private AutodetectResultProcessor resultProcessor;
7979
private Renormalizer renormalizer;
8080

8181
@Override
@@ -91,7 +91,7 @@ public void createComponents() throws Exception {
9191
jobResultsProvider = new JobResultsProvider(client(), builder.build());
9292
renormalizer = mock(Renormalizer.class);
9393
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
94-
resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
94+
resultProcessor = new AutodetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
9595
new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) {
9696
@Override
9797
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {

0 commit comments

Comments
 (0)