Merged
@@ -374,7 +374,14 @@ public IndexInput openInput(final String name, final IOContext context) throws IOException {

final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length()));
if (useCache && isExcludedFromCache(name) == false) {
return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats, cacheService.getRangeSize());
return new CachedBlobContainerIndexInput(
this,
fileInfo,
context,
inputStats,
cacheService.getRangeSize(),
cacheService.getRecoveryRangeSize()
);
} else {
return new DirectBlobContainerIndexInput(
blobContainer(),
@@ -400,6 +407,13 @@ private boolean isExcludedFromCache(String name) {
return ext != null && excludedFileTypes.contains(ext);
}

public boolean isRecoveryFinalized() {
SearchableSnapshotRecoveryState recoveryState = this.recoveryState;
if (recoveryState == null) return false;
RecoveryState.Stage stage = recoveryState.getStage();
return stage == RecoveryState.Stage.DONE || stage == RecoveryState.Stage.FINALIZE;
}

@Override
public String toString() {
return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory + " shard=" + shardId;
@@ -62,6 +62,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
private final SearchableSnapshotDirectory directory;
private final CacheFileReference cacheFileReference;
private final int defaultRangeSize;
private final int recoveryRangeSize;

// last read position is kept around in order to detect (non)contiguous reads for stats
private long lastReadPosition;
@@ -73,7 +74,8 @@ public CachedBlobContainerIndexInput(
FileInfo fileInfo,
IOContext context,
IndexInputStats stats,
int rangeSize
int rangeSize,
int recoveryRangeSize
) {
this(
"CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")",
@@ -84,7 +86,8 @@ public CachedBlobContainerIndexInput(
0L,
fileInfo.length(),
new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()),
rangeSize
rangeSize,
recoveryRangeSize
);
assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth
stats.incrementOpenCount();
@@ -99,14 +102,16 @@ private CachedBlobContainerIndexInput(
long offset,
long length,
CacheFileReference cacheFileReference,
int rangeSize
int rangeSize,
int recoveryRangeSize
) {
super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
this.directory = directory;
this.cacheFileReference = cacheFileReference;
this.lastReadPosition = this.offset;
this.lastSeekPosition = this.offset;
this.defaultRangeSize = rangeSize;
this.recoveryRangeSize = recoveryRangeSize;
}

@Override
@@ -124,7 +129,9 @@ private void ensureContext(Predicate<IOContext> predicate) throws IOException {
}

private long getDefaultRangeSize() {
return (context != CACHE_WARMING_CONTEXT) ? defaultRangeSize : fileInfo.partSize().getBytes();
return (context != CACHE_WARMING_CONTEXT)
? (directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize)
: fileInfo.partSize().getBytes();
}

private Tuple<Long, Long> computeRange(long position) {
@@ -729,7 +736,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) {
this.offset + offset,
length,
cacheFileReference,
defaultRangeSize
defaultRangeSize,
recoveryRangeSize
);
slice.isClone = true;
return slice;
@@ -73,6 +73,13 @@ public class CacheService extends AbstractLifecycleComponent {

public static final ByteSizeValue MIN_SNAPSHOT_CACHE_RANGE_SIZE = new ByteSizeValue(4, ByteSizeUnit.KB);
public static final ByteSizeValue MAX_SNAPSHOT_CACHE_RANGE_SIZE = new ByteSizeValue(Integer.MAX_VALUE, ByteSizeUnit.BYTES);

/**
* If a search needs data from the repository then we expand it to a larger contiguous range whose size is determined by this setting,
* in anticipation of needing nearby data in subsequent reads. Repository reads typically have quite high latency (think ~100ms) and
* the default of 32MB for this setting represents the approximate point at which size starts to matter. In other words, reads of
* ranges smaller than 32MB don't usually happen much quicker, so we may as well expand all the way to 32MB ranges.
*/
public static final Setting<ByteSizeValue> SNAPSHOT_CACHE_RANGE_SIZE_SETTING = Setting.byteSizeSetting(
SETTINGS_PREFIX + "range_size",
new ByteSizeValue(32, ByteSizeUnit.MB), // default
@@ -81,6 +88,20 @@ public class CacheService extends AbstractLifecycleComponent {
Setting.Property.NodeScope
);
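
To make the expansion described in the comment above concrete, here is a simplified stand-alone sketch (illustrative only, not code from this change; the method name and the long[] return type are invented for the example) of how a single read position is widened to a contiguous range aligned to the configured size:

    // Illustrative sketch: widen a read at `position` into the surrounding range aligned to `rangeSize`,
    // capped at the end of the file. With the 32MB default, a 1KB read at offset 40MB of a 100MB file
    // is served by fetching the whole range [32MB, 64MB).
    static long[] expandToRange(long position, long rangeSize, long fileLength) {
        final long start = (position / rangeSize) * rangeSize;    // align down to a rangeSize boundary
        final long end = Math.min(start + rangeSize, fileLength); // never read past the end of the file
        return new long[] { start, end };
    }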

/**
* Starting up a shard involves reading small parts of some files from the repository, independently of the pre-warming process. If we
* expand those ranges using {@link CacheService#SNAPSHOT_CACHE_RANGE_SIZE_SETTING} then we end up reading quite a few 32MB ranges. If
* we read enough of these ranges for the restore throttling rate limiter to kick in then all the read threads will end up waiting on
* the throttle, blocking subsequent reads. By using a smaller read size during restore we avoid clogging up the rate limiter so much.
*/
public static final Setting<ByteSizeValue> SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING = Setting.byteSizeSetting(
@DaveCTurner (Contributor), Dec 14, 2020:

Suggest a comment so we remember why we are doing this. Also suggest a similar comment on the other setting, since this came up as a question in the investigation that led to this PR.

SETTINGS_PREFIX + "recovery_range_size",
new ByteSizeValue(128, ByteSizeUnit.KB), // default
MIN_SNAPSHOT_CACHE_RANGE_SIZE, // min
MAX_SNAPSHOT_CACHE_RANGE_SIZE, // max
Setting.Property.NodeScope
);
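
As a hedged usage sketch (not part of this change, and relying on the Settings and ByteSizeUnit imports already present in this file), both range sizes can be overridden in node settings; the keys are obtained via Setting#getKey(), so nothing is assumed about the literal value of SETTINGS_PREFIX, and the values below simply restate the defaults:

    // Illustrative sketch: override the steady-state and recovery range sizes and read them back.
    final Settings settings = Settings.builder()
        .put(CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), "32mb")            // searches after recovery has finished
        .put(CacheService.SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.getKey(), "128kb")  // reads while the shard is still recovering
        .build();
    assert CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings).getBytes() == ByteSizeUnit.MB.toBytes(32);
    assert CacheService.SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings).getBytes() == ByteSizeUnit.KB.toBytes(128);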

public static final TimeValue MIN_SNAPSHOT_CACHE_SYNC_INTERVAL = TimeValue.timeValueSeconds(1L);
public static final Setting<TimeValue> SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING = Setting.timeSetting(
SETTINGS_PREFIX + "sync.interval",
@@ -118,6 +139,7 @@ public class CacheService extends AbstractLifecycleComponent {
private final Cache<CacheKey, CacheFile> cache;
private final ByteSizeValue cacheSize;
private final ByteSizeValue rangeSize;
private final ByteSizeValue recoveryRangeSize;
private final KeyedLock<ShardEviction> shardsEvictionLock;
private final Set<ShardEviction> evictedShards;

@@ -132,6 +154,7 @@ public CacheService(
this.threadPool = Objects.requireNonNull(threadPool);
this.cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings);
this.rangeSize = SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings);
this.recoveryRangeSize = SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings);
this.cache = CacheBuilder.<CacheKey, CacheFile>builder()
.setMaximumWeight(cacheSize.getBytes())
.weigher((key, entry) -> entry.getLength())
@@ -227,6 +250,13 @@ public int getRangeSize() {
return toIntBytes(rangeSize.getBytes());
}

/**
* @return the cache range size (in bytes) to use during recovery (until post_recovery)
*/
public int getRecoveryRangeSize() {
return toIntBytes(recoveryRangeSize.getBytes());
}

/**
* Retrieves the {@link CacheFile} instance associated with the specified {@link CacheKey} in the cache. If the key is not already
* associated with a {@link CacheFile}, this method creates a new instance using the given file length and cache directory.
@@ -461,7 +461,7 @@ private void testDirectories(
final boolean prewarmCache,
final CheckedBiConsumer<Directory, SearchableSnapshotDirectory, Exception> consumer
) throws Exception {
testDirectories(enableCache, prewarmCache, createRecoveryState(), Settings.EMPTY, consumer);
testDirectories(enableCache, prewarmCache, createRecoveryState(randomBoolean()), Settings.EMPTY, consumer);
}

private void testDirectories(
@@ -710,7 +710,7 @@ public void testClearCache() throws Exception {
threadPool
)
) {
final RecoveryState recoveryState = createRecoveryState();
final RecoveryState recoveryState = createRecoveryState(randomBoolean());
final PlainActionFuture<Void> f = PlainActionFuture.newFuture();
final boolean loaded = directory.loadSnapshot(recoveryState, f);
f.get();
@@ -779,7 +779,7 @@ public void testRecoveryStateIsKeptOpenAfterPreWarmFailures() throws Exception {
PathUtilsForTesting.installMock(fileSystem);

try {
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);
testDirectories(true, true, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
boolean areAllFilesReused = snapshotDirectory.snapshot()
.indexFiles()
@@ -800,7 +800,7 @@ public void testRecoveryStateIsKeptOpenAfterPreWarmFailures() throws Exception {
}

public void testRecoveryStateIsEmptyWhenTheCacheIsNotPreWarmed() throws Exception {
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);
testDirectories(true, false, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE));
assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
@@ -809,7 +809,7 @@ public void testRecoveryStateIsEmptyWhenTheCacheIsNotPreWarmed() throws Exception {
}

public void testNonCachedFilesAreExcludedFromRecoveryState() throws Exception {
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);

List<String> allFileExtensions = List.of(
"fdt",
@@ -845,7 +845,7 @@ public void testNonCachedFilesAreExcludedFromRecoveryState() throws Exception {
}

public void testFilesWithHashEqualsContentsAreMarkedAsReusedOnRecoveryState() throws Exception {
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);

testDirectories(true, true, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
assertBusy(() -> assertTrue(recoveryState.isPreWarmComplete()));
@@ -83,11 +83,12 @@ public void testRandomReads() throws Exception {
final BlobContainer singleBlobContainer = singleSplitBlobContainer(blobName, input, partSize);
final BlobContainer blobContainer;
if (input.length == partSize && input.length <= cacheService.getCacheSize() && prewarmEnabled == false) {
blobContainer = new CountingBlobContainer(singleBlobContainer, cacheService.getRangeSize());
blobContainer = new CountingBlobContainer(singleBlobContainer);
} else {
blobContainer = singleBlobContainer;
}

final boolean recoveryFinalizedDone = randomBoolean();
final Path shardDir;
try {
shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId);
@@ -116,7 +117,7 @@ public void testRandomReads() throws Exception {
threadPool
)
) {
RecoveryState recoveryState = createRecoveryState();
RecoveryState recoveryState = createRecoveryState(recoveryFinalizedDone);
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
final boolean loaded = directory.loadSnapshot(recoveryState, future);
if (randomBoolean()) {
@@ -136,7 +137,10 @@ public void testRandomReads() throws Exception {
}

if (blobContainer instanceof CountingBlobContainer) {
long numberOfRanges = TestUtils.numberOfRanges(input.length, cacheService.getRangeSize());
long numberOfRanges = TestUtils.numberOfRanges(
input.length,
recoveryFinalizedDone ? cacheService.getRangeSize() : cacheService.getRecoveryRangeSize()
);
assertThat(
"Expected at most " + numberOfRanges + " ranges fetched from the source",
((CountingBlobContainer) blobContainer).totalOpens.sum(),
@@ -211,7 +215,7 @@ public void testThrowsEOFException() throws Exception {
threadPool
)
) {
RecoveryState recoveryState = createRecoveryState();
RecoveryState recoveryState = createRecoveryState(randomBoolean());
final PlainActionFuture<Void> f = PlainActionFuture.newFuture();
final boolean loaded = searchableSnapshotDirectory.loadSnapshot(recoveryState, f);
try {
@@ -262,11 +266,8 @@ private static class CountingBlobContainer extends FilterBlobContainer {

private final AtomicInteger openStreams = new AtomicInteger(0);

private final int rangeSize;

CountingBlobContainer(BlobContainer in, int rangeSize) {
CountingBlobContainer(BlobContainer in) {
super(in);
this.rangeSize = rangeSize;
}

@Override
@@ -276,7 +277,7 @@ public InputStream readBlob(String blobName, long position, long length) throws IOException {

@Override
protected BlobContainer wrapChild(BlobContainer child) {
return new CountingBlobContainer(child, this.rangeSize);
return new CountingBlobContainer(child);
}

@Override
@@ -101,6 +101,9 @@ protected CacheService randomCacheService() {
if (randomBoolean()) {
cacheSettings.put(CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), randomCacheRangeSize());
}
if (randomBoolean()) {
cacheSettings.put(CacheService.SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.getKey(), randomCacheRangeSize());
}
if (randomBoolean()) {
cacheSettings.put(
CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.getKey(),
@@ -149,7 +152,7 @@ protected static ByteSizeValue randomCacheRangeSize() {
);
}

protected static SearchableSnapshotRecoveryState createRecoveryState() {
protected static SearchableSnapshotRecoveryState createRecoveryState(boolean finalizedDone) {
ShardRouting shardRouting = TestShardRouting.newShardRouting(
new ShardId(randomAlphaOfLength(10), randomAlphaOfLength(10), 0),
randomAlphaOfLength(10),
@@ -170,8 +173,9 @@ protected static SearchableSnapshotRecoveryState createRecoveryState() {
.setStage(RecoveryState.Stage.VERIFY_INDEX)
.setStage(RecoveryState.Stage.TRANSLOG);
recoveryState.getIndex().setFileDetailsComplete();
recoveryState.setStage(RecoveryState.Stage.FINALIZE).setStage(RecoveryState.Stage.DONE);

if (finalizedDone) {
recoveryState.setStage(RecoveryState.Stage.FINALIZE).setStage(RecoveryState.Stage.DONE);
}
return recoveryState;
}
