diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index ba0ea74107a97..52edf9c004359 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -6,6 +6,8 @@ */ package org.elasticsearch.index.store; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; import org.elasticsearch.common.blobstore.BlobContainer; @@ -16,11 +18,16 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; + +import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray; public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { + protected final Logger logger; protected final BlobContainer blobContainer; protected final FileInfo fileInfo; protected final IOContext context; @@ -33,6 +40,7 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu private AtomicBoolean closed; public BaseSearchableSnapshotIndexInput( + Logger logger, String resourceDesc, BlobContainer blobContainer, FileInfo fileInfo, @@ -42,6 +50,7 @@ public BaseSearchableSnapshotIndexInput( long length ) { super(resourceDesc, context); + this.logger = Objects.requireNonNull(logger); this.blobContainer = Objects.requireNonNull(blobContainer); this.fileInfo = Objects.requireNonNull(fileInfo); this.context = Objects.requireNonNull(context); @@ -54,25 +63,60 @@ public 
BaseSearchableSnapshotIndexInput( this.isClone = false; } - public BaseSearchableSnapshotIndexInput( - String resourceDesc, - BlobContainer blobContainer, - FileInfo fileInfo, - IOContext context, - IndexInputStats stats, - long offset, - long length, - int bufferSize - ) { - this(resourceDesc, blobContainer, fileInfo, context, stats, offset, length); - setBufferSize(bufferSize); - } - @Override public final long length() { return length; } + @Override + protected final void readInternal(ByteBuffer b) throws IOException { + assert assertCurrentThreadIsNotCacheFetchAsync(); + + // We can detect that we're going to read the last 16 bytes (that contains the footer checksum) of the file. Such reads are often + // executed when opening a Directory and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer. + if (maybeReadChecksumFromFileInfo(b)) { + logger.trace("read footer of file [{}], bypassing all caches", fileInfo.physicalName()); + assert b.remaining() == 0L : b.remaining(); + return; + } + + doReadInternal(b); + } + + protected abstract void doReadInternal(ByteBuffer b) throws IOException; + + /** + * Detects read operations that are executed on the last 16 bytes of the index input which is where Lucene stores the footer checksum + * of Lucene files. If such a read is detected this method tries to complete the read operation by reading the checksum from the + * {@link FileInfo} in memory rather than reading the bytes from the {@link BufferedIndexInput} because that could trigger more cache + * operations. 
+ * + * @return true if the footer checksum has been read from the {@link FileInfo} + */ + private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException { + final int remaining = b.remaining(); + if (remaining != CodecUtil.footerLength()) { + return false; + } + final long position = getFilePointer() + this.offset; + if (position != fileInfo.length() - CodecUtil.footerLength()) { + return false; + } + if (isClone) { + return false; + } + boolean success = false; + try { + b.put(checksumToBytesArray(fileInfo.checksum())); + success = true; + } catch (NumberFormatException e) { + // tests disable this optimisation by passing an invalid checksum + } finally { + assert b.remaining() == (success ? 0L : remaining) : b.remaining() + " remaining bytes but success is " + success; + } + return success; + } + /** * Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range * spans multiple blobs then this stream will request them in turn. 
@@ -173,11 +217,18 @@ public final void close() throws IOException { if (isClone == false) { stats.incrementCloseCount(); } - innerClose(); + doClose(); } } - public abstract void innerClose() throws IOException; + public abstract void doClose() throws IOException; + + protected void ensureContext(Predicate predicate) throws IOException { + if (predicate.test(context) == false) { + assert false : "this method should not be used with this context " + context; + throw new IOException("Cannot read the index input using context [context=" + context + ", input=" + this + ']'); + } + } protected final boolean assertCurrentThreadMayAccessBlobStore() { final String threadName = Thread.currentThread().getName(); @@ -199,4 +250,15 @@ protected final boolean assertCurrentThreadMayAccessBlobStore() { return true; } + protected static boolean isCacheFetchAsyncThread(final String threadName) { + return threadName.contains('[' + SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME + ']'); + } + + protected static boolean assertCurrentThreadIsNotCacheFetchAsync() { + final String threadName = Thread.currentThread().getName(); + assert false == isCacheFetchAsyncThread(threadName) : "expected the current thread [" + + threadName + + "] to not belong to the cache fetch async thread pool"; + return true; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 02105ee0e995e..64974bb52dc0a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import
org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -29,7 +28,6 @@ import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; import org.elasticsearch.index.store.IndexInputStats; import org.elasticsearch.index.store.SearchableSnapshotDirectory; -import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.io.EOFException; @@ -42,10 +40,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Predicate; import java.util.stream.IntStream; -import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput { @@ -106,7 +102,7 @@ private CachedBlobContainerIndexInput( int rangeSize, int recoveryRangeSize ) { - super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); + super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); this.directory = directory; this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; @@ -116,19 +112,12 @@ private CachedBlobContainerIndexInput( } @Override - public void innerClose() { + public void doClose() { if (isClone == false) { cacheFileReference.releaseOnClose(); } } - private void ensureContext(Predicate predicate) throws IOException { - if (predicate.test(context) == false) { - assert false : "this method should not be used with this context " + context; - throw new IOException("Cannot read the index input using context 
[context=" + context + ", input=" + this + ']'); - } - } - private long getDefaultRangeSize() { return (context != CACHE_WARMING_CONTEXT) ? (directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize) @@ -143,24 +132,12 @@ private ByteRange computeRange(long position) { } @Override - protected void readInternal(ByteBuffer b) throws IOException { + protected void doReadInternal(ByteBuffer b) throws IOException { ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT); - assert assertCurrentThreadIsNotCacheFetchAsync(); final long position = getFilePointer() + this.offset; final int length = b.remaining(); - // We can detect that we're going to read the last 16 bytes (that contains the footer checksum) of the file. Such reads are often - // executed when opening a Directory and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer. - if (length == CodecUtil.footerLength() && isClone == false && position == fileInfo.length() - length) { - if (readChecksumFromFileInfo(b)) { - logger.trace("read footer of file [{}] at position [{}], bypassing all caches", fileInfo.physicalName(), position); - return; - } - assert b.remaining() == length; - } - logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this); - try { final CacheFile cacheFile = cacheFileReference.get(); @@ -401,26 +378,6 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e throw new IOException("failed to read data from cache", e); } - private boolean readChecksumFromFileInfo(ByteBuffer b) throws IOException { - assert isClone == false; - byte[] footer; - try { - footer = checksumToBytesArray(fileInfo.checksum()); - } catch (NumberFormatException e) { - // tests disable this optimisation by passing an invalid checksum - footer = null; - } - if (footer == null) { - return false; - } - - b.put(footer); - assert b.remaining() == 0L; - return true; - - // TODO we should add this to 
DirectBlobContainerIndexInput too. - } - /** * Prefetches a complete part and writes it in cache. This method is used to prewarm the cache. * @return a tuple with {@code Tuple} values @@ -737,10 +694,6 @@ private static boolean assertFileChannelOpen(FileChannel fileChannel) { return true; } - private static boolean isCacheFetchAsyncThread(final String threadName) { - return threadName.contains('[' + SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME + ']'); - } - private static boolean assertCurrentThreadMayWriteCacheFile() { final String threadName = Thread.currentThread().getName(); assert isCacheFetchAsyncThread(threadName) : "expected the current thread [" @@ -748,12 +701,4 @@ assert isCacheFetchAsyncThread(threadName) : "expected the current thread [" + "] to belong to the cache fetch async thread pool"; return true; } - - private static boolean assertCurrentThreadIsNotCacheFetchAsync() { - final String threadName = Thread.currentThread().getName(); - assert false == isCacheFetchAsyncThread(threadName) : "expected the current thread [" - + threadName - + "] to belong to the cache fetch async thread pool"; - return true; - } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index b38a66fcbdf24..779543e51b1c9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import 
org.apache.lucene.store.IndexInput; @@ -27,7 +26,6 @@ import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; import org.elasticsearch.index.store.IndexInputStats; import org.elasticsearch.index.store.SearchableSnapshotDirectory; -import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService.FrozenCacheFile; import org.elasticsearch.xpack.searchablesnapshots.cache.SharedBytes; @@ -40,9 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; -import java.util.function.Predicate; -import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput { @@ -98,7 +94,7 @@ private FrozenIndexInput( int rangeSize, int recoveryRangeSize ) { - super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); + super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); this.directory = directory; this.frozenCacheFile = frozenCacheFile; this.lastReadPosition = this.offset; @@ -108,17 +104,10 @@ private FrozenIndexInput( } @Override - public void innerClose() { + public void doClose() { // nothing needed to be done here } - private void ensureContext(Predicate predicate) throws IOException { - if (predicate.test(context) == false) { - assert false : "this method should not be used with this context " + context; - throw new IOException("Cannot read the index input using context [context=" + context + ", input=" + this + ']'); - } - } - private long getDefaultRangeSize() { return (context != CACHE_WARMING_CONTEXT) ? 
(directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize) @@ -133,9 +122,8 @@ private ByteRange computeRange(long position) { } @Override - protected void readInternal(ByteBuffer b) throws IOException { + protected void doReadInternal(ByteBuffer b) throws IOException { ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT); - assert assertCurrentThreadIsNotCacheFetchAsync(); final long position = getFilePointer() + this.offset; final int length = b.remaining(); @@ -153,16 +141,6 @@ protected void readInternal(ByteBuffer b) throws IOException { } }; - // We can detect that we're going to read the last 16 bytes (that contains the footer checksum) of the file. Such reads are often - // executed when opening a Directory and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer. - if (length == CodecUtil.footerLength() && isClone == false && position == fileInfo.length() - length) { - if (readChecksumFromFileInfo(b)) { - logger.trace("read footer of file [{}] at position [{}], bypassing all caches", fileInfo.physicalName(), position); - return; - } - assert b.remaining() == length; - } - logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this); try { @@ -476,26 +454,6 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e throw new IOException("failed to read data from cache", e); } - private boolean readChecksumFromFileInfo(ByteBuffer b) throws IOException { - assert isClone == false; - byte[] footer; - try { - footer = checksumToBytesArray(fileInfo.checksum()); - } catch (NumberFormatException e) { - // tests disable this optimisation by passing an invalid checksum - footer = null; - } - if (footer == null) { - return false; - } - - b.put(footer); - assert b.remaining() == 0L; - return true; - - // TODO we should add this to DirectBlobContainerIndexInput too. 
- } - private static int positionalWrite(SharedBytes.IO fc, long start, ByteBuffer byteBuffer) throws IOException { assert assertCurrentThreadMayWriteCacheFile(); return fc.write(byteBuffer, start); @@ -708,10 +666,6 @@ public String toString() { + '}'; } - private static boolean isCacheFetchAsyncThread(final String threadName) { - return threadName.contains('[' + SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME + ']'); - } - private static boolean assertCurrentThreadMayWriteCacheFile() { final String threadName = Thread.currentThread().getName(); assert isCacheFetchAsyncThread(threadName) : "expected the current thread [" @@ -719,12 +673,4 @@ assert isCacheFetchAsyncThread(threadName) : "expected the current thread [" + "] to belong to the cache fetch async thread pool"; return true; } - - private static boolean assertCurrentThreadIsNotCacheFetchAsync() { - final String threadName = Thread.currentThread().getName(); - assert false == isCacheFetchAsyncThread(threadName) : "expected the current thread [" - + threadName - + "] to belong to the cache fetch async thread pool"; - return true; - } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index 98bdd0b69d58b..c130a15a7f85c 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -6,6 +6,8 @@ */ package org.elasticsearch.index.store.direct; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.elasticsearch.common.CheckedRunnable; @@ -54,6 +56,8 @@ */ public class 
DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput { + private static final Logger logger = LogManager.getLogger(DirectBlobContainerIndexInput.class); + private long position; @Nullable // if not currently reading sequentially @@ -97,14 +101,15 @@ private DirectBlobContainerIndexInput( long sequentialReadSize, int bufferSize ) { - super(resourceDesc, blobContainer, fileInfo, context, stats, offset, length, bufferSize); + super(logger, resourceDesc, blobContainer, fileInfo, context, stats, offset, length); this.position = position; assert sequentialReadSize >= 0; this.sequentialReadSize = sequentialReadSize; + setBufferSize(bufferSize); } @Override - protected void readInternal(ByteBuffer b) throws IOException { + protected void doReadInternal(ByteBuffer b) throws IOException { ensureOpen(); if (fileInfo.numberOfParts() == 1) { readInternalBytes(0, position, b, b.remaining()); @@ -319,7 +324,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw } @Override - public void innerClose() throws IOException { + public void doClose() throws IOException { closeStreamForSequentialReads(); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index cc9825b1cf33c..256f4cd13660e 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -615,9 +615,14 @@ private void executeTestCase( final Long seekingThreshold = randomBoolean() ? 
randomLongBetween(1L, fileContent.length) : null; + // Passing a wrong checksum here disables the BaseSearchableSnapshotIndexInput#maybeReadChecksumFromFileInfo(ByteBuffer) + // optimisation which, if it was enabled, would make the stats tests much more complicated in order to accommodate + // potential footer checksum reads. + final String fileChecksum = "_checksum"; + final String blobName = randomUnicodeOfLength(10); final BlobContainer blobContainer = singleBlobContainer(blobName, fileContent); - final StoreFileMetadata metadata = new StoreFileMetadata(fileName, fileContent.length, "_checksum", Version.CURRENT.luceneVersion); + final StoreFileMetadata metadata = new StoreFileMetadata(fileName, fileContent.length, fileChecksum, Version.CURRENT.luceneVersion); final List files = List.of(new FileInfo(blobName, metadata, new ByteSizeValue(fileContent.length))); final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, files, 0L, 0L, 0, 0L); final Path shardDir = randomShardPath(shardId); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 691a150d11ad6..3235eaab87537 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.blobstore.fs.FsBlobStore; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.common.lease.Releasable; @@ -96,7 +97,6 @@
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; -import java.nio.charset.StandardCharsets; import java.nio.file.DirectoryStream; import java.nio.file.FileSystem; import java.nio.file.Files; @@ -660,14 +660,17 @@ public void testClearCache() throws Exception { final Path shardSnapshotDir = createTempDir(); for (int i = 0; i < nbRandomFiles; i++) { final String fileName = "file_" + randomAlphaOfLength(10); - final byte[] fileContent = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); + + final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 100_000)); + final byte[] input = bytes.v2(); + final String checksum = bytes.v1(); final String blobName = randomAlphaOfLength(15); - Files.write(shardSnapshotDir.resolve(blobName), fileContent, StandardOpenOption.CREATE_NEW); + Files.write(shardSnapshotDir.resolve(blobName), input, StandardOpenOption.CREATE_NEW); randomFiles.add( new BlobStoreIndexShardSnapshot.FileInfo( blobName, - new StoreFileMetadata(fileName, fileContent.length, "_check", Version.CURRENT.luceneVersion), - new ByteSizeValue(fileContent.length) + new StoreFileMetadata(fileName, input.length, checksum, Version.CURRENT.luceneVersion), + new ByteSizeValue(input.length) ) ); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java index 81b5a2c2ed3ff..d4d5621d646ed 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.blobstore.BlobContainer; 
import org.elasticsearch.common.blobstore.support.FilterBlobContainer; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.shard.ShardId; @@ -31,7 +32,6 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashSet; @@ -47,6 +47,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; @@ -63,11 +64,12 @@ public void testRandomReads() throws Exception { for (int i = 0; i < 5; i++) { final String fileName = randomAlphaOfLength(10); + final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 100_000)); - final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); - + final byte[] input = bytes.v2(); + final String checksum = bytes.v1(); final String blobName = randomUnicodeOfLength(10); - final StoreFileMetadata metadata = new StoreFileMetadata(fileName, input.length, "_na", Version.CURRENT.luceneVersion); + final StoreFileMetadata metadata = new StoreFileMetadata(fileName, input.length, checksum, Version.CURRENT.luceneVersion); final int partSize = randomBoolean() ? 
input.length : randomIntBetween(1, input.length); @@ -128,6 +130,7 @@ public void testRandomReads() throws Exception { assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue()); try (IndexInput indexInput = directory.openInput(fileName, newIOContext(random()))) { + assertThat(indexInput, instanceOf(CachedBlobContainerIndexInput.class)); assertEquals(input.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); byte[] output = randomReadAndSlice(indexInput, input.length); @@ -176,10 +179,13 @@ public void testThrowsEOFException() throws Exception { ShardId shardId = new ShardId("_name", "_uuid", 0); final String fileName = randomAlphaOfLength(10); - final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); + final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 1000)); + final byte[] input = bytes.v2(); + final String checksum = bytes.v1(); final String blobName = randomUnicodeOfLength(10); - final StoreFileMetadata metadata = new StoreFileMetadata(fileName, input.length + 1, "_na", Version.CURRENT.luceneVersion); + final StoreFileMetadata metadata = new StoreFileMetadata(fileName, input.length, checksum, Version.CURRENT.luceneVersion); + final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot( snapshotId.getName(), 0L, @@ -226,6 +232,7 @@ public void testThrowsEOFException() throws Exception { assertThat("BlobContainer should be loaded", searchableSnapshotDirectory.blobContainer(), notNullValue()); try (IndexInput indexInput = searchableSnapshotDirectory.openInput(fileName, newIOContext(random()))) { + assertThat(indexInput, instanceOf(CachedBlobContainerIndexInput.class)); final byte[] buffer = new byte[input.length + 1]; final IOException exception = expectThrows(IOException.class, () -> indexInput.readBytes(buffer, 0, buffer.length)); if (containsEOFException(exception, new HashSet<>()) == false) { diff --git 
a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java index d943a83c8664f..9976857b3767f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java @@ -10,6 +10,7 @@ import org.apache.lucene.store.IndexInput; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.env.Environment; @@ -30,7 +31,6 @@ import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; @@ -45,12 +45,15 @@ public class FrozenIndexInputTests extends AbstractSearchableSnapshotsTestCase { public void testRandomReads() throws IOException { final String fileName = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT); - final byte[] fileData = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); + final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 100_000)); + + final byte[] fileData = bytes.v2(); + final String checksum = bytes.v1(); final Path tempDir = createTempDir().resolve(SHARD_ID.getIndex().getUUID()).resolve(String.valueOf(SHARD_ID.getId())); final FileInfo fileInfo = new FileInfo( randomAlphaOfLength(10), - new StoreFileMetadata(fileName, fileData.length, "_na", Version.CURRENT.luceneVersion), + new StoreFileMetadata(fileName, fileData.length, checksum, Version.CURRENT.luceneVersion), new ByteSizeValue(fileData.length) ); @@ -100,7 
+103,7 @@ public void testRandomReads() throws IOException { ) { directory.loadSnapshot(createRecoveryState(true), ActionListener.wrap(() -> {})); - // TODO does not test the checksum shortcut, does not test using the recovery range size + // TODO does not test using the recovery range size final IndexInput indexInput = directory.openInput(fileName, newIOContext(random())); assertThat(indexInput, instanceOf(FrozenIndexInput.class)); assertEquals(fileData.length, indexInput.length()); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInputTests.java index 0e975decabd7c..26b615a755b3e 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInputTests.java @@ -8,43 +8,35 @@ package org.elasticsearch.index.store.checksum; import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.store.ByteBuffersDataOutput; -import org.apache.lucene.store.ByteBuffersIndexOutput; -import org.apache.lucene.store.IndexOutput; -import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.index.store.Store; import org.elasticsearch.test.ESTestCase; import java.io.EOFException; +import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomChecksumBytes; import static org.hamcrest.Matchers.equalTo; public class ChecksumBlobContainerIndexInputTests extends ESTestCase { public void testChecksumBlobContainerIndexInput() throws Exception { - final ByteBuffersDataOutput out = new ByteBuffersDataOutput(); - try (IndexOutput output = new ByteBuffersIndexOutput(out, 
"tmp", getTestName())) { - CodecUtil.writeHeader(output, getTestName(), 5); - output.writeString(randomRealisticUnicodeOfLengthBetween(1, 50)); - CodecUtil.writeFooter(output); - } - - final byte[] bytes = out.toArrayCopy(); - final long checksum = CodecUtil.checksumEntireFile(new ByteArrayIndexInput(getTestName(), bytes)); + final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 100)); + final long length = bytes.v2().length; + final String checksum = bytes.v1(); final ChecksumBlobContainerIndexInput indexInput = ChecksumBlobContainerIndexInput.create( getTestName(), - bytes.length, - Store.digestToString(checksum), + length, + checksum, Store.READONCE_CHECKSUM ); - assertThat(indexInput.length(), equalTo((long) bytes.length)); + assertThat(indexInput.length(), equalTo(length)); assertThat(indexInput.getFilePointer(), equalTo(0L)); - assertThat(CodecUtil.retrieveChecksum(indexInput), equalTo(checksum)); - assertThat(indexInput.getFilePointer(), equalTo((long) bytes.length)); + assertThat(CodecUtil.retrieveChecksum(indexInput), equalTo(Long.parseLong(checksum, Character.MAX_RADIX))); + assertThat(indexInput.getFilePointer(), equalTo(length)); - expectThrows(EOFException.class, () -> indexInput.readByte()); + expectThrows(EOFException.class, indexInput::readByte); expectThrows(EOFException.class, () -> indexInput.readBytes(new byte[0], 0, 1)); - expectThrows(EOFException.class, () -> indexInput.seek(bytes.length + 1)); + expectThrows(EOFException.class, () -> indexInput.seek(length + 1L)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java index 9706b39d64836..e165d44bcb70d 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java +++ 
b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java @@ -7,9 +7,9 @@ package org.elasticsearch.index.store.direct; import org.apache.lucene.store.BufferedIndexInput; -import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.Version; import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -22,9 +22,9 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicInteger; +import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomChecksumBytes; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -41,20 +41,27 @@ public class DirectBlobContainerIndexInputTests extends ESIndexInputTestCase { - private DirectBlobContainerIndexInput createIndexInput(final byte[] input) throws IOException { + private DirectBlobContainerIndexInput createIndexInput(Tuple bytes) throws IOException { + final byte[] input = bytes.v2(); return createIndexInput( input, randomBoolean() ? 
input.length : randomIntBetween(1, input.length), randomIntBetween(1, 1000), + bytes.v1(), () -> {} ); } - private DirectBlobContainerIndexInput createIndexInput(final byte[] input, long partSize, long minimumReadSize, Runnable onReadBlob) - throws IOException { + private DirectBlobContainerIndexInput createIndexInput( + final byte[] input, + long partSize, + long minimumReadSize, + String checksum, + Runnable onReadBlob + ) throws IOException { final FileInfo fileInfo = new FileInfo( randomAlphaOfLength(5), - new StoreFileMetadata("test", input.length, "_checksum", Version.LATEST), + new StoreFileMetadata("test", input.length, checksum, Version.LATEST), partSize == input.length ? randomFrom( new ByteSizeValue(partSize, ByteSizeUnit.BYTES), @@ -109,7 +116,7 @@ public int read(byte[] b, int off, int len) throws IOException { }; } }); - return new DirectBlobContainerIndexInput( + final DirectBlobContainerIndexInput indexInput = new DirectBlobContainerIndexInput( blobContainer, fileInfo, newIOContext(random()), @@ -117,12 +124,16 @@ public int read(byte[] b, int off, int len) throws IOException { minimumReadSize, randomBoolean() ? 
BufferedIndexInput.BUFFER_SIZE : between(BufferedIndexInput.MIN_BUFFER_SIZE, BufferedIndexInput.BUFFER_SIZE) ); + assertEquals(input.length, indexInput.length()); + return indexInput; } public void testRandomReads() throws IOException { for (int i = 0; i < 100; i++) { - byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8); - IndexInput indexInput = createIndexInput(input); + final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 1000)); + final byte[] input = bytes.v2(); + + final DirectBlobContainerIndexInput indexInput = createIndexInput(bytes); assertEquals(input.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); byte[] output = randomReadAndSlice(indexInput, input.length); @@ -132,8 +143,10 @@ public void testRandomReads() throws IOException { public void testRandomOverflow() throws IOException { for (int i = 0; i < 100; i++) { - byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8); - IndexInput indexInput = createIndexInput(input); + final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 1000)); + final byte[] input = bytes.v2(); + + final DirectBlobContainerIndexInput indexInput = createIndexInput(bytes); int firstReadLen = randomIntBetween(0, input.length - 1); randomReadAndSlice(indexInput, firstReadLen); int bytesLeft = input.length - firstReadLen; @@ -144,8 +157,10 @@ public void testRandomOverflow() throws IOException { public void testSeekOverflow() throws IOException { for (int i = 0; i < 100; i++) { - byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8); - IndexInput indexInput = createIndexInput(input); + final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 1000)); + final byte[] input = bytes.v2(); + + final DirectBlobContainerIndexInput indexInput = createIndexInput(bytes); int firstReadLen = randomIntBetween(0, input.length - 1); randomReadAndSlice(indexInput, firstReadLen); 
expectThrows(IOException.class, () -> { @@ -167,12 +182,21 @@ public void testSeekOverflow() throws IOException { public void testSequentialReadsShareInputStreamFromBlobStore() throws IOException { for (int i = 0; i < 100; i++) { - final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(StandardCharsets.UTF_8); + final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 1000)); + final byte[] input = bytes.v2(); + final int minimumReadSize = randomIntBetween(1, 1000); final int partSize = randomBoolean() ? input.length : randomIntBetween(1, input.length); + final String checksum = bytes.v1(); final AtomicInteger readBlobCount = new AtomicInteger(); - final BufferedIndexInput indexInput = createIndexInput(input, partSize, minimumReadSize, readBlobCount::incrementAndGet); + final BufferedIndexInput indexInput = createIndexInput( + input, + partSize, + minimumReadSize, + checksum, + readBlobCount::incrementAndGet + ); assertEquals(input.length, indexInput.length()); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index 6044db3f31dab..4cd8537a78d64 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -7,6 +7,12 @@ package org.elasticsearch.xpack.searchablesnapshots; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.ByteBuffersDataOutput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; import 
org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -16,6 +22,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -26,6 +33,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.cache.CacheFile; import org.elasticsearch.index.store.cache.CacheKey; import org.elasticsearch.indices.recovery.RecoveryState; @@ -48,6 +56,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -59,6 +68,7 @@ import java.util.SortedSet; import java.util.concurrent.TimeUnit; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLengthBetween; import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTestCase { @@ -305,4 +315,22 @@ protected List randomCacheFiles(CacheService cacheService) throws Exc } return cacheFiles; } + + public static Tuple randomChecksumBytes(int length) throws IOException { + return randomChecksumBytes(randomUnicodeOfLength(length).getBytes(StandardCharsets.UTF_8)); + } + + public static Tuple randomChecksumBytes(byte[] bytes) throws IOException { + final ByteBuffersDataOutput out = new ByteBuffersDataOutput(); + try (IndexOutput output = new ByteBuffersIndexOutput(out, "randomChecksumBytes()", "randomChecksumBytes()")) { + 
CodecUtil.writeHeader(output, randomAsciiLettersOfLengthBetween(0, 127), randomNonNegativeByte()); + output.writeBytes(bytes, bytes.length); + CodecUtil.writeFooter(output); + } + final String checksum; + try (IndexInput input = new ByteBuffersIndexInput(out.toDataInput(), "checksumEntireFile()")) { + checksum = Store.digestToString(CodecUtil.checksumEntireFile(input)); + } + return Tuple.tuple(checksum, out.toArrayCopy()); + } }