From aa074ed3f0aa9ea6cc0a7dc72c0f36db3cb346cd Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 23 May 2018 17:34:22 +0200 Subject: [PATCH 1/2] Move caching of the store to IndexShard. In spite of the existing caching, I have seen a number of nodes hot threads where one thread had been spending all its cpu on computing the size of a directory. I am proposing to move the caching of the store size to `IndexShard` so that it has access to the existing logic regarding whether a shard is active or not in order to be able to cache the store size more agressively. The tricky bit is that an inactive shard might still be merged, which may have a significant impact on the store size. This should be especially useful for time-based data since most indices are typically inactive. --- .../common/settings/IndexScopedSettings.java | 5 +- .../common/util/SingleObjectCache.java | 7 ++ .../org/elasticsearch/index/IndexService.java | 8 +- ...ElasticsearchConcurrentMergeScheduler.java | 28 ++++-- .../elasticsearch/index/engine/Engine.java | 9 ++ .../index/engine/InternalEngine.java | 12 ++- .../elasticsearch/index/shard/IndexShard.java | 86 ++++++++++++++++++- .../org/elasticsearch/index/store/Store.java | 63 ++++---------- .../cluster/ClusterInfoServiceIT.java | 2 +- .../elasticsearch/index/store/StoreTests.java | 11 +-- .../indices/recovery/IndexRecoveryIT.java | 4 +- 11 files changed, 164 insertions(+), 71 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index debd0f59a2ea2..921ed1a6e7e43 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -36,14 +36,13 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.FsDirectoryService; -import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesRequestCache; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -142,7 +141,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, - Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING, + IndexShard.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING, MapperService.INDEX_MAPPER_DYNAMIC_SETTING, MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING, MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/common/util/SingleObjectCache.java b/server/src/main/java/org/elasticsearch/common/util/SingleObjectCache.java index f3d710dab8c79..63e3b38d0bbdf 100644 --- a/server/src/main/java/org/elasticsearch/common/util/SingleObjectCache.java +++ b/server/src/main/java/org/elasticsearch/common/util/SingleObjectCache.java @@ -64,6 +64,13 @@ public T getOrRefresh() { return cached; } + /** + * Return the (potentially stale) cache entry. + */ + protected final T getCached() { + return cached; + } + /** * Returns a new instance to cache */ diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 585406d01a6f6..891caefb224a6 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -66,6 +66,7 @@ import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; @@ -301,8 +302,11 @@ private long getAvgShardSizeInBytes() throws IOException { long sum = 0; int count = 0; for (IndexShard indexShard : this) { - sum += indexShard.store().stats().sizeInBytes(); - count++; + StoreStats storeStats = indexShard.storeStats(); + if (storeStats != null) { + sum += storeStats.sizeInBytes(); + count++; + } } if (count == 0) { return -1L; diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java index f4876149cac13..30ea3891f915b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java @@ -43,6 +43,8 @@ import java.util.Collections; import java.util.Locale; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; /** * An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total @@ -53,11 +55,12 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler { protected final Logger logger; private final Settings indexSettings; private final ShardId shardId; + private final LongSupplier timeSupplier; private final MeanMetric totalMerges = new MeanMetric(); private final CounterMetric totalMergesNumDocs = new CounterMetric(); private final CounterMetric totalMergesSizeInBytes = new CounterMetric(); - private final CounterMetric currentMerges = new CounterMetric(); + private final AtomicLong currentMerges = new AtomicLong(); private final CounterMetric currentMergesNumDocs = new CounterMetric(); private final CounterMetric currentMergesSizeInBytes = new CounterMetric(); private final CounterMetric totalMergeStoppedTime = new CounterMetric(); @@ -66,11 +69,13 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler { private final Set onGoingMerges = ConcurrentCollections.newConcurrentSet(); private final Set readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges); private final MergeSchedulerConfig config; + private volatile long lastMergeMillis = -1; - ElasticsearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) { + ElasticsearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings, LongSupplier timeSupplier) { this.config = indexSettings.getMergeSchedulerConfig(); this.shardId = shardId; this.indexSettings = indexSettings.getSettings(); + this.timeSupplier = timeSupplier; this.logger = Loggers.getLogger(getClass(), this.indexSettings, shardId); refreshConfig(); } @@ -79,12 +84,22 @@ public Set onGoingMerges() { return readOnlyOnGoingMerges; } + /** + * @see Engine#getLastMergeMillis(long) + */ + public long lastMergeMillis(long now) { + if (currentMerges.get() == 0) { + return lastMergeMillis; + } + return now; + } + @Override protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException { int totalNumDocs = merge.totalNumDocs(); long totalSizeInBytes = merge.totalBytesSize(); long timeNS = System.nanoTime(); - currentMerges.inc(); + currentMerges.incrementAndGet(); currentMergesNumDocs.inc(totalNumDocs); currentMergesSizeInBytes.inc(totalSizeInBytes); @@ -103,7 +118,10 @@ protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IO onGoingMerges.remove(onGoingMerge); afterMerge(onGoingMerge); - currentMerges.dec(); + // Set the time of the last merge before decrementing since #lastMergeMillis + // reads currentMerges BEFORE lastMergeMillis. + lastMergeMillis = timeSupplier.getAsLong(); + currentMerges.decrementAndGet(); currentMergesNumDocs.dec(totalNumDocs); currentMergesSizeInBytes.dec(totalSizeInBytes); @@ -174,7 +192,7 @@ protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge me MergeStats stats() { final MergeStats mergeStats = new MergeStats(); mergeStats.add(totalMerges.count(), totalMerges.sum(), totalMergesNumDocs.count(), totalMergesSizeInBytes.count(), - currentMerges.count(), currentMergesNumDocs.count(), currentMergesSizeInBytes.count(), + currentMerges.get(), currentMergesNumDocs.count(), currentMergesSizeInBytes.count(), totalMergeStoppedTime.count(), totalMergeThrottledTime.count(), config.isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY); diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 7faaf51f4de6a..3882137f14178 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -68,6 +68,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; +import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.FileNotFoundException; @@ -1575,6 +1576,14 @@ public long getLastWriteNanos() { return this.lastWriteNanos; } + /** + * Return the last time in millis since Epoch when a merge finished, or + * {@code now} if there are ongoing merges. + * @param now the current timestamp + * @see ThreadPool#absoluteTimeInMillis() + */ + public abstract long getLastMergeMillis(long now); + /** * Called for each new opened engine searcher to warm new segments * diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bca84f81a29c4..45cafd6917aa1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -174,7 +174,8 @@ public InternalEngine(EngineConfig engineConfig) { try { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); - mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); + mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), + engineConfig.getIndexSettings(), engineConfig.getThreadPool()::absoluteTimeInMillis); throttle = new IndexThrottle(); try { translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier()); @@ -1996,8 +1997,8 @@ private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeSch private final AtomicInteger numMergesInFlight = new AtomicInteger(0); private final AtomicBoolean isThrottling = new AtomicBoolean(); - EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) { - super(shardId, indexSettings); + EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings, LongSupplier timeSupplier) { + super(shardId, indexSettings, timeSupplier); } @Override @@ -2244,4 +2245,9 @@ private static Map commitDataAsMap(final IndexWriter indexWriter } return commitData; } + + @Override + public long getLastMergeMillis(long now) { + return mergeScheduler.lastMergeMillis(now); + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 60392ab7990df..c4758d1e88995 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -59,9 +59,12 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.SingleObjectCache; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; import org.elasticsearch.common.xcontent.XContentHelper; @@ -136,6 +139,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.PrintStream; +import java.io.UncheckedIOException; import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -163,6 +167,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { + public static final Setting INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING = + Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope); + private final ThreadPool threadPool; private final MapperService mapperService; private final IndexCache indexCache; @@ -240,6 +247,8 @@ Runnable getGlobalCheckpointSyncer() { private final AtomicLong lastSearcherAccess = new AtomicLong(); private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); + private final SingleObjectCache storeStatsCache; + public IndexShard( ShardRouting shardRouting, IndexSettings indexSettings, @@ -309,6 +318,41 @@ public IndexShard( primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); refreshListeners = buildRefreshListeners(); lastSearcherAccess.set(threadPool.relativeTimeInMillis()); + + final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING); + logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval); + storeStatsCache = new SingleObjectCache(refreshInterval, new long[] { -1, -1 }) { + + @Override + protected long[] refresh() { + try { + return new long[] { store.sizeInBytes(), threadPool.absoluteTimeInMillis() }; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + protected boolean needsRefresh() { + long[] cached = getCached(); + if (cached[1] == -1) { + // not initialized + return true; + } + if (super.needsRefresh() == false) { + // prevent callers from hammering the FS + return false; + } + if (isActive()) { + // the shard is actively indexing, don't cache + return true; + } + // Now we compare the last modification of the store with + final long now = threadPool.absoluteTimeInMillis(); + return getEngine().getLastMergeMillis(now) >= cached[1]; + } + }; + persistMetadata(path, indexSettings, shardRouting, null, logger); } @@ -899,12 +943,45 @@ public GetStats getStats() { return getService.stats(); } + private static class StoreStatsCache extends SingleObjectCache { + private final Store store; + + StoreStatsCache(TimeValue refreshInterval, Store store) throws IOException { + super(refreshInterval, store.sizeInBytes()); + this.store = store; + } + + @Override + protected Long refresh() { + try { + return store.sizeInBytes(); + } catch (IOException ex) { + throw new ElasticsearchException("failed to refresh store stats", ex); + } + } + } + + /** + * Return the store size in bytes. + * NOTE: this method caches its results for some time so that repeated + * calls may not put too much load on the local node. + */ + private long storeSizeInBytes() throws IOException { + try { + return storeStatsCache.getOrRefresh()[0]; + } catch (UncheckedIOException e) { + // unwrap the original IOException + throw e.getCause(); + } + } + public StoreStats storeStats() { try { - return store.stats(); + final long storeSizeInBytes = storeSizeInBytes(); + return new StoreStats(storeSizeInBytes); } catch (IOException e) { throw new ElasticsearchException("io exception while building 'store stats'", e); - } catch (AlreadyClosedException ex) { + } catch (AlreadyClosedException e) { return null; // already closed } } @@ -1484,6 +1561,11 @@ public void addShardFailureCallback(Consumer onShardFailure) { public void checkIdle(long inactiveTimeNS) { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) { + if (active.get() == false) { + // We refresh when transitioning to an inactive state to make + // it easier to cache the store size. + refresh("transition to inactive"); + } boolean wasActive = active.getAndSet(false); if (wasActive) { logger.debug("shard is now inactive"); diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index ccaae9d5f79df..f2093654bc6a7 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -50,7 +50,6 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.Version; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; @@ -63,11 +62,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.SingleObjectCache; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.common.util.iterable.Iterables; @@ -138,15 +133,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref static final int VERSION_START = 0; static final int VERSION = VERSION_WRITE_THROWABLE; static final String CORRUPTED = "corrupted_"; - public static final Setting INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING = - Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope); private final AtomicBoolean isClosed = new AtomicBoolean(false); private final StoreDirectory directory; private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock(); private final ShardLock shardLock; private final OnClose onClose; - private final SingleObjectCache statsCache; private final AbstractRefCounted refCounter = new AbstractRefCounted("store") { @Override @@ -167,9 +159,6 @@ public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService dire this.directory = new StoreDirectory(directoryService.newDirectory(), Loggers.getLogger("index.store.deletes", settings, shardId)); this.shardLock = shardLock; this.onClose = onClose; - final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING); - this.statsCache = new StoreStatsCache(refreshInterval, directory); - logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval); assert onClose != null; assert shardLock != null; @@ -375,9 +364,25 @@ public void exorciseIndex(CheckIndex.Status status) throws IOException { } } - public StoreStats stats() throws IOException { + /** + * Return the size of the store in bytes. + * NOTE: Prefer {@link IndexShard#storeStats()} if you need to call this + * repeatedly, which performs caching in order to avoid hammering the local + * node with metadata reads on the index files. + */ + public long sizeInBytes() throws IOException { ensureOpen(); - return statsCache.getOrRefresh(); + long estimatedSize = 0; + String[] files = directory.listAll(); + for (String file : files) { + try { + estimatedSize += directory.fileLength(file); + } catch (NoSuchFileException | FileNotFoundException | AccessDeniedException e) { + // ignore, the file is not there no more; on Windows, if one thread concurrently deletes a file while + // calling Files.size, you can also sometimes hit AccessDeniedException + } + } + return estimatedSize; } /** @@ -1428,38 +1433,6 @@ public void accept(ShardLock Lock) { }; } - private static class StoreStatsCache extends SingleObjectCache { - private final Directory directory; - - StoreStatsCache(TimeValue refreshInterval, Directory directory) throws IOException { - super(refreshInterval, new StoreStats(estimateSize(directory))); - this.directory = directory; - } - - @Override - protected StoreStats refresh() { - try { - return new StoreStats(estimateSize(directory)); - } catch (IOException ex) { - throw new ElasticsearchException("failed to refresh store stats", ex); - } - } - - private static long estimateSize(Directory directory) throws IOException { - long estimatedSize = 0; - String[] files = directory.listAll(); - for (String file : files) { - try { - estimatedSize += directory.fileLength(file); - } catch (NoSuchFileException | FileNotFoundException | AccessDeniedException e) { - // ignore, the file is not there no more; on Windows, if one thread concurrently deletes a file while - // calling Files.size, you can also sometimes hit AccessDeniedException - } - } - return estimatedSize; - } - } - /** * creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted. */ diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index d7d232a08d43c..f46ac9b106262 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -131,7 +131,7 @@ protected Collection> nodePlugins() { public void testClusterInfoServiceCollectsInformation() throws Exception { internalCluster().startNodes(2); assertAcked(prepareCreate("test").setSettings(Settings.builder() - .put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), 0) + .put(IndexShard.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), 0) .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE).build())); ensureGreen("test"); InternalTestCluster internalTestCluster = internalCluster(); diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 9352d978e6e46..3e77554d35cc7 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -469,7 +469,7 @@ private void corruptFile(Directory dir, String fileIn, String fileOut) throws IO public void assertDeleteContent(Store store, DirectoryService service) throws IOException { deleteContent(store.directory()); assertThat(Arrays.toString(store.directory().listAll()), store.directory().listAll().length, equalTo(0)); - assertThat(store.stats().sizeInBytes(), equalTo(0L)); + assertThat(store.sizeInBytes(), equalTo(0L)); assertThat(service.newDirectory().listAll().length, equalTo(0)); } @@ -761,8 +761,7 @@ public void testStoreStats() throws IOException { final ShardId shardId = new ShardId("index", "_na_", 1); DirectoryService directoryService = new LuceneManagedDirectoryService(random()); Settings settings = Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(0)).build(); + .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build(); Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), directoryService, new DummyShardLock(shardId)); long initialStoreSize = 0; @@ -770,8 +769,7 @@ public void testStoreStats() throws IOException { assertTrue("expected extraFS file but got: " + extraFiles, extraFiles.startsWith("extra")); initialStoreSize += store.directory().fileLength(extraFiles); } - StoreStats stats = store.stats(); - assertEquals(stats.getSize().getBytes(), initialStoreSize); + assertEquals(store.sizeInBytes(), initialStoreSize); Directory dir = store.directory(); final long length; @@ -785,8 +783,7 @@ public void testStoreStats() throws IOException { } assertTrue(numNonExtraFiles(store) > 0); - stats = store.stats(); - assertEquals(stats.getSizeInBytes(), length + initialStoreSize); + assertEquals(store.sizeInBytes(), length + initialStoreSize); deleteContent(store.directory()); IOUtils.close(store); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index cf1449fecd6a5..9abf5d58765a6 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -46,7 +46,6 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.recovery.RecoveryStats; -import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState.Stage; import org.elasticsearch.node.RecoverySettingsChunkSizePlugin; @@ -509,8 +508,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount, throws ExecutionException, InterruptedException { logger.info("--> creating test index: {}", name); - assertAcked(prepareCreate(name, nodeCount, Settings.builder().put("number_of_shards", shardCount) - .put("number_of_replicas", replicaCount).put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), 0))); + assertAcked(prepareCreate(name, nodeCount, Settings.builder().put("number_of_shards", shardCount))); ensureGreen(); logger.info("--> indexing sample data"); From 74410c7c4c2985ad856fc9ce1fee96f62dbe0aec Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 23 May 2018 17:41:37 +0200 Subject: [PATCH 2/2] iter --- .../index/engine/ElasticsearchConcurrentMergeScheduler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java index 30ea3891f915b..8e1d502e46e21 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java @@ -69,13 +69,14 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler { private final Set onGoingMerges = ConcurrentCollections.newConcurrentSet(); private final Set readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges); private final MergeSchedulerConfig config; - private volatile long lastMergeMillis = -1; + private volatile long lastMergeMillis; ElasticsearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings, LongSupplier timeSupplier) { this.config = indexSettings.getMergeSchedulerConfig(); this.shardId = shardId; this.indexSettings = indexSettings.getSettings(); this.timeSupplier = timeSupplier; + lastMergeMillis = timeSupplier.getAsLong(); this.logger = Loggers.getLogger(getClass(), this.indexSettings, shardId); refreshConfig(); }