From 36ec38d23c3e6221b9d36e1ca4559a72e90c1194 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 16 Apr 2018 12:14:32 -0400 Subject: [PATCH 1/3] Remove PipelineExecutionService#executeIndexRequest With the move long ago to execute all single-document indexing requests as bulk indexing request, the method PipelineExecutionService#executeIndexRequest is unused and will never be used in production code. This commit removes this method and cuts over all tests to use PipelineExecutionService#executeBulkRequest. --- .../ingest/PipelineExecutionService.java | 17 -- .../ingest/PipelineExecutionServiceTests.java | 285 ++++++++++-------- 2 files changed, 151 insertions(+), 151 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index 43f80f5ae711a..39e60b5812eaf 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -53,23 +53,6 @@ public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) { this.threadPool = threadPool; } - public void executeIndexRequest(IndexRequest request, Consumer failureHandler, Consumer completionHandler) { - Pipeline pipeline = getPipeline(request.getPipeline()); - threadPool.executor(ThreadPool.Names.INDEX).execute(new AbstractRunnable() { - - @Override - public void onFailure(Exception e) { - failureHandler.accept(e); - } - - @Override - protected void doRun() throws Exception { - innerExecute(request, pipeline); - completionHandler.accept(true); - } - }); - } - public void executeBulkRequest(Iterable actionRequests, BiConsumer itemFailureHandler, Consumer completionHandler) { diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 59f447402fb88..66c1736f06896 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutorService; @@ -48,9 +49,10 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; @@ -78,19 +80,23 @@ public void setup() { } public void testExecuteIndexPipelineDoesNotExist() { - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - @SuppressWarnings("unchecked") - Consumer failureHandler = mock(Consumer.class); - @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - try { - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - fail("IllegalArgumentException expected"); - } catch (IllegalArgumentException e) { + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + + final SetOnce failure = new SetOnce<>(); + final BiConsumer failureHandler = (request, e) -> { + failure.set(true); + assertThat(request, sameInstance(indexRequest)); + assertThat(e, instanceOf(IllegalArgumentException.class)); assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist")); - } - verify(failureHandler, never()).accept(any(Exception.class)); - verify(completionHandler, never()).accept(anyBoolean()); + }; + + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + + assertTrue(failure.get()); + verify(completionHandler, times(1)).accept(null); } public void testExecuteIndexPipelineExistsButFailedParsing() { @@ -106,17 +112,23 @@ public String getType() { return null; } }))); - SetOnce failed = new SetOnce<>(); - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - Consumer failureHandler = (e) -> { - assertThat(e.getCause().getClass(), equalTo(IllegalArgumentException.class)); - assertThat(e.getCause().getCause().getClass(), equalTo(IllegalStateException.class)); + + final SetOnce failure = new SetOnce<>(); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + final BiConsumer failureHandler = (request, e) -> { + assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); + assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); assertThat(e.getCause().getCause().getMessage(), equalTo("error")); - failed.set(true); + failure.set(true); }; - Consumer completionHandler = (e) -> failed.set(false); - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - assertTrue(failed.get()); + + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + + assertTrue(failure.get()); + verify(completionHandler, times(1)).accept(null); } public void testExecuteBulkPipelineDoesNotExist() { @@ -152,41 +164,40 @@ protected boolean matchesSafely(IllegalArgumentException iae) { verify(completionHandler, times(1)).accept(null); } - public void testExecuteSuccess() throws Exception { - CompoundProcessor processor = mock(CompoundProcessor.class); + public void testExecuteSuccess() { + final CompoundProcessor processor = mock(CompoundProcessor.class); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - Consumer failureHandler = mock(Consumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(failureHandler, never()).accept(any()); - verify(completionHandler, times(1)).accept(true); + final Consumer completionHandler = mock(Consumer.class); + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); } public void testExecuteEmptyPipeline() throws Exception { - CompoundProcessor processor = mock(CompoundProcessor.class); + final CompoundProcessor processor = mock(CompoundProcessor.class); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); when(processor.getProcessors()).thenReturn(Collections.emptyList()); - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - Consumer failureHandler = mock(Consumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); + final Consumer completionHandler = mock(Consumer.class); + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); verify(processor, never()).execute(any()); - verify(failureHandler, never()).accept(any()); - verify(completionHandler, times(1)).accept(true); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); } public void testExecutePropagateAllMetaDataUpdates() throws Exception { - CompoundProcessor processor = mock(CompoundProcessor.class); + final CompoundProcessor processor = mock(CompoundProcessor.class); when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); - long newVersion = randomLong(); - String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); + final long newVersion = randomLong(); + final String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); doAnswer((InvocationOnMock invocationOnMock) -> { IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { @@ -202,15 +213,15 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { }).when(processor).execute(any()); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - Consumer failureHandler = mock(Consumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); + final Consumer completionHandler = mock(Consumer.class); + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); verify(processor).execute(any()); - verify(failureHandler, never()).accept(any()); - verify(completionHandler, times(1)).accept(true); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); assertThat(indexRequest.index(), equalTo("update_index")); assertThat(indexRequest.type(), equalTo("update_type")); assertThat(indexRequest.id(), equalTo("update_id")); @@ -220,89 +231,91 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { } public void testExecuteFailure() throws Exception { - CompoundProcessor processor = mock(CompoundProcessor.class); + final CompoundProcessor processor = mock(CompoundProcessor.class); when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId( + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") - Consumer failureHandler = mock(Consumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(processor).execute(eqID("_index", "_type", "_id", - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - verify(failureHandler, times(1)).accept(any(RuntimeException.class)); - verify(completionHandler, never()).accept(anyBoolean()); + final Consumer completionHandler = mock(Consumer.class); + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(processor).execute(eqIndexTypeId( + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(null); } public void testExecuteSuccessWithOnFailure() throws Exception { - Processor processor = mock(Processor.class); + final Processor processor = mock(Processor.class); when(processor.getType()).thenReturn("mock_processor_type"); when(processor.getTag()).thenReturn("mock_processor_tag"); - Processor onFailureProcessor = mock(Processor.class); - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), - Collections.singletonList(new CompoundProcessor(onFailureProcessor))); + final Processor onFailureProcessor = mock(Processor.class); + final CompoundProcessor compoundProcessor = new CompoundProcessor( + false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id") - .source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(Collections.emptyMap())); @SuppressWarnings("unchecked") - Consumer failureHandler = mock(Consumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(failureHandler, never()).accept(any(ElasticsearchException.class)); - verify(completionHandler, times(1)).accept(true); + final Consumer completionHandler = mock(Consumer.class); + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); + verify(completionHandler, times(1)).accept(null); } public void testExecuteFailureWithOnFailure() throws Exception { - Processor processor = mock(Processor.class); - Processor onFailureProcessor = mock(Processor.class); - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), - Collections.singletonList(new CompoundProcessor(onFailureProcessor))); + final Processor processor = mock(Processor.class); + final Processor onFailureProcessor = mock(Processor.class); + final CompoundProcessor compoundProcessor = new CompoundProcessor( + false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(), + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId( + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") - Consumer failureHandler = mock(Consumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(processor).execute(eqID("_index", "_type", "_id", - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - verify(failureHandler, times(1)).accept(any(RuntimeException.class)); - verify(completionHandler, never()).accept(anyBoolean()); + final Consumer completionHandler = mock(Consumer.class); + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(processor).execute(eqIndexTypeId( + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(null); } public void testExecuteFailureWithNestedOnFailure() throws Exception { - Processor processor = mock(Processor.class); - Processor onFailureProcessor = mock(Processor.class); - Processor onFailureOnFailureProcessor = mock(Processor.class); - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), - Collections.singletonList(new CompoundProcessor(false, Collections.singletonList(onFailureProcessor), - Collections.singletonList(onFailureOnFailureProcessor)))); + final Processor processor = mock(Processor.class); + final Processor onFailureProcessor = mock(Processor.class); + final Processor onFailureOnFailureProcessor = mock(Processor.class); + final List processors = Collections.singletonList(onFailureProcessor); + final List onFailureProcessors = Collections.singletonList(onFailureOnFailureProcessor); + final CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + Collections.singletonList(processor), + Collections.singletonList(new CompoundProcessor(false, processors, onFailureProcessors))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqIndexTypeId( + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqIndexTypeId( + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId( + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") - Consumer failureHandler = mock(Consumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(processor).execute(eqID("_index", "_type", "_id", - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - verify(failureHandler, times(1)).accept(any(RuntimeException.class)); - verify(completionHandler, never()).accept(anyBoolean()); + final Consumer completionHandler = mock(Consumer.class); + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(processor).execute(eqIndexTypeId( + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(null); } public void testBulkRequestExecutionWithFailures() throws Exception { @@ -344,7 +357,7 @@ public void testBulkRequestExecutionWithFailures() throws Exception { verify(completionHandler, times(1)).accept(null); } - public void testBulkRequestExecution() throws Exception { + public void testBulkRequestExecution() { BulkRequest bulkRequest = new BulkRequest(); String pipelineId = "_id"; @@ -367,47 +380,51 @@ public void testBulkRequestExecution() throws Exception { verify(completionHandler, times(1)).accept(null); } - public void testStats() throws Exception { - IngestStats ingestStats = executionService.stats(); - assertThat(ingestStats.getStatsPerPipeline().size(), equalTo(0)); - assertThat(ingestStats.getTotalStats().getIngestCount(), equalTo(0L)); - assertThat(ingestStats.getTotalStats().getIngestCurrent(), equalTo(0L)); - assertThat(ingestStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); - assertThat(ingestStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); + public void testStats() { + { + final IngestStats initialStats = executionService.stats(); + assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); + assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); + } when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, version, new CompoundProcessor(mock(Processor.class)))); when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, null, new CompoundProcessor(mock(Processor.class)))); - Map configurationMap = new HashMap<>(); + final Map configurationMap = new HashMap<>(); configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); executionService.updatePipelineStats(new IngestMetadata(configurationMap)); @SuppressWarnings("unchecked") - Consumer failureHandler = mock(Consumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); + final Consumer completionHandler = mock(Consumer.class); - IndexRequest indexRequest = new IndexRequest("_index"); + final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1"); - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - ingestStats = executionService.stats(); - assertThat(ingestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(ingestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(ingestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); - assertThat(ingestStats.getTotalStats().getIngestCount(), equalTo(1L)); + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + { + final IngestStats afterFirstRequestStats = executionService.stats(); + assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); + assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); + assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); + } indexRequest.setPipeline("_id2"); - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - ingestStats = executionService.stats(); - assertThat(ingestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(ingestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(ingestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); - assertThat(ingestStats.getTotalStats().getIngestCount(), equalTo(2L)); + executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + final IngestStats afterSecondRequestStats = executionService.stats(); + assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); + assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); + assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); } // issue: https://github.com/elastic/elasticsearch/issues/18126 - public void testUpdatingStatsWhenRemovingPipelineWorks() throws Exception { + public void testUpdatingStatsWhenRemovingPipelineWorks() { Map configurationMap = new HashMap<>(); configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); @@ -422,12 +439,12 @@ public void testUpdatingStatsWhenRemovingPipelineWorks() throws Exception { assertThat(executionService.stats().getStatsPerPipeline(), not(hasKey("_id2"))); } - private IngestDocument eqID(String index, String type, String id, Map source) { - return argThat(new IngestDocumentMatcher(index, type, id, source)); + private IngestDocument eqIndexTypeId(final Map source) { + return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source)); } - private IngestDocument eqID(String index, String type, String id, Long version, VersionType versionType, Map source) { - return argThat(new IngestDocumentMatcher(index, type, id, version, versionType, source)); + private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map source) { + return argThat(new IngestDocumentMatcher("_index", "_type", "_id", version, versionType, source)); } private class IngestDocumentMatcher extends ArgumentMatcher { From 9b902236c3f2b65269f2b2cb10cde26683728435 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 16 Apr 2018 12:19:12 -0400 Subject: [PATCH 2/3] Formatting --- .../ingest/PipelineExecutionServiceTests.java | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 66c1736f06896..b47a76d3ab828 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -235,15 +235,15 @@ public void testExecuteFailure() throws Exception { when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId( - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()) + .when(processor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(eqIndexTypeId( - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(null); } @@ -274,17 +274,18 @@ public void testExecuteFailureWithOnFailure() throws Exception { false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId( - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqIndexTypeId(indexRequest.version(), - indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()) + .when(processor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()) + .when(onFailureProcessor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(eqIndexTypeId( - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(null); } @@ -301,19 +302,21 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { Collections.singletonList(new CompoundProcessor(false, processors, onFailureProcessors))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqIndexTypeId( - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqIndexTypeId( - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId( - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()) + .when(onFailureOnFailureProcessor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()) + .when(onFailureProcessor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()) + .when(processor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(eqIndexTypeId( - indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(null); } From 970b7f719d2dd27ba6f9039ce1f2218e7e54afa5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 16 Apr 2018 12:20:08 -0400 Subject: [PATCH 3/3] Remove braces leftover from refactoring --- .../ingest/PipelineExecutionServiceTests.java | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index b47a76d3ab828..15a23421da26a 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -384,14 +384,12 @@ public void testBulkRequestExecution() { } public void testStats() { - { - final IngestStats initialStats = executionService.stats(); - assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); - assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); - } + final IngestStats initialStats = executionService.stats(); + assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); + assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, version, new CompoundProcessor(mock(Processor.class)))); when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, null, new CompoundProcessor(mock(Processor.class)))); @@ -409,13 +407,11 @@ public void testStats() { final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1"); executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - { - final IngestStats afterFirstRequestStats = executionService.stats(); - assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); - assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); - } + final IngestStats afterFirstRequestStats = executionService.stats(); + assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); + assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); + assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); indexRequest.setPipeline("_id2"); executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);