diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index fff39310b0214..4512b1d9dffc2 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -374,7 +374,14 @@ public IndexInput openInput(final String name, final IOContext context) throws I final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length())); if (useCache && isExcludedFromCache(name) == false) { - return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats, cacheService.getRangeSize()); + return new CachedBlobContainerIndexInput( + this, + fileInfo, + context, + inputStats, + cacheService.getRangeSize(), + cacheService.getRecoveryRangeSize() + ); } else { return new DirectBlobContainerIndexInput( blobContainer(), @@ -400,6 +407,13 @@ private boolean isExcludedFromCache(String name) { return ext != null && excludedFileTypes.contains(ext); } + public boolean isRecoveryFinalized() { + SearchableSnapshotRecoveryState recoveryState = this.recoveryState; + if (recoveryState == null) return false; + RecoveryState.Stage stage = recoveryState.getStage(); + return stage == RecoveryState.Stage.DONE || stage == RecoveryState.Stage.FINALIZE; + } + @Override public String toString() { return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory + " shard=" + shardId; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 09b692a178dc4..a7761f5eb0650 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -62,6 +62,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private final SearchableSnapshotDirectory directory; private final CacheFileReference cacheFileReference; private final int defaultRangeSize; + private final int recoveryRangeSize; // last read position is kept around in order to detect (non)contiguous reads for stats private long lastReadPosition; @@ -73,7 +74,8 @@ public CachedBlobContainerIndexInput( FileInfo fileInfo, IOContext context, IndexInputStats stats, - int rangeSize + int rangeSize, + int recoveryRangeSize ) { this( "CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")", @@ -84,7 +86,8 @@ public CachedBlobContainerIndexInput( 0L, fileInfo.length(), new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), - rangeSize + rangeSize, + recoveryRangeSize ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); @@ -99,7 +102,8 @@ private CachedBlobContainerIndexInput( long offset, long length, CacheFileReference cacheFileReference, - int rangeSize + int rangeSize, + int recoveryRangeSize ) { super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); this.directory = directory; @@ -107,6 +111,7 @@ private CachedBlobContainerIndexInput( this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; this.defaultRangeSize = rangeSize; + this.recoveryRangeSize = recoveryRangeSize; } @Override @@ -124,7 +129,9 @@ private void ensureContext(Predicate predicate) throws IOException { } private long getDefaultRangeSize() { - return (context != CACHE_WARMING_CONTEXT) ? defaultRangeSize : fileInfo.partSize().getBytes(); + return (context != CACHE_WARMING_CONTEXT) + ? (directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize) + : fileInfo.partSize().getBytes(); } private Tuple computeRange(long position) { @@ -729,7 +736,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) { this.offset + offset, length, cacheFileReference, - defaultRangeSize + defaultRangeSize, + recoveryRangeSize ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index a34e67fffb5fe..a72f3c46be574 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -73,6 +73,13 @@ public class CacheService extends AbstractLifecycleComponent { public static final ByteSizeValue MIN_SNAPSHOT_CACHE_RANGE_SIZE = new ByteSizeValue(4, ByteSizeUnit.KB); public static final ByteSizeValue MAX_SNAPSHOT_CACHE_RANGE_SIZE = new ByteSizeValue(Integer.MAX_VALUE, ByteSizeUnit.BYTES); + + /** + * If a search needs data from the repository then we expand it to a larger contiguous range whose size is determined by this setting, + * in anticipation of needing nearby data in subsequent reads. Repository reads typically have quite high latency (think ~100ms) and + * the default of 32MB for this setting represents the approximate point at which size starts to matter. In other words, reads of + * ranges smaller than 32MB don't usually happen much quicker, so we may as well expand all the way to 32MB ranges. + */ public static final Setting SNAPSHOT_CACHE_RANGE_SIZE_SETTING = Setting.byteSizeSetting( SETTINGS_PREFIX + "range_size", new ByteSizeValue(32, ByteSizeUnit.MB), // default @@ -81,6 +88,20 @@ public class CacheService extends AbstractLifecycleComponent { Setting.Property.NodeScope ); + /** + * Starting up a shard involves reading small parts of some files from the repository, independently of the pre-warming process. If we + * expand those ranges using {@link CacheService#SNAPSHOT_CACHE_RANGE_SIZE_SETTING} then we end up reading quite a few 32MB ranges. If + * we read enough of these ranges for the restore throttling rate limiter to kick in then all the read threads will end up waiting on + * the throttle, blocking subsequent reads. By using a smaller read size during restore we avoid clogging up the rate limiter so much. + */ + public static final Setting SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING = Setting.byteSizeSetting( + SETTINGS_PREFIX + "recovery_range_size", + new ByteSizeValue(128, ByteSizeUnit.KB), // default + MIN_SNAPSHOT_CACHE_RANGE_SIZE, // min + MAX_SNAPSHOT_CACHE_RANGE_SIZE, // max + Setting.Property.NodeScope + ); + public static final TimeValue MIN_SNAPSHOT_CACHE_SYNC_INTERVAL = TimeValue.timeValueSeconds(1L); public static final Setting SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING = Setting.timeSetting( SETTINGS_PREFIX + "sync.interval", @@ -118,6 +139,7 @@ public class CacheService extends AbstractLifecycleComponent { private final Cache cache; private final ByteSizeValue cacheSize; private final ByteSizeValue rangeSize; + private final ByteSizeValue recoveryRangeSize; private final KeyedLock shardsEvictionLock; private final Set evictedShards; @@ -132,6 +154,7 @@ public CacheService( this.threadPool = Objects.requireNonNull(threadPool); this.cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings); this.rangeSize = SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings); + this.recoveryRangeSize = SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings); this.cache = CacheBuilder.builder() .setMaximumWeight(cacheSize.getBytes()) .weigher((key, entry) -> entry.getLength()) @@ -227,6 +250,13 @@ public int getRangeSize() { return toIntBytes(rangeSize.getBytes()); } + /** + * @return the cache range size (in bytes) to use during recovery (until post_recovery) + */ + public int getRecoveryRangeSize() { + return toIntBytes(recoveryRangeSize.getBytes()); + } + /** * Retrieves the {@link CacheFile} instance associated with the specified {@link CacheKey} in the cache. If the key is not already * associated with a {@link CacheFile}, this method creates a new instance using the given file length and cache directory. diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 7e2df834b66f3..2a8e98e43829c 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -461,7 +461,7 @@ private void testDirectories( final boolean prewarmCache, final CheckedBiConsumer consumer ) throws Exception { - testDirectories(enableCache, prewarmCache, createRecoveryState(), Settings.EMPTY, consumer); + testDirectories(enableCache, prewarmCache, createRecoveryState(randomBoolean()), Settings.EMPTY, consumer); } private void testDirectories( @@ -710,7 +710,7 @@ public void testClearCache() throws Exception { threadPool ) ) { - final RecoveryState recoveryState = createRecoveryState(); + final RecoveryState recoveryState = createRecoveryState(randomBoolean()); final PlainActionFuture f = PlainActionFuture.newFuture(); final boolean loaded = directory.loadSnapshot(recoveryState, f); f.get(); @@ -779,7 +779,7 @@ public void testRecoveryStateIsKeptOpenAfterPreWarmFailures() throws Exception { PathUtilsForTesting.installMock(fileSystem); try { - SearchableSnapshotRecoveryState recoveryState = createRecoveryState(); + SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true); testDirectories(true, true, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> { boolean areAllFilesReused = snapshotDirectory.snapshot() .indexFiles() @@ -800,7 +800,7 @@ public void testRecoveryStateIsKeptOpenAfterPreWarmFailures() throws Exception { } public void testRecoveryStateIsEmptyWhenTheCacheIsNotPreWarmed() throws Exception { - SearchableSnapshotRecoveryState recoveryState = createRecoveryState(); + SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true); testDirectories(true, false, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> { assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE)); assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L)); @@ -809,7 +809,7 @@ public void testRecoveryStateIsEmptyWhenTheCacheIsNotPreWarmed() throws Exceptio } public void testNonCachedFilesAreExcludedFromRecoveryState() throws Exception { - SearchableSnapshotRecoveryState recoveryState = createRecoveryState(); + SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true); List allFileExtensions = List.of( "fdt", @@ -845,7 +845,7 @@ public void testNonCachedFilesAreExcludedFromRecoveryState() throws Exception { } public void testFilesWithHashEqualsContentsAreMarkedAsReusedOnRecoveryState() throws Exception { - SearchableSnapshotRecoveryState recoveryState = createRecoveryState(); + SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true); testDirectories(true, true, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> { assertBusy(() -> assertTrue(recoveryState.isPreWarmComplete())); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java index e7d3cde07d9f1..7ad7d8abbb40a 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java @@ -83,11 +83,12 @@ public void testRandomReads() throws Exception { final BlobContainer singleBlobContainer = singleSplitBlobContainer(blobName, input, partSize); final BlobContainer blobContainer; if (input.length == partSize && input.length <= cacheService.getCacheSize() && prewarmEnabled == false) { - blobContainer = new CountingBlobContainer(singleBlobContainer, cacheService.getRangeSize()); + blobContainer = new CountingBlobContainer(singleBlobContainer); } else { blobContainer = singleBlobContainer; } + final boolean recoveryFinalizedDone = randomBoolean(); final Path shardDir; try { shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId); @@ -116,7 +117,7 @@ public void testRandomReads() throws Exception { threadPool ) ) { - RecoveryState recoveryState = createRecoveryState(); + RecoveryState recoveryState = createRecoveryState(recoveryFinalizedDone); final PlainActionFuture future = PlainActionFuture.newFuture(); final boolean loaded = directory.loadSnapshot(recoveryState, future); if (randomBoolean()) { @@ -136,7 +137,10 @@ public void testRandomReads() throws Exception { } if (blobContainer instanceof CountingBlobContainer) { - long numberOfRanges = TestUtils.numberOfRanges(input.length, cacheService.getRangeSize()); + long numberOfRanges = TestUtils.numberOfRanges( + input.length, + recoveryFinalizedDone ? cacheService.getRangeSize() : cacheService.getRecoveryRangeSize() + ); assertThat( "Expected at most " + numberOfRanges + " ranges fetched from the source", ((CountingBlobContainer) blobContainer).totalOpens.sum(), @@ -211,7 +215,7 @@ public void testThrowsEOFException() throws Exception { threadPool ) ) { - RecoveryState recoveryState = createRecoveryState(); + RecoveryState recoveryState = createRecoveryState(randomBoolean()); final PlainActionFuture f = PlainActionFuture.newFuture(); final boolean loaded = searchableSnapshotDirectory.loadSnapshot(recoveryState, f); try { @@ -262,11 +266,8 @@ private static class CountingBlobContainer extends FilterBlobContainer { private final AtomicInteger openStreams = new AtomicInteger(0); - private final int rangeSize; - - CountingBlobContainer(BlobContainer in, int rangeSize) { + CountingBlobContainer(BlobContainer in) { super(in); - this.rangeSize = rangeSize; } @Override @@ -276,7 +277,7 @@ public InputStream readBlob(String blobName, long position, long length) throws @Override protected BlobContainer wrapChild(BlobContainer child) { - return new CountingBlobContainer(child, this.rangeSize); + return new CountingBlobContainer(child); } @Override diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index d9ee98343a9f6..a467d42966a1e 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -101,6 +101,9 @@ protected CacheService randomCacheService() { if (randomBoolean()) { cacheSettings.put(CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), randomCacheRangeSize()); } + if (randomBoolean()) { + cacheSettings.put(CacheService.SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.getKey(), randomCacheRangeSize()); + } if (randomBoolean()) { cacheSettings.put( CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.getKey(), @@ -149,7 +152,7 @@ protected static ByteSizeValue randomCacheRangeSize() { ); } - protected static SearchableSnapshotRecoveryState createRecoveryState() { + protected static SearchableSnapshotRecoveryState createRecoveryState(boolean finalizedDone) { ShardRouting shardRouting = TestShardRouting.newShardRouting( new ShardId(randomAlphaOfLength(10), randomAlphaOfLength(10), 0), randomAlphaOfLength(10), @@ -170,8 +173,9 @@ protected static SearchableSnapshotRecoveryState createRecoveryState() { .setStage(RecoveryState.Stage.VERIFY_INDEX) .setStage(RecoveryState.Stage.TRANSLOG); recoveryState.getIndex().setFileDetailsComplete(); - recoveryState.setStage(RecoveryState.Stage.FINALIZE).setStage(RecoveryState.Stage.DONE); - + if (finalizedDone) { + recoveryState.setStage(RecoveryState.Stage.FINALIZE).setStage(RecoveryState.Stage.DONE); + } return recoveryState; }