Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesRequestCache;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -142,7 +141,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING,
IndexShard.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING,
MapperService.INDEX_MAPPER_DYNAMIC_SETTING,
MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING,
MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public T getOrRefresh() {
return cached;
}

/**
* Return the (potentially stale) cache entry.
* This accessor only reads the {@code cached} field; it neither checks
* whether a refresh is due nor triggers one.
*
* @return the value currently held by this cache
*/
protected final T getCached() {
return cached;
}

/**
* Returns a new instance to cache
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
Expand Down Expand Up @@ -301,8 +302,11 @@ private long getAvgShardSizeInBytes() throws IOException {
long sum = 0;
int count = 0;
for (IndexShard indexShard : this) {
sum += indexShard.store().stats().sizeInBytes();
count++;
StoreStats storeStats = indexShard.storeStats();
if (storeStats != null) {
sum += storeStats.sizeInBytes();
count++;
}
}
if (count == 0) {
return -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
* An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total
Expand All @@ -53,11 +55,12 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
protected final Logger logger;
private final Settings indexSettings;
private final ShardId shardId;
private final LongSupplier timeSupplier;

private final MeanMetric totalMerges = new MeanMetric();
private final CounterMetric totalMergesNumDocs = new CounterMetric();
private final CounterMetric totalMergesSizeInBytes = new CounterMetric();
private final CounterMetric currentMerges = new CounterMetric();
private final AtomicLong currentMerges = new AtomicLong();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did this change?

private final CounterMetric currentMergesNumDocs = new CounterMetric();
private final CounterMetric currentMergesSizeInBytes = new CounterMetric();
private final CounterMetric totalMergeStoppedTime = new CounterMetric();
Expand All @@ -66,11 +69,14 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
private final MergeSchedulerConfig config;
private volatile long lastMergeMillis;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need these changes. I guess it would be enough to check Engine#getMergeStats() and then do:

mergeStats.current > 0 || mergeStats.total != previousStats.total?


ElasticsearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
ElasticsearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings, LongSupplier timeSupplier) {
this.config = indexSettings.getMergeSchedulerConfig();
this.shardId = shardId;
this.indexSettings = indexSettings.getSettings();
this.timeSupplier = timeSupplier;
lastMergeMillis = timeSupplier.getAsLong();
this.logger = Loggers.getLogger(getClass(), this.indexSettings, shardId);
refreshConfig();
}
Expand All @@ -79,12 +85,22 @@ public Set<OnGoingMerge> onGoingMerges() {
return readOnlyOnGoingMerges;
}

/**
* Reports the time of the last merge: the value recorded in {@code lastMergeMillis}
* when no merge is currently running, otherwise the supplied {@code now}.
*
* @param now the caller's current timestamp, returned unchanged while merges are in flight
* @return the recorded last-merge timestamp, or {@code now} if any merge is running
* @see Engine#getLastMergeMillis(long)
*/
public long lastMergeMillis(long now) {
// Read the in-flight counter first and the timestamp second; the order matters.
final boolean noMergeRunning = currentMerges.get() == 0;
return noMergeRunning ? lastMergeMillis : now;
}

@Override
protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
int totalNumDocs = merge.totalNumDocs();
long totalSizeInBytes = merge.totalBytesSize();
long timeNS = System.nanoTime();
currentMerges.inc();
currentMerges.incrementAndGet();
currentMergesNumDocs.inc(totalNumDocs);
currentMergesSizeInBytes.inc(totalSizeInBytes);

Expand All @@ -103,7 +119,10 @@ protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IO
onGoingMerges.remove(onGoingMerge);
afterMerge(onGoingMerge);

currentMerges.dec();
// Set the time of the last merge before decrementing since #lastMergeMillis
// reads currentMerges BEFORE lastMergeMillis.
lastMergeMillis = timeSupplier.getAsLong();
currentMerges.decrementAndGet();
currentMergesNumDocs.dec(totalNumDocs);
currentMergesSizeInBytes.dec(totalSizeInBytes);

Expand Down Expand Up @@ -174,7 +193,7 @@ protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge me
MergeStats stats() {
final MergeStats mergeStats = new MergeStats();
mergeStats.add(totalMerges.count(), totalMerges.sum(), totalMergesNumDocs.count(), totalMergesSizeInBytes.count(),
currentMerges.count(), currentMergesNumDocs.count(), currentMergesSizeInBytes.count(),
currentMerges.get(), currentMergesNumDocs.count(), currentMergesSizeInBytes.count(),
totalMergeStoppedTime.count(),
totalMergeThrottledTime.count(),
config.isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -1575,6 +1576,14 @@ public long getLastWriteNanos() {
return this.lastWriteNanos;
}

/**
* Return the last time in millis since Epoch when a merge finished, or
* {@code now} if there are ongoing merges.
*
* @param now the current timestamp
* @return the timestamp of the last finished merge, or {@code now} while merges are ongoing
* @see ThreadPool#absoluteTimeInMillis()
*/
public abstract long getLastMergeMillis(long now);

/**
* Called for each new opened engine searcher to warm new segments
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public InternalEngine(EngineConfig engineConfig) {
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(),
engineConfig.getIndexSettings(), engineConfig.getThreadPool()::absoluteTimeInMillis);
throttle = new IndexThrottle();
try {
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
Expand Down Expand Up @@ -1996,8 +1997,8 @@ private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeSch
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
private final AtomicBoolean isThrottling = new AtomicBoolean();

EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
super(shardId, indexSettings);
EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings, LongSupplier timeSupplier) {
super(shardId, indexSettings, timeSupplier);
}

@Override
Expand Down Expand Up @@ -2244,4 +2245,9 @@ private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter
}
return commitData;
}

// Delegates to the engine's merge scheduler, passing the caller's timestamp through.
@Override
public long getLastMergeMillis(long now) {
return mergeScheduler.lastMergeMillis(now);
}
}
86 changes: 84 additions & 2 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.SingleObjectCache;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
import org.elasticsearch.common.xcontent.XContentHelper;
Expand Down Expand Up @@ -136,6 +139,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -163,6 +167,9 @@

public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {

public static final Setting<TimeValue> INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING =
Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope);

private final ThreadPool threadPool;
private final MapperService mapperService;
private final IndexCache indexCache;
Expand Down Expand Up @@ -240,6 +247,8 @@ Runnable getGlobalCheckpointSyncer() {
private final AtomicLong lastSearcherAccess = new AtomicLong();
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();

private final SingleObjectCache<long[]> storeStatsCache;

public IndexShard(
ShardRouting shardRouting,
IndexSettings indexSettings,
Expand Down Expand Up @@ -309,6 +318,41 @@ public IndexShard(
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
refreshListeners = buildRefreshListeners();
lastSearcherAccess.set(threadPool.relativeTimeInMillis());

final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
storeStatsCache = new SingleObjectCache<long[]>(refreshInterval, new long[] { -1, -1 }) {

@Override
protected long[] refresh() {
try {
return new long[] { store.sizeInBytes(), threadPool.absoluteTimeInMillis() };
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
protected boolean needsRefresh() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why this is so complex. Wouldn't it be enough to override needsRefresh()?

private MergeStats previousStats = new MergeStats();

if (super.needsRefresh()) {
  boolean refresh = false;
  if (isActive()) {
    MergeStats mergeStats = getEngine().getMergeStats();
    refresh = mergeStats.current > 0 || mergeStats.total != previousStats.total;
    previousStats = mergeStats;
  }
  return refresh;
}

long[] cached = getCached();
if (cached[1] == -1) {
// not initialized
return true;
}
if (super.needsRefresh() == false) {
// prevent callers from hammering the FS
return false;
}
if (isActive()) {
// the shard is actively indexing, don't cache
return true;
}
// Now we compare the last modification of the store with the time at which the cached value was computed
final long now = threadPool.absoluteTimeInMillis();
return getEngine().getLastMergeMillis(now) >= cached[1];
}
};

persistMetadata(path, indexSettings, shardRouting, null, logger);
}

Expand Down Expand Up @@ -899,12 +943,45 @@ public GetStats getStats() {
return getService.stats();
}

/**
* A {@link SingleObjectCache} whose cached value is the result of
* {@link Store#sizeInBytes()} for the wrapped store.
*/
private static class StoreStatsCache extends SingleObjectCache<Long> {
private final Store store;

/**
* @param refreshInterval the interval passed on to the {@link SingleObjectCache} constructor
* @param store the store whose size is cached
* @throws IOException if reading the initial store size fails
*/
StoreStatsCache(TimeValue refreshInterval, Store store) throws IOException {
// the current store size is used as the initial cache value
super(refreshInterval, store.sizeInBytes());
this.store = store;
}

/**
* Re-reads the store size. An {@link IOException} is rethrown wrapped in an
* {@link ElasticsearchException}, keeping the original exception as the cause.
*/
@Override
protected Long refresh() {
try {
return store.sizeInBytes();
} catch (IOException ex) {
throw new ElasticsearchException("failed to refresh store stats", ex);
}
}
}

/**
* Returns the size of the store in bytes, as provided by {@code storeStatsCache}.
* NOTE: the result is cached for some time so that repeated calls may not
* put too much load on the local node.
*
* @return the first element of the cached entry
* @throws IOException the underlying cause when the cache fails with an {@link UncheckedIOException}
*/
private long storeSizeInBytes() throws IOException {
final long[] cacheEntry;
try {
cacheEntry = storeStatsCache.getOrRefresh();
} catch (UncheckedIOException wrapped) {
// surface the IOException that was wrapped
throw wrapped.getCause();
}
return cacheEntry[0];
}

public StoreStats storeStats() {
try {
return store.stats();
final long storeSizeInBytes = storeSizeInBytes();
return new StoreStats(storeSizeInBytes);
} catch (IOException e) {
throw new ElasticsearchException("io exception while building 'store stats'", e);
} catch (AlreadyClosedException ex) {
} catch (AlreadyClosedException e) {
return null; // already closed
}
}
Expand Down Expand Up @@ -1484,6 +1561,11 @@ public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {
public void checkIdle(long inactiveTimeNS) {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
if (active.get() == false) {
// We refresh when transitioning to an inactive state to make
// it easier to cache the store size.
refresh("transition to inactive");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-1: we should not refresh anything except the internal reader. Visibility guarantees are important.

}
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("shard is now inactive");
Expand Down
Loading