From 2f1f61e596513cba72e904a267e3c1b3f166c561 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Jan 2020 09:14:01 -0500 Subject: [PATCH 1/4] Do not force refresh when write indexing buffer --- .../index/engine/InternalEngine.java | 4 +- .../indices/IndexingMemoryControllerIT.java | 163 ++++++++++++++ .../IndexingMemoryControllerTests.java | 210 +++++++----------- 3 files changed, 243 insertions(+), 134 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0ea50322b173b..abdead30ff87a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1612,9 +1612,7 @@ final boolean refresh(String source, SearcherScope scope, boolean block) throws @Override public void writeIndexingBuffer() throws EngineException { - // we obtain a read lock here, since we don't want a flush to happen while we are writing - // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) - refresh("write indexing buffer", SearcherScope.INTERNAL, true); + refresh("write indexing buffer", SearcherScope.INTERNAL, false); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java new file mode 100644 index 0000000000000..e56876d2ffcd9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices; + +import org.apache.lucene.index.DirectoryReader; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardIT; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.Scheduler.Cancellable; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; + +public class IndexingMemoryControllerIT extends ESSingleNodeTestCase { + + // #10312 + public void testDeletesAloneCanTriggerRefresh() throws Exception { + createIndex("index", + Settings.builder().put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", -1) + .build()); + ensureGreen(); + + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService(resolveIndex("index")); + IndexShard shard = indexService.getShardOrNull(0); + assertNotNull(shard); + + for (int i = 0; i < 100; i++) { + String id = Integer.toString(i); + client().prepareIndex("index").setId(id).setSource("field", "value").get(); + } + + // Force merge so we know all merges are done before we start deleting: + ForceMergeResponse r = client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet(); + assertNoFailures(r); + + // Make a shell of an IMC to check up on indexing buffer usage: + Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "1kb").build(); + + // TODO: would be cleaner if I could pass this 1kb setting to the single node this test created.... + IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) { + @Override + protected List availableShards() { + return Collections.singletonList(shard); + } + + @Override + protected long getIndexBufferRAMBytesUsed(IndexShard shard) { + return shard.getIndexBufferRAMBytesUsed(); + } + + @Override + protected void writeIndexingBufferAsync(IndexShard shard) { + // just do it sync'd for this test + shard.writeIndexingBuffer(); + } + + @Override + protected Cancellable scheduleTask(ThreadPool threadPool) { + return null; + } + }; + + for (int i = 0; i < 100; i++) { + String id = Integer.toString(i); + client().prepareDelete("index", id).get(); + } + + final long indexingBufferBytes1 = shard.getIndexBufferRAMBytesUsed(); + + imc.forceCheck(); + + // We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool: + assertBusy(() -> { + try (Engine.Searcher s2 = shard.acquireSearcher("index")) { + // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write: + final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed(); + assertTrue(indexingBufferBytes2 < indexingBufferBytes1); + } + }); + } + + public void testTranslogRecoveryWorksWithIMC() throws IOException { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexService(resolveIndex("test")); + IndexShard shard = indexService.getShardOrNull(0); + for (int i = 0; i < 100; i++) { + client().prepareIndex("test").setId(Integer.toString(i)).setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + } + + CheckedFunction wrapper = directoryReader -> directoryReader; + shard.close("simon says", false); + AtomicReference shardRef = new AtomicReference<>(); + Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "50kb").build(); + Iterable iterable = () -> (shardRef.get() == null) ? Collections.emptyList().iterator() + : Collections.singleton(shardRef.get()).iterator(); + AtomicInteger flushes = new AtomicInteger(); + IndexingMemoryController imc = new IndexingMemoryController(settings, client().threadPool(), iterable) { + @Override + protected void writeIndexingBufferAsync(IndexShard shard) { + assertEquals(shard, shardRef.get()); + flushes.incrementAndGet(); + shard.writeIndexingBuffer(); + } + }; + final IndexShard newShard = IndexShardIT.newIndexShard(indexService, shard, wrapper, new NoneCircuitBreakerService(), imc); + shardRef.set(newShard); + try { + assertEquals(0, imc.availableShards().size()); + ShardRouting routing = newShard.routingEntry(); + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); + + assertEquals(1, imc.availableShards().size()); + assertTrue(IndexShardTestCase.recoverFromStore(newShard)); + assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); + IndexShardTestCase.updateRoutingEntry(newShard, routing.moveToStarted()); + } finally { + newShard.close("simon says", false); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 3609ef0aad0f7..9a5777d908841 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -18,26 +18,20 @@ */ package org.elasticsearch.indices; -import org.apache.lucene.index.DirectoryReader; -import org.elasticsearch.Version; -import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.CheckedFunction; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardIT; import org.elasticsearch.index.shard.IndexShardTestCase; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; import java.io.IOException; import java.util.ArrayList; @@ -47,15 +41,11 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.CountDownLatch; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; -public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { +public class IndexingMemoryControllerTests extends IndexShardTestCase { static class MockController extends IndexingMemoryController { @@ -168,19 +158,16 @@ protected Cancellable scheduleTask(ThreadPool threadPool) { } } - public void testShardAdditionAndRemoval() { - createIndex("test", Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0).build()); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); + public void testShardAdditionAndRemoval() throws IOException { MockController controller = new MockController(Settings.builder() .put("indices.memory.index_buffer_size", "4mb").build()); - IndexShard shard0 = test.getShard(0); + IndexShard shard0 = newStartedShard(); controller.simulateIndexing(shard0); controller.assertBuffer(shard0, 1); // add another shard - IndexShard shard1 = test.getShard(1); + IndexShard shard1 = newStartedShard(); controller.simulateIndexing(shard1); controller.assertBuffer(shard0, 1); controller.assertBuffer(shard1, 1); @@ -195,24 +182,21 @@ public void testShardAdditionAndRemoval() { controller.forceCheck(); // add a new one - IndexShard shard2 = test.getShard(2); + IndexShard shard2 = newStartedShard(); controller.simulateIndexing(shard2); controller.assertBuffer(shard2, 1); + closeShards(shard0, shard1, shard2); } - public void testActiveInactive() { - - createIndex("test", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); + public void testActiveInactive() throws IOException { MockController controller = new MockController(Settings.builder() .put("indices.memory.index_buffer_size", "5mb") .build()); - IndexShard shard0 = test.getShard(0); + IndexShard shard0 = newStartedShard(); controller.simulateIndexing(shard0); - IndexShard shard1 = test.getShard(1); + IndexShard shard1 = newStartedShard(); controller.simulateIndexing(shard1); controller.assertBuffer(shard0, 1); @@ -237,6 +221,7 @@ public void testActiveInactive() { controller.simulateIndexing(shard1); // shard1 crossed 5 mb and is now cleared: controller.assertBuffer(shard1, 0); + closeShards(shard0, shard1); } public void testMinBufferSizes() { @@ -288,14 +273,11 @@ public void testMaxBufferSizes() { } public void testThrottling() throws Exception { - createIndex("test", Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 0).build()); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService test = indicesService.indexService(resolveIndex("test")); MockController controller = new MockController(Settings.builder() .put("indices.memory.index_buffer_size", "4mb").build()); - IndexShard shard0 = test.getShard(0); - IndexShard shard1 = test.getShard(1); + IndexShard shard0 = newStartedShard(); + IndexShard shard1 = newStartedShard(); controller.simulateIndexing(shard0); controller.simulateIndexing(shard0); controller.simulateIndexing(shard0); @@ -346,117 +328,83 @@ public void testThrottling() throws Exception { controller.forceCheck(); controller.assertNotThrottled(shard0); controller.assertNotThrottled(shard1); + closeShards(shard0, shard1); } - // #10312 - public void testDeletesAloneCanTriggerRefresh() throws Exception { - createIndex("index", - Settings.builder().put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .put("index.refresh_interval", -1) - .build()); - ensureGreen(); - - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("index")); - IndexShard shard = indexService.getShardOrNull(0); - assertNotNull(shard); - - for (int i = 0; i < 100; i++) { - String id = Integer.toString(i); - client().prepareIndex("index").setId(id).setSource("field", "value").get(); - } - - // Force merge so we know all merges are done before we start deleting: - ForceMergeResponse r = client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet(); - assertNoFailures(r); - - // Make a shell of an IMC to check up on indexing buffer usage: - Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "1kb").build(); + EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.RefreshListener listener) { + final List internalRefreshListener = new ArrayList<>(config.getInternalRefreshListener());; + internalRefreshListener.add(listener); + return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), + config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getMergePolicy(), config.getAnalyzer(), + config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), + config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), + config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(), + config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + } - // TODO: would be cleaner if I could pass this 1kb setting to the single node this test created.... - IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) { + public void testSkipRefreshIfShardIsRefreshingAlready() throws Exception { + SetOnce refreshLatch = new SetOnce<>(); + ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { @Override - protected List availableShards() { - return Collections.singletonList(shard); + public void beforeRefresh() { + if (refreshLatch.get() != null) { + try { + refreshLatch.get().await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } } @Override - protected long getIndexBufferRAMBytesUsed(IndexShard shard) { - return shard.getIndexBufferRAMBytesUsed(); - } + public void afterRefresh(boolean didRefresh) { + } + }; + IndexShard shard = newStartedShard(randomBoolean(), Settings.EMPTY, + config -> new InternalEngine(configWithRefreshListener(config, refreshListener))); + final RefreshStats refreshStats = shard.refreshStats(); + final IndexingMemoryController controller = new IndexingMemoryController( + Settings.builder().put("indices.memory.interval", "200h") // disable it + .put("indices.memory.index_buffer_size", "1024b").build(), + threadPool, + Collections.singleton(shard)) { @Override - protected void writeIndexingBufferAsync(IndexShard shard) { - // just do it sync'd for this test - shard.writeIndexingBuffer(); + protected long getIndexBufferRAMBytesUsed(IndexShard shard) { + return randomLongBetween(1025, 10 * 1024 * 1024); } @Override - protected Cancellable scheduleTask(ThreadPool threadPool) { - return null; + protected long getShardWritingBytes(IndexShard shard) { + return 0L; } }; - - for (int i = 0; i < 100; i++) { - String id = Integer.toString(i); - client().prepareDelete("index", id).get(); + refreshLatch.set(new CountDownLatch(1)); // block refresh + int iterations = randomIntBetween(10, 100); + for (int i = 0; i < iterations; i++) { + controller.forceCheck(); } - - final long indexingBufferBytes1 = shard.getIndexBufferRAMBytesUsed(); - - imc.forceCheck(); - - // We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool: assertBusy(() -> { - try (Engine.Searcher s2 = shard.acquireSearcher("index")) { - // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write: - final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed(); - assertTrue(indexingBufferBytes2 < indexingBufferBytes1); + for (ThreadPoolStats.Stats stats : threadPool.stats()) { + if (stats.getName().equals(ThreadPool.Names.REFRESH)) { + assertThat(stats.getRejected(), equalTo(0L)); + assertThat(stats.getQueue(), equalTo(0)); + assertThat(stats.getActive(), equalTo(1)); + } } }); - } - - public void testTranslogRecoveryWorksWithIMC() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("test")); - IndexShard shard = indexService.getShardOrNull(0); - for (int i = 0; i < 100; i++) { - client().prepareIndex("test").setId(Integer.toString(i)).setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); - } - - CheckedFunction wrapper = directoryReader -> directoryReader; - shard.close("simon says", false); - AtomicReference shardRef = new AtomicReference<>(); - Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "50kb").build(); - Iterable iterable = () -> (shardRef.get() == null) ? Collections.emptyList().iterator() - : Collections.singleton(shardRef.get()).iterator(); - AtomicInteger flushes = new AtomicInteger(); - IndexingMemoryController imc = new IndexingMemoryController(settings, client().threadPool(), iterable) { - @Override - protected void writeIndexingBufferAsync(IndexShard shard) { - assertEquals(shard, shardRef.get()); - flushes.incrementAndGet(); - shard.writeIndexingBuffer(); + refreshLatch.get().countDown(); // allow refresh + assertBusy(() -> { + for (ThreadPoolStats.Stats stats : threadPool.stats()) { + if (stats.getName().equals(ThreadPool.Names.REFRESH)) { + assertThat(stats.getActive(), equalTo(0)); + assertThat(stats.getQueue(), equalTo(0)); + assertThat(stats.getRejected(), equalTo(0L)); + } } - }; - final IndexShard newShard = IndexShardIT.newIndexShard(indexService, shard, wrapper, new NoneCircuitBreakerService(), imc); - shardRef.set(newShard); - try { - assertEquals(0, imc.availableShards().size()); - ShardRouting routing = newShard.routingEntry(); - DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); - - assertEquals(1, imc.availableShards().size()); - assertTrue(IndexShardTestCase.recoverFromStore(newShard)); - assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); - IndexShardTestCase.updateRoutingEntry(newShard, routing.moveToStarted()); - } finally { - newShard.close("simon says", false); - } + }); + assertThat(shard.refreshStats().getTotal(), equalTo(refreshStats.getTotal() + 1)); + closeShards(shard); } - } From e6839093339bc1b9ca15b174adec03486d3c1ac2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Jan 2020 21:50:55 -0500 Subject: [PATCH 2/4] do not check for rejected --- .../elasticsearch/indices/IndexingMemoryControllerTests.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 9a5777d908841..052f1453e033f 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -364,6 +364,7 @@ public void afterRefresh(boolean didRefresh) { }; IndexShard shard = newStartedShard(randomBoolean(), Settings.EMPTY, config -> new InternalEngine(configWithRefreshListener(config, refreshListener))); + refreshLatch.set(new CountDownLatch(1)); // block refresh final RefreshStats refreshStats = shard.refreshStats(); final IndexingMemoryController controller = new IndexingMemoryController( Settings.builder().put("indices.memory.interval", "200h") // disable it @@ -380,7 +381,6 @@ protected long getShardWritingBytes(IndexShard shard) { return 0L; } }; - refreshLatch.set(new CountDownLatch(1)); // block refresh int iterations = randomIntBetween(10, 100); for (int i = 0; i < iterations; i++) { controller.forceCheck(); @@ -388,7 +388,6 @@ protected long getShardWritingBytes(IndexShard shard) { assertBusy(() -> { for (ThreadPoolStats.Stats stats : threadPool.stats()) { if (stats.getName().equals(ThreadPool.Names.REFRESH)) { - assertThat(stats.getRejected(), equalTo(0L)); assertThat(stats.getQueue(), equalTo(0)); assertThat(stats.getActive(), equalTo(1)); } @@ -400,7 +399,6 @@ protected long getShardWritingBytes(IndexShard shard) { if (stats.getName().equals(ThreadPool.Names.REFRESH)) { assertThat(stats.getActive(), equalTo(0)); assertThat(stats.getQueue(), equalTo(0)); - assertThat(stats.getRejected(), equalTo(0L)); } } }); From 747551513e5712883168a48d4916d2ec6e4202e7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 9 Jan 2020 08:52:11 -0500 Subject: [PATCH 3/4] ensure refresh threadpool stats exist --- .../IndexingMemoryControllerTests.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 052f1453e033f..78ae5118435bf 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -343,6 +343,16 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); } + ThreadPoolStats.Stats getRefreshThreadPoolStats() { + final ThreadPoolStats stats = threadPool.stats(); + for (ThreadPoolStats.Stats s : stats) { + if (s.getName().equals(ThreadPool.Names.REFRESH)) { + return s; + } + } + throw new AssertionError("refresh thread pool stats not found [" + stats + "]"); + } + public void testSkipRefreshIfShardIsRefreshingAlready() throws Exception { SetOnce refreshLatch = new SetOnce<>(); ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { @@ -386,21 +396,15 @@ protected long getShardWritingBytes(IndexShard shard) { controller.forceCheck(); } assertBusy(() -> { - for (ThreadPoolStats.Stats stats : threadPool.stats()) { - if (stats.getName().equals(ThreadPool.Names.REFRESH)) { - assertThat(stats.getQueue(), equalTo(0)); - assertThat(stats.getActive(), equalTo(1)); - } - } + ThreadPoolStats.Stats stats = getRefreshThreadPoolStats(); + assertThat(stats.getQueue(), equalTo(0)); + assertThat(stats.getActive(), equalTo(1)); }); refreshLatch.get().countDown(); // allow refresh assertBusy(() -> { - for (ThreadPoolStats.Stats stats : threadPool.stats()) { - if (stats.getName().equals(ThreadPool.Names.REFRESH)) { - assertThat(stats.getActive(), equalTo(0)); - assertThat(stats.getQueue(), equalTo(0)); - } - } + ThreadPoolStats.Stats stats = getRefreshThreadPoolStats(); + assertThat(stats.getQueue(), equalTo(0)); + assertThat(stats.getActive(), equalTo(0)); }); assertThat(shard.refreshStats().getTotal(), equalTo(refreshStats.getTotal() + 1)); closeShards(shard); From 4d04e97c214626fd184de151d6fe36949e0267cb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 9 Jan 2020 11:19:28 -0500 Subject: [PATCH 4/4] arrange tests --- .../indices/IndexingMemoryControllerIT.java | 176 ++++++------------ .../IndexingMemoryControllerTests.java | 40 ++++ 2 files changed, 97 insertions(+), 119 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index e56876d2ffcd9..5d5786e992d86 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -18,146 +18,84 @@ */ package org.elasticsearch.indices; -import org.apache.lucene.index.DirectoryReader; -import org.elasticsearch.Version; +import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardIT; -import org.elasticsearch.index.shard.IndexShardTestCase; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.threadpool.Scheduler.Cancellable; -import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; -import java.util.Collections; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Optional; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.greaterThan; public class IndexingMemoryControllerIT extends ESSingleNodeTestCase { - // #10312 - public void testDeletesAloneCanTriggerRefresh() throws Exception { - createIndex("index", - Settings.builder().put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .put("index.refresh_interval", -1) - .build()); - ensureGreen(); + @Override + protected Settings nodeSettings() { + return Settings.builder().put(super.nodeSettings()) + // small indexing buffer so that we can trigger refresh after buffering 100 deletes + .put("indices.memory.index_buffer_size", "1kb").build(); + } - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("index")); - IndexShard shard = indexService.getShardOrNull(0); - assertNotNull(shard); + @Override + protected Collection> getPlugins() { + final List> plugins = new ArrayList<>(super.getPlugins()); + plugins.add(TestEnginePlugin.class); + return plugins; + } - for (int i = 0; i < 100; i++) { - String id = Integer.toString(i); - client().prepareIndex("index").setId(id).setSource("field", "value").get(); + public static class TestEnginePlugin extends Plugin implements EnginePlugin { + + EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { + // We need to set a larger buffer for the IndexWriter; otherwise, it will flush before the IndexingMemoryController. + Settings settings = Settings.builder().put(config.getIndexSettings().getSettings()) + .put("indices.memory.index_buffer_size", "10mb").build(); + IndexSettings indexSettings = new IndexSettings(config.getIndexSettings().getIndexMetaData(), settings); + return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), + indexSettings, config.getWarmer(), config.getStore(), config.getMergePolicy(), config.getAnalyzer(), + config.getSimilarity(), new CodecService(null, LogManager.getLogger(IndexingMemoryControllerIT.class)), + config.getEventListener(), config.getQueryCache(), + config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), + config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), + config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); } - // Force merge so we know all merges are done before we start deleting: - ForceMergeResponse r = client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet(); - assertNoFailures(r); - - // Make a shell of an IMC to check up on indexing buffer usage: - Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "1kb").build(); - - // TODO: would be cleaner if I could pass this 1kb setting to the single node this test created.... - IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) { - @Override - protected List availableShards() { - return Collections.singletonList(shard); - } - - @Override - protected long getIndexBufferRAMBytesUsed(IndexShard shard) { - return shard.getIndexBufferRAMBytesUsed(); - } - - @Override - protected void writeIndexingBufferAsync(IndexShard shard) { - // just do it sync'd for this test - shard.writeIndexingBuffer(); - } - - @Override - protected Cancellable scheduleTask(ThreadPool threadPool) { - return null; - } - }; - - for (int i = 0; i < 100; i++) { - String id = Integer.toString(i); - client().prepareDelete("index", id).get(); + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + return Optional.of(config -> new InternalEngine(engineConfigWithLargerIndexingMemory(config))); } - - final long indexingBufferBytes1 = shard.getIndexBufferRAMBytesUsed(); - - imc.forceCheck(); - - // We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool: - assertBusy(() -> { - try (Engine.Searcher s2 = shard.acquireSearcher("index")) { - // 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write: - final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed(); - assertTrue(indexingBufferBytes2 < indexingBufferBytes1); - } - }); } - public void testTranslogRecoveryWorksWithIMC() throws IOException { - createIndex("test"); - ensureGreen(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexService indexService = indicesService.indexService(resolveIndex("test")); - IndexShard shard = indexService.getShardOrNull(0); + // #10312 + public void testDeletesAloneCanTriggerRefresh() throws Exception { + IndexService indexService = createIndex("index", Settings.builder().put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0).put("index.refresh_interval", -1).build()); + IndexShard shard = indexService.getShard(0); for (int i = 0; i < 100; i++) { - client().prepareIndex("test").setId(Integer.toString(i)).setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + client().prepareIndex("index").setId(Integer.toString(i)).setSource("field", "value").get(); } - - CheckedFunction wrapper = directoryReader -> directoryReader; - shard.close("simon says", false); - AtomicReference shardRef = new AtomicReference<>(); - Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "50kb").build(); - Iterable iterable = () -> (shardRef.get() == null) ? Collections.emptyList().iterator() - : Collections.singleton(shardRef.get()).iterator(); - AtomicInteger flushes = new AtomicInteger(); - IndexingMemoryController imc = new IndexingMemoryController(settings, client().threadPool(), iterable) { - @Override - protected void writeIndexingBufferAsync(IndexShard shard) { - assertEquals(shard, shardRef.get()); - flushes.incrementAndGet(); - shard.writeIndexingBuffer(); - } - }; - final IndexShard newShard = IndexShardIT.newIndexShard(indexService, shard, wrapper, new NoneCircuitBreakerService(), imc); - shardRef.set(newShard); - try { - assertEquals(0, imc.availableShards().size()); - ShardRouting routing = newShard.routingEntry(); - DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - newShard.markAsRecovering("store", new RecoveryState(routing, localNode, null)); - - assertEquals(1, imc.availableShards().size()); - assertTrue(IndexShardTestCase.recoverFromStore(newShard)); - assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); - IndexShardTestCase.updateRoutingEntry(newShard, routing.moveToStarted()); - } finally { - newShard.close("simon says", false); + // Force merge so we know all merges are done before we start deleting: + ForceMergeResponse r = client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet(); + assertNoFailures(r); + final RefreshStats refreshStats = shard.refreshStats(); + for (int i = 0; i < 100; i++) { + client().prepareDelete("index", Integer.toString(i)).get(); } + // need to assert busily as IndexingMemoryController refreshes in background + assertBusy(() -> assertThat(shard.refreshStats().getTotal(), greaterThan(refreshStats.getTotal() + 1))); } - } diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 78ae5118435bf..5a4fc6e07a873 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -20,15 +20,19 @@ import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolStats; @@ -42,8 +46,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class IndexingMemoryControllerTests extends IndexShardTestCase { @@ -331,6 +340,37 @@ public void testThrottling() throws Exception { closeShards(shard0, shard1); } + public void testTranslogRecoveryWorksWithIMC() throws IOException { + IndexShard shard = newStartedShard(true); + for (int i = 0; i < 100; i++) { + indexDoc(shard, Integer.toString(i), "{\"foo\" : \"bar\"}", XContentType.JSON, null); + } + shard.close("simon says", false); + AtomicReference shardRef = new AtomicReference<>(); + Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "50kb").build(); + Iterable iterable = () -> (shardRef.get() == null) ? Collections.emptyIterator() + : Collections.singleton(shardRef.get()).iterator(); + AtomicInteger flushes = new AtomicInteger(); + IndexingMemoryController imc = new IndexingMemoryController(settings, threadPool, iterable) { + @Override + protected void writeIndexingBufferAsync(IndexShard shard) { + assertEquals(shard, shardRef.get()); + flushes.incrementAndGet(); + shard.writeIndexingBuffer(); + } + }; + shard = reinitShard(shard, imc); + shardRef.set(shard); + assertEquals(0, imc.availableShards().size()); + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + shard.markAsRecovering("store", new RecoveryState(shard.routingEntry(), localNode, null)); + + assertEquals(1, imc.availableShards().size()); + assertTrue(recoverFromStore(shard)); + assertThat("we should have flushed in IMC at least once", flushes.get(), greaterThanOrEqualTo(1)); + closeShards(shard); + } + EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.RefreshListener listener) { final List internalRefreshListener = new ArrayList<>(config.getInternalRefreshListener());; internalRefreshListener.add(listener);