From 2f8802919f282b08213f05af0294601a5b071d2a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 8 Feb 2021 16:09:01 +0100 Subject: [PATCH] Use Custom Class instead of Tuple for Cached Byte Positions (#68662) Using `Tuple` all over is somewhat hard to read and forces us to duplicate a lot of assertions. Using a custom class dries up the assertions and makes the code easier to follow by adding utilities for some common region math spots. --- .../index/store/cache/CacheFile.java | 28 ++-- .../cache/CachedBlobContainerIndexInput.java | 98 ++++++-------- .../index/store/cache/FrozenIndexInput.java | 39 +++--- .../index/store/cache/SparseFileTracker.java | 123 +++++++++--------- .../searchablesnapshots/cache/ByteRange.java | 94 +++++++++++++ .../cache/CacheService.java | 5 +- .../cache/FrozenCacheService.java | 62 ++++----- .../cache/PersistentCache.java | 32 +++-- .../index/store/cache/CacheFileTests.java | 32 ++--- .../store/cache/SparseFileTrackerTests.java | 103 +++++++-------- .../index/store/cache/TestUtils.java | 40 +++--- .../AbstractSearchableSnapshotsTestCase.java | 4 +- .../cache/CacheServiceTests.java | 10 +- .../cache/FrozenCacheServiceTests.java | 5 +- .../cache/PersistentCacheTests.java | 5 +- 15 files changed, 373 insertions(+), 307 deletions(-) create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index f7e22a5c7538d..14d90ce798945 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -10,12 +10,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.io.IOException; import java.io.UncheckedIOException; @@ -138,7 +138,7 @@ public CacheFile(CacheKey cacheKey, long length, Path file, ModificationListener this(cacheKey, new SparseFileTracker(file.toString(), length), file, listener); } - public CacheFile(CacheKey cacheKey, long length, Path file, SortedSet> ranges, ModificationListener listener) { + public CacheFile(CacheKey cacheKey, long length, Path file, SortedSet ranges, ModificationListener listener) { this(cacheKey, new SparseFileTracker(file.toString(), length, ranges), file, listener); } @@ -170,7 +170,7 @@ FileChannel getChannel() { } // Only used in tests - SortedSet> getCompletedRanges() { + SortedSet getCompletedRanges() { return tracker.getCompletedRanges(); } @@ -343,8 +343,8 @@ interface RangeMissingHandler { * @return a future which returns the result of the {@link RangeAvailableHandler} once it has completed.
*/ Future populateAndRead( - final Tuple rangeToWrite, - final Tuple rangeToRead, + final ByteRange rangeToWrite, + final ByteRange rangeToRead, final RangeAvailableHandler reader, final RangeMissingHandler writer, final Executor executor @@ -401,7 +401,7 @@ public void onFailure(Exception e) { * target range is neither available nor pending. */ @Nullable - Future readIfAvailableOrPending(final Tuple rangeToRead, final RangeAvailableHandler reader) { + Future readIfAvailableOrPending(final ByteRange rangeToRead, final RangeAvailableHandler reader) { final PlainActionFuture future = PlainActionFuture.newFuture(); Releasable decrementRef = null; try { @@ -429,7 +429,7 @@ private static void releaseAndFail(PlainActionFuture future, Releasable } private static ActionListener rangeListener( - Tuple rangeToRead, + ByteRange rangeToRead, RangeAvailableHandler reader, PlainActionFuture future, FileChannelReference reference, @@ -437,12 +437,12 @@ private static ActionListener rangeListener( ) { return ActionListener.runAfter(ActionListener.wrap(success -> { final int read = reader.onRangeAvailable(reference.fileChannel); - assert read == rangeToRead.v2() - rangeToRead.v1() : "partial read [" + assert read == rangeToRead.length() : "partial read [" + read + "] does not match the range to read [" - + rangeToRead.v2() + + rangeToRead.end() + '-' - + rangeToRead.v1() + + rangeToRead.start() + ']'; future.onResponse(read); }, future::onFailure), releasable::close); @@ -466,9 +466,9 @@ private FileChannelReference acquireFileChannelReference() { return reference; } - public Tuple getAbsentRangeWithin(long start, long end) { + public ByteRange getAbsentRangeWithin(ByteRange range) { ensureOpen(); - return tracker.getAbsentRangeWithin(start, end); + return tracker.getAbsentRangeWithin(range); } // used in tests @@ -497,7 +497,7 @@ private void markAsNeedsFSync() { * @throws IOException if the cache file failed to be fsync * @throws java.nio.file.NoSuchFileException if the cache file does not exist */ - public SortedSet> fsync() throws IOException { + public SortedSet fsync() throws IOException { if (refCounter.tryIncRef()) { try { if (needsFsync.compareAndSet(true, false)) { @@ -506,7 +506,7 @@ public SortedSet> fsync() throws IOException { // Capture the completed ranges before fsyncing; ranges that are completed after this point won't be considered as // persisted on disk by the caller of this method, even if they are fully written to disk at the time the file // fsync is effectively executed - final SortedSet> completedRanges = tracker.getCompletedRanges(); + final SortedSet completedRanges = tracker.getCompletedRanges(); assert completedRanges != null; assert completedRanges.isEmpty() == false; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 6152064e59b1c..ffe5615819b4b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.store.IndexInputStats; import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; +import 
org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.io.EOFException; import java.io.IOException; @@ -134,11 +135,11 @@ private long getDefaultRangeSize() { : fileInfo.partSize().getBytes(); } - private Tuple computeRange(long position) { + private ByteRange computeRange(long position) { final long rangeSize = getDefaultRangeSize(); long start = (position / rangeSize) * rangeSize; long end = Math.min(start + rangeSize, fileInfo.length()); - return Tuple.tuple(start, end); + return ByteRange.of(start, end); } @Override @@ -165,8 +166,8 @@ protected void readInternal(ByteBuffer b) throws IOException { // Can we serve the read directly from disk? If so, do so and don't worry about anything else. - final Future waitingForRead = cacheFile.readIfAvailableOrPending(Tuple.tuple(position, position + length), channel -> { - final int read = readCacheFile(channel, position, b); + final Future waitingForRead = cacheFile.readIfAvailableOrPending(ByteRange.of(position, position + length), chan -> { + final int read = readCacheFile(chan, position, b); assert read == length : read + " vs " + length; return read; }); @@ -180,7 +181,7 @@ protected void readInternal(ByteBuffer b) throws IOException { // Requested data is not on disk, so try the cache index next. - final Tuple indexCacheMiss; // null if not a miss + final ByteRange indexCacheMiss; // null if not a miss // We try to use the cache index if: // - the file is small enough to be fully cached @@ -198,10 +199,10 @@ protected void readInternal(ByteBuffer b) throws IOException { if (canBeFullyCached) { // if the index input is smaller than twice the size of the blob cache, it will be fully indexed - indexCacheMiss = Tuple.tuple(0L, fileInfo.length()); + indexCacheMiss = ByteRange.of(0L, fileInfo.length()); } else { // the index input is too large to fully cache, so just cache the initial range - indexCacheMiss = Tuple.tuple(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); + indexCacheMiss = ByteRange.of(0L, BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); } // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. @@ -223,7 +224,7 @@ protected void readInternal(ByteBuffer b) throws IOException { assert b.position() == length : "copied " + b.position() + " but expected " + length; try { - final Tuple cachedRange = Tuple.tuple(cachedBlob.from(), cachedBlob.to()); + final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to()); cacheFile.populateAndRead( cachedRange, cachedRange, @@ -274,23 +275,14 @@ protected void readInternal(ByteBuffer b) throws IOException { // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any // miss in the cache index. - final Tuple startRangeToWrite = computeRange(position); - final Tuple endRangeToWrite = computeRange(position + length - 1); - assert startRangeToWrite.v2() <= endRangeToWrite.v2() : startRangeToWrite + " vs " + endRangeToWrite; - final Tuple rangeToWrite = Tuple.tuple( - Math.min(startRangeToWrite.v1(), indexCacheMiss == null ? Long.MAX_VALUE : indexCacheMiss.v1()), - Math.max(endRangeToWrite.v2(), indexCacheMiss == null ? 
Long.MIN_VALUE : indexCacheMiss.v2()) - ); - - assert rangeToWrite.v1() <= position && position + length <= rangeToWrite.v2() : "[" - + position - + "-" - + (position + length) - + "] vs " - + rangeToWrite; + final ByteRange startRangeToWrite = computeRange(position); + final ByteRange endRangeToWrite = computeRange(position + length - 1); + assert startRangeToWrite.end() <= endRangeToWrite.end() : startRangeToWrite + " vs " + endRangeToWrite; + final ByteRange rangeToWrite = startRangeToWrite.minEnvelope(endRangeToWrite).minEnvelope(indexCacheMiss); - final Tuple rangeToRead = Tuple.tuple(position, position + length); - assert rangeToRead.v2() - rangeToRead.v1() == b.remaining() : b.remaining() + " vs " + rangeToRead; + final ByteRange rangeToRead = ByteRange.of(position, position + length); + assert rangeToRead.isSubRangeOf(rangeToWrite) : rangeToRead + " vs " + rangeToWrite; + assert rangeToRead.length() == b.remaining() : b.remaining() + " vs " + rangeToRead; final Future populateCacheFuture = cacheFile.populateAndRead( rangeToWrite, @@ -303,7 +295,7 @@ protected void readInternal(ByteBuffer b) throws IOException { if (indexCacheMiss != null) { final Releasable onCacheFillComplete = stats.addIndexCacheFill(); final Future readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> { - final int indexCacheMissLength = toIntBytes(indexCacheMiss.v2() - indexCacheMiss.v1()); + final int indexCacheMissLength = toIntBytes(indexCacheMiss.length()); // We assume that we only cache small portions of blobs so that we do not need to: // - use a BigArrays for allocation @@ -312,11 +304,11 @@ protected void readInternal(ByteBuffer b) throws IOException { assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss; final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); - Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.v1(), byteBuffer); + Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.start(), byteBuffer); // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats byteBuffer.flip(); final BytesReference content = BytesReference.fromByteBuffer(byteBuffer); - directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content, new ActionListener() { + directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.start(), content, new ActionListener() { @Override public void onResponse(Void response) { onCacheFillComplete.close(); @@ -438,56 +430,52 @@ public Tuple prefetchPart(final int part) throws IOException { if (part >= fileInfo.numberOfParts()) { throw new IllegalArgumentException("Unexpected part number [" + part + "]"); } - final Tuple partRange = computeRange(IntStream.range(0, part).mapToLong(fileInfo::partBytes).sum()); + final ByteRange partRange = computeRange(IntStream.range(0, part).mapToLong(fileInfo::partBytes).sum()); assert assertRangeIsAlignedWithPart(partRange); try { final CacheFile cacheFile = cacheFileReference.get(); - final Tuple range = cacheFile.getAbsentRangeWithin(partRange.v1(), partRange.v2()); + final ByteRange range = cacheFile.getAbsentRangeWithin(partRange); if (range == null) { logger.trace( "prefetchPart: part [{}] bytes [{}-{}] is already fully available for cache file [{}]", part, - partRange.v1(), - partRange.v2(), + partRange.start(), + partRange.end(), cacheFileReference ); return Tuple.tuple(cacheFile.getInitialLength(), 0L); } - final long rangeStart = range.v1(); - final long rangeEnd = range.v2(); - final long 
rangeLength = rangeEnd - rangeStart; - logger.trace( "prefetchPart: prewarming part [{}] bytes [{}-{}] by fetching bytes [{}-{}] for cache file [{}]", part, - partRange.v1(), - partRange.v2(), - rangeStart, - rangeEnd, + partRange.start(), + partRange.end(), + range.start(), + range.end(), cacheFileReference ); - final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, rangeLength))]; + final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, range.length()))]; long totalBytesRead = 0L; final AtomicLong totalBytesWritten = new AtomicLong(); - long remainingBytes = rangeEnd - rangeStart; + long remainingBytes = range.length(); final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) { + try (InputStream input = openInputStreamFromBlobStore(range.start(), range.length())) { while (remainingBytes > 0L) { - assert totalBytesRead + remainingBytes == rangeLength; - final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference); + assert totalBytesRead + remainingBytes == range.length(); + final int bytesRead = readSafe(input, copyBuffer, range.start(), range.end(), remainingBytes, cacheFileReference); // The range to prewarm in cache - final long readStart = rangeStart + totalBytesRead; - final Tuple rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead); + final long readStart = range.start() + totalBytesRead; + final ByteRange rangeToWrite = ByteRange.of(readStart, readStart + bytesRead); // We do not actually read anything, but we want to wait for the write to complete before proceeding. // noinspection UnnecessaryLocalVariable - final Tuple rangeToRead = rangeToWrite; + final ByteRange rangeToRead = rangeToWrite; cacheFile.populateAndRead(rangeToWrite, rangeToRead, (channel) -> bytesRead, (channel, start, end, progressUpdater) -> { final ByteBuffer byteBuffer = ByteBuffer.wrap(copyBuffer, toIntBytes(start - readStart), toIntBytes(end - start)); final int writtenBytes = positionalWrite(channel, start, byteBuffer); @@ -507,8 +495,8 @@ public Tuple prefetchPart(final int part) throws IOException { final long endTimeNanos = stats.currentTimeNanos(); stats.addCachedBytesWritten(totalBytesWritten.get(), endTimeNanos - startTimeNanos); } - assert totalBytesRead == rangeLength; - return Tuple.tuple(cacheFile.getInitialLength(), rangeLength); + assert totalBytesRead == range.length(); + return Tuple.tuple(cacheFile.getInitialLength(), range.length()); } catch (final Exception e) { throw new IOException("Failed to prefetch file part in cache", e); } @@ -555,16 +543,16 @@ private static int readSafe( /** * Asserts that the range of bytes to warm in cache is aligned with {@link #fileInfo}'s part size. 
*/ - private boolean assertRangeIsAlignedWithPart(Tuple range) { + private boolean assertRangeIsAlignedWithPart(ByteRange range) { if (fileInfo.numberOfParts() == 1L) { final long length = fileInfo.length(); - assert range.v1() == 0L : "start of range [" + range.v1() + "] is not aligned with zero"; - assert range.v2() == length : "end of range [" + range.v2() + "] is not aligned with file length [" + length + ']'; + assert range.start() == 0L : "start of range [" + range.start() + "] is not aligned with zero"; + assert range.end() == length : "end of range [" + range.end() + "] is not aligned with file length [" + length + ']'; } else { final long length = fileInfo.partSize().getBytes(); - assert range.v1() % length == 0L : "start of range [" + range.v1() + "] is not aligned with part start"; - assert range.v2() % length == 0L || (range.v2() == fileInfo.length()) : "end of range [" - + range.v2() + assert range.start() % length == 0L : "start of range [" + range.start() + "] is not aligned with part start"; + assert range.end() % length == 0L || (range.end() == fileInfo.length()) : "end of range [" + + range.end() + "] is not aligned with part end or with file length"; } return true; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index d596a6285e16a..345abcefe8094 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -21,7 +21,6 @@ import org.elasticsearch.blobstore.cache.BlobStoreCacheService; import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; @@ -29,6 +28,7 @@ import org.elasticsearch.index.store.IndexInputStats; import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService.FrozenCacheFile; import org.elasticsearch.xpack.searchablesnapshots.cache.SharedBytes; @@ -125,11 +125,11 @@ private long getDefaultRangeSize() { : fileInfo.partSize().getBytes(); } - private Tuple computeRange(long position) { + private ByteRange computeRange(long position) { final long rangeSize = getDefaultRangeSize(); long start = (position / rangeSize) * rangeSize; long end = Math.min(start + rangeSize, fileInfo.length()); - return Tuple.tuple(start, end); + return ByteRange.of(start, end); } @Override @@ -169,7 +169,7 @@ protected void readInternal(ByteBuffer b) throws IOException { // Can we serve the read directly from disk? If so, do so and don't worry about anything else. 
final StepListener waitingForRead = frozenCacheFile.readIfAvailableOrPending( - Tuple.tuple(position, position + length), + ByteRange.of(position, position + length), (channel, pos, relativePos, len) -> { final int read = readCacheFile(channel, pos, relativePos, len, b, position, true, luceneByteBufLock, stopAsyncReads); assert read <= length : read + " vs " + length; @@ -189,7 +189,7 @@ protected void readInternal(ByteBuffer b) throws IOException { // Requested data is not on disk, so try the cache index next. - final Tuple indexCacheMiss; // null if not a miss + final ByteRange indexCacheMiss; // null if not a miss // We try to use the cache index if: // - the file is small enough to be fully cached @@ -207,10 +207,10 @@ protected void readInternal(ByteBuffer b) throws IOException { if (canBeFullyCached) { // if the index input is smaller than twice the size of the blob cache, it will be fully indexed - indexCacheMiss = Tuple.tuple(0L, fileInfo.length()); + indexCacheMiss = ByteRange.of(0L, fileInfo.length()); } else { // the index input is too large to fully cache, so just cache the initial range - indexCacheMiss = Tuple.tuple(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); + indexCacheMiss = ByteRange.of(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); } // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. @@ -236,7 +236,7 @@ protected void readInternal(ByteBuffer b) throws IOException { assert copiedBytes == length : "copied " + copiedBytes + " but expected " + length; try { - final Tuple cachedRange = Tuple.tuple(cachedBlob.from(), cachedBlob.to()); + final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to()); frozenCacheFile.populateAndRead( cachedRange, cachedRange, @@ -297,21 +297,18 @@ protected void readInternal(ByteBuffer b) throws IOException { // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any // miss in the cache index. - final Tuple startRangeToWrite = computeRange(position); - final Tuple endRangeToWrite = computeRange(position + length - 1); - assert startRangeToWrite.v2() <= endRangeToWrite.v2() : startRangeToWrite + " vs " + endRangeToWrite; - final Tuple rangeToWrite = Tuple.tuple( - Math.min(startRangeToWrite.v1(), indexCacheMiss == null ? Long.MAX_VALUE : indexCacheMiss.v1()), - Math.max(endRangeToWrite.v2(), indexCacheMiss == null ? 
Long.MIN_VALUE : indexCacheMiss.v2()) - ); + final ByteRange startRangeToWrite = computeRange(position); + final ByteRange endRangeToWrite = computeRange(position + length - 1); + assert startRangeToWrite.end() <= endRangeToWrite.end() : startRangeToWrite + " vs " + endRangeToWrite; + final ByteRange rangeToWrite = startRangeToWrite.minEnvelope(endRangeToWrite).minEnvelope(indexCacheMiss); - assert rangeToWrite.v1() <= position && position + length <= rangeToWrite.v2() : "[" + assert rangeToWrite.start() <= position && position + length <= rangeToWrite.end() : "[" + position + "-" + (position + length) + "] vs " + rangeToWrite; - final Tuple rangeToRead = Tuple.tuple(position, position + length); + final ByteRange rangeToRead = ByteRange.of(position, position + length); final StepListener populateCacheFuture = frozenCacheFile.populateAndRead( rangeToWrite, @@ -322,7 +319,7 @@ protected void readInternal(ByteBuffer b) throws IOException { relativePos, len, b, - rangeToRead.v1(), + rangeToRead.start(), false, luceneByteBufLock, stopAsyncReads @@ -332,7 +329,7 @@ protected void readInternal(ByteBuffer b) throws IOException { channelPos, relativePos, len, - rangeToWrite.v1(), + rangeToWrite.start(), progressUpdater ), directory.cacheFetchAsyncExecutor() @@ -340,7 +337,7 @@ protected void readInternal(ByteBuffer b) throws IOException { if (indexCacheMiss != null) { final Releasable onCacheFillComplete = stats.addIndexCacheFill(); - final int indexCacheMissLength = toIntBytes(indexCacheMiss.v2() - indexCacheMiss.v1()); + final int indexCacheMissLength = toIntBytes(indexCacheMiss.length()); // We assume that we only cache small portions of blobs so that we do not need to: // - use a BigArrays for allocation // - use an intermediate copy buffer to read the file in sensibly-sized chunks @@ -387,7 +384,7 @@ protected void readInternal(ByteBuffer b) throws IOException { byteBuffer.position(read); // mark all bytes as accounted for byteBuffer.flip(); final BytesReference content = BytesReference.fromByteBuffer(byteBuffer); - directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content, new ActionListener() { + directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.start(), content, new ActionListener() { @Override public void onResponse(Void response) { onCacheFillComplete.close(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java index 6ec0a44eb910b..1a80bb7bb7d27 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java @@ -10,7 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.util.ArrayList; import java.util.Collections; @@ -61,7 +61,7 @@ public SparseFileTracker(String description, long length) { * @param length the length of the file tracked by the sparse file tracker * @param ranges the set of ranges to be considered present */ - public SparseFileTracker(String description, long length, SortedSet> ranges) { + public SparseFileTracker(String description, long length, SortedSet 
ranges) { this.description = description; this.length = length; if (length < 0) { @@ -71,14 +71,14 @@ public SparseFileTracker(String description, long length, SortedSet next : ranges) { - final Range range = new Range(next.v1(), next.v2(), null); - if (range.end <= range.start) { - throw new IllegalArgumentException("Range " + range + " cannot be empty"); + for (ByteRange next : ranges) { + if (next.length() == 0L) { + throw new IllegalArgumentException("Range " + next + " cannot be empty"); } - if (length < range.end) { - throw new IllegalArgumentException("Range " + range + " is exceeding maximum length [" + length + ']'); + if (length < next.end()) { + throw new IllegalArgumentException("Range " + next + " is exceeding maximum length [" + length + ']'); } + final Range range = new Range(next); if (previous != null && range.start <= previous.end) { throw new IllegalArgumentException("Range " + range + " is overlapping a previous range " + previous); } @@ -97,8 +97,8 @@ public long getLength() { return length; } - public SortedSet> getCompletedRanges() { - SortedSet> completedRanges = null; + public SortedSet getCompletedRanges() { + SortedSet completedRanges = null; synchronized (mutex) { assert invariant(); for (Range range : ranges) { @@ -106,9 +106,9 @@ public SortedSet> getCompletedRanges() { continue; } if (completedRanges == null) { - completedRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1)); + completedRanges = new TreeSet<>(); } - completedRanges.add(Tuple.tuple(range.start, range.end)); + completedRanges.add(ByteRange.of(range.start, range.end)); } } return completedRanges == null ? Collections.emptySortedSet() : completedRanges; @@ -136,34 +136,30 @@ private long computeLengthOfRanges() { * range from the file is defined by {@code range} but the listener is executed as soon as a (potentially smaller) sub range * {@code subRange} becomes available. 
* - * @param range A tuple that contains the (inclusive) start and (exclusive) end of the desired range - * @param subRange A tuple that contains the (inclusive) start and (exclusive) end of the listener's range + * @param range A ByteRange that contains the (inclusive) start and (exclusive) end of the desired range + * @param subRange A ByteRange that contains the (inclusive) start and (exclusive) end of the listener's range * @param listener Listener for when the listening range is fully available * @return A collection of gaps that the client should fill in to satisfy this range * @throws IllegalArgumentException if invalid range is requested */ - public List waitForRange(final Tuple range, final Tuple subRange, final ActionListener listener) { - final long start = range.v1(); - final long end = range.v2(); - if (end < start || start < 0L || length < end) { - throw new IllegalArgumentException("invalid range [start=" + start + ", end=" + end + ", length=" + length + "]"); + public List waitForRange(final ByteRange range, final ByteRange subRange, final ActionListener listener) { + if (length < range.end()) { + throw new IllegalArgumentException("invalid range [" + range + ", length=" + length + "]"); } - if (subRange.v2() < subRange.v1() || subRange.v1() < 0L || length < subRange.v2()) { - throw new IllegalArgumentException( - "invalid range to listen to [start=" + subRange.v1() + ", end=" + subRange.v2() + ", length=" + length + "]" - ); + if (length < subRange.end()) { + throw new IllegalArgumentException("invalid range to listen to [" + subRange + ", length=" + length + "]"); } - if (subRange.v1() < start || end < subRange.v2()) { + if (subRange.isSubRangeOf(range) == false) { throw new IllegalArgumentException( "unable to listen to range [start=" - + subRange.v1() + + subRange.start() + ", end=" - + subRange.v2() + + subRange.end() + "] when range is [start=" - + start + + range.start() + ", end=" - + end + + range.end() + ", length=" + length + "]" @@ -179,19 +175,19 @@ public List waitForRange(final Tuple range, final Tuple pendingRanges = new ArrayList<>(); - final Range targetRange = new Range(start, end, null); + final Range targetRange = new Range(range); final SortedSet earlierRanges = ranges.headSet(targetRange, false); // ranges with strictly earlier starts if (earlierRanges.isEmpty() == false) { final Range lastEarlierRange = earlierRanges.last(); - if (start < lastEarlierRange.end) { + if (range.start() < lastEarlierRange.end) { if (lastEarlierRange.isPending()) { pendingRanges.add(lastEarlierRange); } - targetRange.start = Math.min(end, lastEarlierRange.end); + targetRange.start = Math.min(range.end(), lastEarlierRange.end); } } - while (targetRange.start < end) { + while (targetRange.start < range.end()) { assert 0 <= targetRange.start : targetRange; assert invariant(); @@ -199,13 +195,13 @@ public List waitForRange(final Tuple range, final Tuple waitForRange(final Tuple range, final Tuple waitForRange(final Tuple range, final Tuple waitForRange(final Tuple range, final Tuple pendingRange.start < subRange.v2()) - .filter(pendingRange -> subRange.v1() < pendingRange.end) + .filter(pendingRange -> pendingRange.start < subRange.end()) + .filter(pendingRange -> subRange.start() < pendingRange.end) .sorted(Comparator.comparingLong(r -> r.start)) .collect(Collectors.toList()); } @@ -259,7 +255,7 @@ public List waitForRange(final Tuple range, final Tuple null), - Math.min(requiredRange.completionListener.end, subRange.v2()) + Math.min(requiredRange.completionListener.end, 
subRange.end()) ); break; default: @@ -268,7 +264,7 @@ public List waitForRange(final Tuple range, final Tuple r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, subRange.v2())) + r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, subRange.end())) ); } @@ -277,7 +273,8 @@ public List waitForRange(final Tuple range, final Tuple waitForRange(final Tuple range, final Tuple range, final ActionListener listener) { - final long start = range.v1(); - final long end = range.v2(); - if (end < start || start < 0L || length < end) { - throw new IllegalArgumentException("invalid range [start=" + start + ", end=" + end + ", length=" + length + "]"); + public boolean waitForRangeIfPending(final ByteRange range, final ActionListener listener) { + if (length < range.end()) { + throw new IllegalArgumentException("invalid range [" + range + ", length=" + length + "]"); } final ActionListener wrappedListener = wrapWithAssertions(listener); @@ -299,19 +294,19 @@ public boolean waitForRangeIfPending(final Tuple range, final Action synchronized (mutex) { assert invariant(); - final Range targetRange = new Range(start, end, null); + final Range targetRange = new Range(range); final SortedSet earlierRanges = ranges.headSet(targetRange, false); // ranges with strictly earlier starts if (earlierRanges.isEmpty() == false) { final Range lastEarlierRange = earlierRanges.last(); - if (start < lastEarlierRange.end) { + if (range.start() < lastEarlierRange.end) { if (lastEarlierRange.isPending()) { pendingRanges.add(lastEarlierRange); } - targetRange.start = Math.min(end, lastEarlierRange.end); + targetRange.start = Math.min(range.end(), lastEarlierRange.end); } } - while (targetRange.start < end) { + while (targetRange.start < range.end()) { assert 0 <= targetRange.start : targetRange; assert invariant(); @@ -326,14 +321,14 @@ public boolean waitForRangeIfPending(final Tuple range, final Action if (firstExistingRange.isPending()) { pendingRanges.add(firstExistingRange); } - targetRange.start = Math.min(end, firstExistingRange.end); + targetRange.start = Math.min(range.end(), firstExistingRange.end); } else { return false; } } } assert targetRange.start == targetRange.end : targetRange; - assert targetRange.start == end : targetRange; + assert targetRange.start == range.end() : targetRange; assert invariant(); } @@ -348,7 +343,7 @@ public boolean waitForRangeIfPending(final Tuple range, final Action final Range pendingRange = pendingRanges.get(0); pendingRange.completionListener.addListener( wrappedListener.map(progress -> null), - Math.min(pendingRange.completionListener.end, end) + Math.min(pendingRange.completionListener.end, range.end()) ); return true; default: @@ -357,7 +352,7 @@ public boolean waitForRangeIfPending(final Tuple range, final Action pendingRanges.size() ); pendingRanges.forEach( - r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, end)) + r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, range.end())) ); return true; } @@ -380,13 +375,14 @@ private ActionListener wrapWithAssertions(ActionListener listener) { * some ranges of present bytes. It tries to return the smallest possible range, but does so on a best-effort basis. This method does * not acquire anything, which means that another thread may concurrently fill in some of the returned range. 
* - * @param start The (inclusive) start of the target range - * @param end The (exclusive) end of the target range + * @param range The target range * @return a range that contains all bytes of the target range which are absent, or {@code null} if there are no such bytes. */ - public Tuple getAbsentRangeWithin(final long start, final long end) { + @Nullable + public ByteRange getAbsentRangeWithin(ByteRange range) { synchronized (mutex) { + final long start = range.start(); // Find the first absent byte in the range final SortedSet startRanges = ranges.headSet(new Range(start, start, null), true); // ranges which start <= 'start' long resultStart; @@ -405,6 +401,7 @@ public Tuple getAbsentRangeWithin(final long start, final long end) } assert resultStart >= start; + final long end = range.end(); // Find the last absent byte in the range final SortedSet endRanges = ranges.headSet(new Range(end, end, null), false); // ranges which start < 'end' final long resultEnd; @@ -423,7 +420,7 @@ public Tuple getAbsentRangeWithin(final long start, final long end) } assert resultEnd <= end; - return resultStart < resultEnd ? Tuple.tuple(resultStart, resultEnd) : null; + return resultStart < resultEnd ? ByteRange.of(resultStart, resultEnd) : null; } } @@ -601,6 +598,10 @@ private static class Range { @Nullable // if not pending final ProgressListenableActionFuture completionListener; + Range(ByteRange range) { + this(range.start(), range.end(), null); + } + Range(long start, long end, @Nullable ProgressListenableActionFuture completionListener) { assert start <= end : start + "-" + end; this.start = start; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java new file mode 100644 index 0000000000000..96e1725b3a0c6 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.elasticsearch.common.Nullable; + +public final class ByteRange implements Comparable { + + public static final ByteRange EMPTY = new ByteRange(0L, 0L); + + private final long start; + + private final long end; + + public static ByteRange of(long start, long end) { + return new ByteRange(start, end); + } + + private ByteRange(long start, long end) { + this.start = start; + this.end = end; + assert start >= 0L : "Start must be >= 0 but saw [" + start + "]"; + assert end >= start : "End must be greater or equal to start but saw [" + start + "][" + end + "]"; + } + + /** + * Computes the smallest range that contains both this instance as well as the given {@code other} range.
+ * + * @param other other range or {@code null} in which case this instance is returned + */ + public ByteRange minEnvelope(@Nullable ByteRange other) { + if (other == null) { + return this; + } + if (other.isSubRangeOf(this)) { + return this; + } + if (this.isSubRangeOf(other)) { + return other; + } + return of(Math.min(start, other.start), Math.max(end, other.end)); + } + + public long start() { + return start; + } + + public long end() { + return end; + } + + public long length() { + return end - start; + } + + /** + * Checks if this instance is fully contained in the given {@code range}. + */ + public boolean isSubRangeOf(ByteRange range) { + return start >= range.start() && end <= range.end(); + } + + @Override + public int hashCode() { + return 31 * Long.hashCode(start) + Long.hashCode(end); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof ByteRange == false) { + return false; + } + final ByteRange that = (ByteRange) obj; + return start == that.start && end == that.end; + } + + @Override + public String toString() { + return "ByteRange{" + start + "}{" + end + "}"; + } + + @Override + public int compareTo(ByteRange o) { + return Long.compare(start, o.start); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index d2e196f8d05f6..ce1b31d30bf46 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.cache.Cache; import org.elasticsearch.common.cache.CacheBuilder; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.settings.ClusterSettings; @@ -325,7 +324,7 @@ void put( final long fileLength, final Path cacheDir, final String cacheFileUuid, - final SortedSet> cacheFileRanges + final SortedSet cacheFileRanges ) throws Exception { ensureLifecycleInitializing(); @@ -575,7 +574,7 @@ public void synchronizeCache() { break; case NEEDS_FSYNC: - final SortedSet> ranges = cacheFile.fsync(); + final SortedSet ranges = cacheFile.fsync(); logger.trace( "cache file [{}] synchronized with [{}] completed range(s)", cacheFile.getFile().getFileName(), diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java index b379c575020a7..b2740618d1bf6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import 
org.elasticsearch.common.settings.Setting; @@ -197,18 +196,21 @@ private int getEndingRegion(long position) { return getRegion(position); } - private Tuple mapSubRangeToRegion(Tuple range, int region) { + private ByteRange mapSubRangeToRegion(ByteRange range, int region) { final long regionStart = getRegionStart(region); final long regionEnd = getRegionEnd(region); - if (range.v1() >= regionEnd || range.v2() <= regionStart) { - return Tuple.tuple(0L, 0L); + if (range.start() >= regionEnd || range.end() <= regionStart) { + return ByteRange.EMPTY; } - final long rangeStart = Math.max(regionStart, range.v1()); - final long rangeEnd = Math.min(regionEnd, range.v2()); + final long rangeStart = Math.max(regionStart, range.start()); + final long rangeEnd = Math.min(regionEnd, range.end()); if (rangeStart >= rangeEnd) { - return Tuple.tuple(0L, 0L); + return ByteRange.EMPTY; } - return Tuple.tuple(getRegionRelativePosition(rangeStart), rangeEnd == regionEnd ? regionSize : getRegionRelativePosition(rangeEnd)); + return ByteRange.of( + getRegionRelativePosition(rangeStart), + rangeEnd == regionEnd ? regionSize : getRegionRelativePosition(rangeEnd) + ); } private long getRegionSize(long fileLength, int region) { @@ -581,8 +583,8 @@ private void throwAlreadyEvicted() { } public StepListener populateAndRead( - final Tuple rangeToWrite, - final Tuple rangeToRead, + final ByteRange rangeToWrite, + final ByteRange rangeToRead, final RangeAvailableHandler reader, final RangeMissingHandler writer, final Executor executor @@ -599,7 +601,7 @@ public StepListener populateAndRead( final SharedBytes.IO fileChannel = sharedBytes.getFileChannel(sharedBytesPos); listener.whenComplete(integer -> fileChannel.decRef(), e -> fileChannel.decRef()); final ActionListener rangeListener = rangeListener(rangeToRead, reader, listener, fileChannel); - if (rangeToRead.v1().equals(rangeToRead.v2())) { + if (rangeToRead.length() == 0L) { // nothing to read, skip rangeListener.onResponse(null); return listener; @@ -645,7 +647,7 @@ public void onFailure(Exception e) { } @Nullable - public StepListener readIfAvailableOrPending(final Tuple rangeToRead, final RangeAvailableHandler reader) { + public StepListener readIfAvailableOrPending(final ByteRange rangeToRead, final RangeAvailableHandler reader) { final StepListener listener = new StepListener<>(); Releasable decrementRef = null; try { @@ -670,7 +672,7 @@ public StepListener readIfAvailableOrPending(final Tuple ra } private ActionListener rangeListener( - Tuple rangeToRead, + ByteRange rangeToRead, RangeAvailableHandler reader, ActionListener listener, SharedBytes.IO fileChannel @@ -680,16 +682,16 @@ private ActionListener rangeListener( assert regionOwners[sharedBytesPos].get() == CacheFileRegion.this; final int read = reader.onRangeAvailable( fileChannel, - physicalStartOffset + rangeToRead.v1(), - rangeToRead.v1(), - rangeToRead.v2() - rangeToRead.v1() + physicalStartOffset + rangeToRead.start(), + rangeToRead.start(), + rangeToRead.length() ); - assert read == rangeToRead.v2() - rangeToRead.v1() : "partial read [" + assert read == rangeToRead.length() : "partial read [" + read + "] does not match the range to read [" - + rangeToRead.v2() + + rangeToRead.end() + '-' - + rangeToRead.v1() + + rangeToRead.start() + ']'; listener.onResponse(read); }, listener::onFailure); @@ -721,19 +723,19 @@ public FrozenCacheFile(CacheKey cacheKey, long length) { } public StepListener populateAndRead( - final Tuple rangeToWrite, - final Tuple rangeToRead, + final ByteRange rangeToWrite, + final 
ByteRange rangeToRead, final RangeAvailableHandler reader, final RangeMissingHandler writer, final Executor executor ) { StepListener stepListener = null; - final long writeStart = rangeToWrite.v1(); - final long readStart = rangeToRead.v1(); - for (int i = getRegion(rangeToWrite.v1()); i <= getEndingRegion(rangeToWrite.v2()); i++) { + final long writeStart = rangeToWrite.start(); + final long readStart = rangeToRead.start(); + for (int i = getRegion(rangeToWrite.start()); i <= getEndingRegion(rangeToWrite.end()); i++) { final int region = i; - final Tuple subRangeToWrite = mapSubRangeToRegion(rangeToWrite, region); - final Tuple subRangeToRead = mapSubRangeToRegion(rangeToRead, region); + final ByteRange subRangeToWrite = mapSubRangeToRegion(rangeToWrite, region); + final ByteRange subRangeToRead = mapSubRangeToRegion(rangeToRead, region); final CacheFileRegion fileRegion = get(cacheKey, length, region); final StepListener lis = fileRegion.populateAndRead( subRangeToWrite, @@ -768,12 +770,12 @@ public StepListener populateAndRead( } @Nullable - public StepListener readIfAvailableOrPending(final Tuple rangeToRead, final RangeAvailableHandler reader) { + public StepListener readIfAvailableOrPending(final ByteRange rangeToRead, final RangeAvailableHandler reader) { StepListener stepListener = null; - final long start = rangeToRead.v1(); - for (int i = getRegion(rangeToRead.v1()); i <= getEndingRegion(rangeToRead.v2()); i++) { + final long start = rangeToRead.start(); + for (int i = getRegion(rangeToRead.start()); i <= getEndingRegion(rangeToRead.end()); i++) { final int region = i; - final Tuple subRangeToRead = mapSubRangeToRegion(rangeToRead, region); + final ByteRange subRangeToRead = mapSubRangeToRegion(rangeToRead, region); final CacheFileRegion fileRegion = get(cacheKey, length, region); final StepListener lis = fileRegion.readIfAvailableOrPending( subRangeToRead, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java index 7b20afadfffd1..32f39397ba6a6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -40,7 +40,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -67,7 +66,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -132,7 +130,7 @@ private CacheIndexWriter getWriter(CacheFile cacheFile) { } } - public void addCacheFile(CacheFile cacheFile, SortedSet> ranges) throws IOException { + public void addCacheFile(CacheFile cacheFile, SortedSet ranges) throws IOException { ensureStarted(); getWriter(cacheFile).updateCacheFile(cacheFile, ranges); } @@ -179,7 +177,7 @@ long getCacheSize(ShardId shardId, SnapshotId snapshotId, Predicate predic final Document document = leafReaderContext.reader().document(docIdSetIterator.docID()); final 
String cacheFileId = getValue(document, CACHE_ID_FIELD); if (predicate.test(snapshotCacheDir.resolve(cacheFileId))) { - long size = buildCacheFileRanges(document).stream().mapToLong(range -> range.v2() - range.v1()).sum(); + long size = buildCacheFileRanges(document).stream().mapToLong(ByteRange::length).sum(); logger.trace("cache file [{}] has size [{}]", cacheFileId, size); aggregateSize += size; } @@ -240,7 +238,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { final CacheKey cacheKey = buildCacheKey(cacheDocument); final long fileLength = getFileLength(cacheDocument); - final SortedSet> ranges = buildCacheFileRanges(cacheDocument); + final SortedSet ranges = buildCacheFileRanges(cacheDocument); logger.trace("adding cache file with [id={}, key={}, ranges={}]", id, cacheKey, ranges); cacheService.put(cacheKey, fileLength, file.getParent(), id, ranges); @@ -494,7 +492,7 @@ NodeEnvironment.NodePath nodePath() { return nodePath; } - void updateCacheFile(CacheFile cacheFile, SortedSet> cacheRanges) throws IOException { + void updateCacheFile(CacheFile cacheFile, SortedSet cacheRanges) throws IOException { updateCacheFile(buildId(cacheFile), buildDocument(nodePath, cacheFile, cacheRanges)); } @@ -560,7 +558,7 @@ private static Term buildTerm(String cacheFileUuid) { return new Term(CACHE_ID_FIELD, cacheFileUuid); } - private static Document buildDocument(NodeEnvironment.NodePath nodePath, CacheFile cacheFile, SortedSet> cacheRanges) + private static Document buildDocument(NodeEnvironment.NodePath nodePath, CacheFile cacheFile, SortedSet cacheRanges) throws IOException { final Document document = new Document(); document.add(new StringField(CACHE_ID_FIELD, buildId(cacheFile), Field.Store.YES)); @@ -568,9 +566,9 @@ private static Document buildDocument(NodeEnvironment.NodePath nodePath, CacheFi try (BytesStreamOutput output = new BytesStreamOutput()) { output.writeVInt(cacheRanges.size()); - for (Tuple cacheRange : cacheRanges) { - output.writeVLong(cacheRange.v1()); - output.writeVLong(cacheRange.v2()); + for (ByteRange cacheRange : cacheRanges) { + output.writeVLong(cacheRange.start()); + output.writeVLong(cacheRange.end()); } output.flush(); document.add(new StoredField(CACHE_RANGES_FIELD, output.bytes().toBytesRef())); @@ -614,20 +612,20 @@ private static long getFileLength(Document document) { return Long.parseLong(fileLength); } - private static SortedSet> buildCacheFileRanges(Document document) throws IOException { + private static SortedSet buildCacheFileRanges(Document document) throws IOException { final BytesRef cacheRangesBytesRef = document.getBinaryValue(CACHE_RANGES_FIELD); assert cacheRangesBytesRef != null; - final SortedSet> cacheRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1)); + final SortedSet cacheRanges = new TreeSet<>(); try (StreamInput input = new ByteBufferStreamInput(ByteBuffer.wrap(cacheRangesBytesRef.bytes))) { final int length = input.readVInt(); assert length > 0 : "empty cache ranges"; - Tuple previous = null; + ByteRange previous = null; for (int i = 0; i < length; i++) { - final Tuple range = Tuple.tuple(input.readVLong(), input.readVLong()); - assert range.v1() < range.v2() : range; - assert range.v2() <= getFileLength(document); - assert previous == null || previous.v2() < range.v1(); + final ByteRange range = ByteRange.of(input.readVLong(), input.readVLong()); + assert range.length() > 0 : range; + assert range.end() <= getFileLength(document); + assert previous == null || previous.end() < range.start(); final boolean 
added = cacheRanges.add(range); assert added : range + " already exist in " + cacheRanges; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index 62e42869da007..b473e90ab68ae 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -10,7 +10,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.index.shard.ShardId; @@ -18,6 +17,7 @@ import org.elasticsearch.index.store.cache.TestUtils.FSyncTrackingFileSystemProvider; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import org.hamcrest.Matcher; import java.io.IOException; @@ -194,8 +194,8 @@ public void testConcurrentAccess() throws Exception { final Future readIfAvailableFuture; if (randomBoolean()) { populateAndReadFuture = cacheFile.populateAndRead( - Tuple.tuple(0L, length), - Tuple.tuple(0L, length), + ByteRange.of(0L, length), + ByteRange.of(0L, length), channel -> Math.toIntExact(length), (channel, from, to, progressUpdater) -> progressUpdater.accept(length), threadPool.generic() @@ -204,7 +204,7 @@ public void testConcurrentAccess() throws Exception { populateAndReadFuture = null; } if (randomBoolean()) { - readIfAvailableFuture = cacheFile.readIfAvailableOrPending(Tuple.tuple(0L, length), channel -> Math.toIntExact(length)); + readIfAvailableFuture = cacheFile.readIfAvailableOrPending(ByteRange.of(0L, length), channel -> Math.toIntExact(length)); } else { readIfAvailableFuture = null; } @@ -247,14 +247,14 @@ public void testFSync() throws Exception { try { if (randomBoolean()) { - final SortedSet> completedRanges = cacheFile.fsync(); + final SortedSet completedRanges = cacheFile.fsync(); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0)); assertThat(completedRanges, hasSize(0)); assertFalse(cacheFile.needsFsync()); assertFalse(updatesListener.containsUpdate(cacheFile)); } - final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); + final SortedSet expectedCompletedRanges = randomPopulateAndReads(cacheFile); if (expectedCompletedRanges.isEmpty() == false) { assertTrue(cacheFile.needsFsync()); assertTrue(updatesListener.containsUpdate(cacheFile)); @@ -264,9 +264,9 @@ public void testFSync() throws Exception { assertFalse(updatesListener.containsUpdate(cacheFile)); } - final SortedSet> completedRanges = cacheFile.fsync(); + final SortedSet completedRanges = cacheFile.fsync(); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 
0 : 1)); - assertArrayEquals(completedRanges.toArray(new Tuple[0]), expectedCompletedRanges.toArray(new Tuple[0])); + assertArrayEquals(completedRanges.toArray(new ByteRange[0]), expectedCompletedRanges.toArray(new ByteRange[0])); assertFalse(cacheFile.needsFsync()); assertFalse(updatesListener.containsUpdate(cacheFile)); } finally { @@ -291,14 +291,14 @@ public void testFSyncOnEvictedFile() throws Exception { boolean released = false; try { - final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); + final SortedSet expectedCompletedRanges = randomPopulateAndReads(cacheFile); if (expectedCompletedRanges.isEmpty() == false) { assertTrue(cacheFile.needsFsync()); assertTrue(updatesListener.containsUpdate(cacheFile)); updatesListener.reset(); - final SortedSet> completedRanges = cacheFile.fsync(); - assertArrayEquals(completedRanges.toArray(new Tuple[0]), expectedCompletedRanges.toArray(new Tuple[0])); + final SortedSet completedRanges = cacheFile.fsync(); + assertArrayEquals(completedRanges.toArray(new ByteRange[0]), expectedCompletedRanges.toArray(new ByteRange[0])); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1)); } assertFalse(cacheFile.needsFsync()); @@ -318,7 +318,7 @@ public void testFSyncOnEvictedFile() throws Exception { } updatesListener.reset(); - final SortedSet> completedRangesAfterEviction = cacheFile.fsync(); + final SortedSet completedRangesAfterEviction = cacheFile.fsync(); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0 : 1)); assertThat(completedRangesAfterEviction, hasSize(0)); assertFalse(cacheFile.needsFsync()); @@ -350,7 +350,7 @@ public void testFSyncFailure() throws Exception { cacheFile.acquire(listener); try { - final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); + final SortedSet expectedCompletedRanges = randomPopulateAndReads(cacheFile); if (expectedCompletedRanges.isEmpty() == false) { assertTrue(cacheFile.needsFsync()); assertTrue(updatesListener.containsUpdate(cacheFile)); @@ -363,15 +363,15 @@ public void testFSyncFailure() throws Exception { updatesListener.reset(); } else { assertFalse(cacheFile.needsFsync()); - final SortedSet> completedRanges = cacheFile.fsync(); + final SortedSet completedRanges = cacheFile.fsync(); assertTrue(completedRanges.isEmpty()); } assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0)); fileSystem.failFSyncs(false); - final SortedSet> completedRanges = cacheFile.fsync(); - assertArrayEquals(completedRanges.toArray(new Tuple[0]), expectedCompletedRanges.toArray(new Tuple[0])); + final SortedSet completedRanges = cacheFile.fsync(); + assertArrayEquals(completedRanges.toArray(new ByteRange[0]), expectedCompletedRanges.toArray(new ByteRange[0])); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 
             assertFalse(cacheFile.needsFsync());
             assertFalse(updatesListener.containsUpdate(cacheFile));
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java
index 291f38826a740..1bcb28a1ced71 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java
@@ -9,12 +9,11 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -56,17 +55,15 @@ public void testInvalidRange() {
         final AtomicBoolean invoked = new AtomicBoolean(false);
         final ActionListener<Void> listener = ActionListener.wrap(() -> invoked.set(true));
-
-        IllegalArgumentException e = expectThrows(
-            IllegalArgumentException.class,
-            () -> sparseFileTracker.waitForRange(Tuple.tuple(-1L, randomLongBetween(0L, length)), null, listener)
-        );
-        assertThat("start must not be negative", e.getMessage(), containsString("invalid range"));
         assertThat(invoked.get(), is(false));
-        e = expectThrows(
+        IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> sparseFileTracker.waitForRange(Tuple.tuple(randomLongBetween(0L, Math.max(0L, length - 1L)), length + 1L), null, listener)
+            () -> sparseFileTracker.waitForRange(
+                ByteRange.of(randomLongBetween(0L, Math.max(0L, length - 1L)), length + 1L),
+                null,
+                listener
+            )
         );
         assertThat("end must not be greater than length", e.getMessage(), containsString("invalid range"));
         assertThat(invoked.get(), is(false));
@@ -74,8 +71,8 @@ if (length > 1L) {
             e = expectThrows(IllegalArgumentException.class, () -> {
                 long start = randomLongBetween(1L, Math.max(1L, length - 1L));
-                long end = randomLongBetween(0L, start - 1L);
-                sparseFileTracker.waitForRange(Tuple.tuple(start, end), null, listener);
+                long end = randomLongBetween(length, length + 1000L);
+                sparseFileTracker.waitForRange(ByteRange.of(start, end), null, listener);
             });
             assertThat("end must not be greater than length", e.getMessage(), containsString("invalid range"));
             assertThat(invoked.get(), is(false));
@@ -86,7 +83,7 @@ if (start > 0L) {
                 e = expectThrows(
                     IllegalArgumentException.class,
-                    () -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start - 1L, end), listener)
+                    () -> sparseFileTracker.waitForRange(ByteRange.of(start, end), ByteRange.of(start - 1L, end), listener)
                 );
                 assertThat(
                     "listener range start must not be smaller than range start",
@@ -97,7 +94,7 @@ } else {
                 e = expectThrows(
                     IllegalArgumentException.class,
-                    () -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start - 1L, end), listener)
+                    () -> sparseFileTracker.waitForRange(ByteRange.of(start, end), ByteRange.of(start - 1L, end), listener)
                 );
                 assertThat(
                     "listener range start must not be smaller than zero",
@@ -110,7 +107,7 @@ public void testInvalidRange() {
         if (end < length) {
             e = expectThrows(
                 IllegalArgumentException.class,
-                () -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start, end + 1L), listener)
+                () -> sparseFileTracker.waitForRange(ByteRange.of(start, end), ByteRange.of(start, end + 1L), listener)
             );
             assertThat(
                 "listener range end must not be greater than range end",
@@ -121,7 +118,7 @@ } else {
             e = expectThrows(
                 IllegalArgumentException.class,
-                () -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start, end + 1L), listener)
+                () -> sparseFileTracker.waitForRange(ByteRange.of(start, end), ByteRange.of(start, end + 1L), listener)
             );
             assertThat(
                 "listener range end must not be greater than length",
@@ -153,7 +150,7 @@ public void testCallsListenerWhenWholeRangeIsAvailable() {
             }
         }
-        final Tuple<Long, Long> range = Tuple.tuple(start, end);
+        final ByteRange range = ByteRange.of(start, end);
         if (pending) {
             final AtomicBoolean expectNotification = new AtomicBoolean();
             final AtomicBoolean wasNotified = new AtomicBoolean();
@@ -204,25 +201,24 @@ public void testCallsListenerWhenRangeIsAvailable() {
             assertTrue(listenersCalled.stream().allMatch(AtomicBoolean::get));
         }
-        final Tuple<Long, Long> range;
+        final ByteRange range;
         {
             final long start = randomLongBetween(0L, Math.max(0L, fileContents.length - 1));
-            range = Tuple.tuple(start, randomLongBetween(start, fileContents.length));
+            range = ByteRange.of(start, randomLongBetween(start, fileContents.length));
         }
-        final Tuple<Long, Long> subRange;
+        final ByteRange subRange;
         {
-            final long rangeLength = range.v2() - range.v1();
-            if (rangeLength > 1L) {
-                final long start = randomLongBetween(range.v1(), range.v2() - 1L);
-                subRange = Tuple.tuple(start, randomLongBetween(start + 1L, range.v2()));
+            if (range.length() > 1L) {
+                final long start = randomLongBetween(range.start(), range.end() - 1L);
+                subRange = ByteRange.of(start, randomLongBetween(start + 1L, range.end()));
             } else {
-                subRange = Tuple.tuple(range.v1(), range.v2());
+                subRange = ByteRange.of(range.start(), range.end());
             }
         }
         boolean pending = false;
-        for (long i = subRange.v1(); i < subRange.v2(); i++) {
+        for (long i = subRange.start(); i < subRange.end(); i++) {
             if (fileContents[toIntBytes(i)] == UNAVAILABLE) {
                 pending = true;
             }
@@ -246,8 +242,8 @@ public void testCallsListenerWhenRangeIsAvailable() {
             assertTrue(wasNotified.get());
             for (final SparseFileTracker.Gap gap : gaps) {
-                assertThat(gap.start(), greaterThanOrEqualTo(range.v1()));
-                assertThat(gap.end(), lessThanOrEqualTo(range.v2()));
+                assertThat(gap.start(), greaterThanOrEqualTo(range.start()));
+                assertThat(gap.end(), lessThanOrEqualTo(range.end()));
                 for (long i = gap.start(); i < gap.end(); i++) {
                     assertThat(fileContents[toIntBytes(i)], equalTo(UNAVAILABLE));
@@ -278,7 +274,7 @@ public void testCallsListenerWhenRangeIsAvailable() {
             assertFalse(waitIfPendingWasNotified.get());
             long triggeringProgress = -1L;
-            for (long i = subRange.v1(); i < subRange.v2(); i++) {
+            for (long i = subRange.start(); i < subRange.end(); i++) {
                 if (fileContents[toIntBytes(i)] == UNAVAILABLE) {
                     triggeringProgress = i;
                 }
@@ -286,8 +282,8 @@ assertThat(triggeringProgress, greaterThanOrEqualTo(0L));
             for (final SparseFileTracker.Gap gap : gaps) {
-                assertThat(gap.start(), greaterThanOrEqualTo(range.v1()));
-                assertThat(gap.end(), lessThanOrEqualTo(range.v2()));
+                assertThat(gap.start(), greaterThanOrEqualTo(range.start()));
+                assertThat(gap.end(), lessThanOrEqualTo(range.end()));
                 for (long i = gap.start(); i < gap.end(); i++) {
                     assertThat(fileContents[toIntBytes(i)], equalTo(UNAVAILABLE));
@@ -419,16 +415,13 @@ public void testThreadSafety() throws InterruptedException {
     public void testSparseFileTrackerCreatedWithCompletedRanges() {
         final long fileLength = between(0, 1000);
-        final SortedSet<Tuple<Long, Long>> completedRanges = randomRanges(fileLength);
+        final SortedSet<ByteRange> completedRanges = randomRanges(fileLength);
         final SparseFileTracker sparseFileTracker = new SparseFileTracker("test", fileLength, completedRanges);
-        assertThat(
-            sparseFileTracker.getCompletedRanges().toArray(new Tuple[0]),
-            equalTo(completedRanges.toArray(new Tuple[0]))
-        );
+        assertThat(sparseFileTracker.getCompletedRanges().toArray(new ByteRange[0]), equalTo(completedRanges.toArray(new ByteRange[0])));
-        for (Tuple<Long, Long> completedRange : completedRanges) {
-            assertThat(sparseFileTracker.getAbsentRangeWithin(completedRange.v1(), completedRange.v2()), nullValue());
+        for (ByteRange completedRange : completedRanges) {
+            assertThat(sparseFileTracker.getAbsentRangeWithin(completedRange), nullValue());
             final AtomicBoolean listenerCalled = new AtomicBoolean();
             assertThat(sparseFileTracker.waitForRange(completedRange, completedRange, new ActionListener<Void>() {
@@ -451,13 +444,11 @@ public void testGetCompletedRanges() {
         final SparseFileTracker sparseFileTracker = new SparseFileTracker("test", fileContents.length);
         final Set<AtomicBoolean> listenersCalled = new HashSet<>();
-        final SortedSet<Tuple<Long, Long>> gapsProcessed = Collections.synchronizedNavigableSet(
-            new TreeSet<>(Comparator.comparingLong(Tuple::v1))
-        );
+        final SortedSet<ByteRange> gapsProcessed = Collections.synchronizedNavigableSet(new TreeSet<>());
         for (int i = between(0, 10); i > 0; i--) {
             waitForRandomRange(fileContents, sparseFileTracker, listenersCalled::add, gap -> {
                 if (processGap(fileContents, gap)) {
-                    gapsProcessed.add(Tuple.tuple(gap.start(), gap.end()));
+                    gapsProcessed.add(ByteRange.of(gap.start(), gap.end()));
                 }
             });
             assertTrue(listenersCalled.stream().allMatch(AtomicBoolean::get));
@@ -465,36 +456,36 @@ // merge adjacent processed ranges as the SparseFileTracker does internally when a gap is completed
         // in order to check that SparseFileTracker.getCompletedRanges() returns the expected values
-        final SortedSet<Tuple<Long, Long>> expectedCompletedRanges = mergeContiguousRanges(gapsProcessed);
+        final SortedSet<ByteRange> expectedCompletedRanges = mergeContiguousRanges(gapsProcessed);
-        final SortedSet<Tuple<Long, Long>> completedRanges = sparseFileTracker.getCompletedRanges();
+        final SortedSet<ByteRange> completedRanges = sparseFileTracker.getCompletedRanges();
         assertThat(completedRanges, hasSize(expectedCompletedRanges.size()));
         assertThat(completedRanges, equalTo(expectedCompletedRanges));
     }
     private static void checkRandomAbsentRange(byte[] fileContents, SparseFileTracker sparseFileTracker, boolean expectExact) {
         final long checkStart = randomLongBetween(0, fileContents.length - 1);
-        final long checkEnd = randomLongBetween(0, fileContents.length);
+        final long checkEnd = randomLongBetween(checkStart, fileContents.length);
-        final Tuple<Long, Long> freeRange = sparseFileTracker.getAbsentRangeWithin(checkStart, checkEnd);
+        final ByteRange freeRange = sparseFileTracker.getAbsentRangeWithin(ByteRange.of(checkStart, checkEnd));
         if (freeRange == null) {
             for (long i = checkStart; i < checkEnd; i++) {
                 assertThat(fileContents[toIntBytes(i)], equalTo(AVAILABLE));
             }
         } else {
-            assertThat(freeRange.v1(), greaterThanOrEqualTo(checkStart));
-            assertTrue(freeRange.toString(), freeRange.v1() < freeRange.v2());
-            assertThat(freeRange.v2(), lessThanOrEqualTo(checkEnd));
-            for (long i = checkStart; i < freeRange.v1(); i++) {
+            assertThat(freeRange.start(), greaterThanOrEqualTo(checkStart));
+            assertTrue(freeRange.toString(), freeRange.start() < freeRange.end());
+            assertThat(freeRange.end(), lessThanOrEqualTo(checkEnd));
+            for (long i = checkStart; i < freeRange.start(); i++) {
                 assertThat(fileContents[toIntBytes(i)], equalTo(AVAILABLE));
             }
-            for (long i = freeRange.v2(); i < checkEnd; i++) {
+            for (long i = freeRange.end(); i < checkEnd; i++) {
                 assertThat(fileContents[toIntBytes(i)], equalTo(AVAILABLE));
             }
             if (expectExact) {
                 // without concurrent activity, the returned range is as small as possible
-                assertThat(fileContents[toIntBytes(freeRange.v1())], equalTo(UNAVAILABLE));
-                assertThat(fileContents[toIntBytes(freeRange.v2() - 1)], equalTo(UNAVAILABLE));
+                assertThat(fileContents[toIntBytes(freeRange.start())], equalTo(UNAVAILABLE));
+                assertThat(fileContents[toIntBytes(freeRange.end() - 1)], equalTo(UNAVAILABLE));
             }
         }
     }
@@ -532,8 +523,8 @@ public void onFailure(Exception e) {
         if (randomBoolean()) {
             final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(
-                Tuple.tuple(rangeStart, rangeEnd),
-                Tuple.tuple(subRangeStart, subRangeEnd),
+                ByteRange.of(rangeStart, rangeEnd),
+                ByteRange.of(subRangeStart, subRangeEnd),
                 actionListener
             );
@@ -544,7 +535,7 @@ gapConsumer.accept(gap);
             }
         } else {
-            final boolean listenerRegistered = sparseFileTracker.waitForRangeIfPending(Tuple.tuple(rangeStart, rangeEnd), actionListener);
+            final boolean listenerRegistered = sparseFileTracker.waitForRangeIfPending(ByteRange.of(rangeStart, rangeEnd), actionListener);
             if (listenerRegistered == false) {
                 assertTrue(listenerCalled.compareAndSet(false, true));
             }
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java
index fd0aa7f89f3e6..5fd47897eb9fc 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java
@@ -23,11 +23,11 @@ import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.PathUtilsForTesting;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.store.IndexInputStats;
+import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;
 import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
@@ -41,7 +41,6 @@ import java.nio.file.spi.FileSystemProvider;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -68,12 +67,12 @@ public final class TestUtils {
     private TestUtils() {}
-    public static SortedSet<Tuple<Long, Long>> randomPopulateAndReads(final CacheFile cacheFile) {
+    public static SortedSet<ByteRange> randomPopulateAndReads(final CacheFile cacheFile) {
         return randomPopulateAndReads(cacheFile, (fileChannel, aLong, aLong2) -> {});
     }
-    public static SortedSet<Tuple<Long, Long>> randomPopulateAndReads(CacheFile cacheFile, TriConsumer<FileChannel, Long, Long> consumer) {
-        final SortedSet<Tuple<Long, Long>> ranges = synchronizedNavigableSet(new TreeSet<>(Comparator.comparingLong(Tuple::v1)));
+    public static SortedSet<ByteRange> randomPopulateAndReads(CacheFile cacheFile, TriConsumer<FileChannel, Long, Long> consumer) {
+        final SortedSet<ByteRange> ranges = synchronizedNavigableSet(new TreeSet<>());
         final List<Future<Integer>> futures = new ArrayList<>();
         final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
             builder().put(NODE_NAME_SETTING.getKey(), "_node").build(),
@@ -82,11 +81,11 @@ public static SortedSet<Tuple<Long, Long>> randomPopulateAndReads(CacheFile cach
         for (int i = 0; i < between(0, 10); i++) {
             final long start = randomLongBetween(0L, Math.max(0L, cacheFile.getLength() - 1L));
             final long end = randomLongBetween(Math.min(start + 1L, cacheFile.getLength()), cacheFile.getLength());
-            final Tuple<Long, Long> range = Tuple.tuple(start, end);
+            final ByteRange range = ByteRange.of(start, end);
             futures.add(
                 cacheFile.populateAndRead(range, range, channel -> Math.toIntExact(end - start), (channel, from, to, progressUpdater) -> {
                     consumer.apply(channel, from, to);
-                    ranges.add(Tuple.tuple(from, to));
+                    ranges.add(ByteRange.of(from, to));
                     progressUpdater.accept(to);
                 }, deterministicTaskQueue.getThreadPool().generic())
             );
@@ -114,26 +113,25 @@ static long numberOfRanges(int fileSize, int rangeSize) {
     /**
      * Generates a sorted set of non-empty and non-contiguous random ranges that could fit into a file of a given maximum length.
     */
-    public static SortedSet<Tuple<Long, Long>> randomRanges(long length) {
-        final SortedSet<Tuple<Long, Long>> randomRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1));
+    public static SortedSet<ByteRange> randomRanges(long length) {
+        final SortedSet<ByteRange> randomRanges = new TreeSet<>();
         for (long i = 0L; i < length;) {
             long start = randomLongBetween(i, Math.max(0L, length - 1L));
             long end = randomLongBetween(start + 1L, length); // +1 for non empty ranges
-            randomRanges.add(Tuple.tuple(start, end));
+            randomRanges.add(ByteRange.of(start, end));
             i = end + 1L + randomLongBetween(0L, Math.max(0L, length - end)); // +1 for non contiguous ranges
         }
         return randomRanges;
     }
-    public static SortedSet<Tuple<Long, Long>> mergeContiguousRanges(final SortedSet<Tuple<Long, Long>> ranges) {
-        // Eclipse needs the TreeSet type to be explicit (see https://bugs.eclipse.org/bugs/show_bug.cgi?id=568600)
-        return ranges.stream().collect(() -> new TreeSet<Tuple<Long, Long>>(Comparator.comparingLong(Tuple::v1)), (gaps, gap) -> {
+    public static SortedSet<ByteRange> mergeContiguousRanges(final SortedSet<ByteRange> ranges) {
+        return ranges.stream().collect(TreeSet::new, (gaps, gap) -> {
             if (gaps.isEmpty()) {
                 gaps.add(gap);
             } else {
-                final Tuple<Long, Long> previous = gaps.pollLast();
-                if (previous.v2().equals(gap.v1())) {
-                    gaps.add(Tuple.tuple(previous.v1(), gap.v2()));
+                final ByteRange previous = gaps.pollLast();
+                if (previous.end() == gap.start()) {
+                    gaps.add(ByteRange.of(previous.start(), gap.end()));
                 } else {
                     gaps.add(previous);
                     gaps.add(gap);
@@ -141,10 +139,10 @@ public static SortedSet<Tuple<Long, Long>> mergeContiguousRanges(final SortedSet
             }
         }, (gaps1, gaps2) -> {
             if (gaps1.isEmpty() == false && gaps2.isEmpty() == false) {
-                final Tuple<Long, Long> last = gaps1.pollLast();
-                final Tuple<Long, Long> first = gaps2.pollFirst();
-                if (last.v2().equals(first.v1())) {
-                    gaps1.add(Tuple.tuple(last.v1(), first.v2()));
+                final ByteRange last = gaps1.pollLast();
+                final ByteRange first = gaps2.pollFirst();
+                if (last.end() == first.start()) {
+                    gaps1.add(ByteRange.of(last.start(), first.end()));
                 } else {
                     gaps1.add(last);
                     gaps2.add(first);
@@ -181,7 +179,7 @@ public static void assertCounter(
     }
     public static long sumOfCompletedRangesLengths(CacheFile cacheFile) {
-        return cacheFile.getCompletedRanges().stream().mapToLong(range -> range.v2() - range.v1()).sum();
+        return cacheFile.getCompletedRanges().stream().mapToLong(ByteRange::length).sum();
     }
     /**
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java
index 039a2bbd2e99d..152c7a20868de 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java
@@ -16,7 +16,6 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.UUIDs;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.collect.Set;
 import org.elasticsearch.common.lucene.store.ESIndexInputTestCase;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -40,6 +39,7 @@ import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPoolStats;
+import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;
 import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
 import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService;
 import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache;
@@ -284,7 +284,7 @@ protected List<CacheFile> randomCacheFiles(CacheService cacheService) throws Exc
         final CacheFile.EvictionListener listener = evictedCacheFile -> {};
         cacheFile.acquire(listener);
         try {
-            SortedSet<Tuple<Long, Long>> ranges = Collections.emptySortedSet();
+            SortedSet<ByteRange> ranges = Collections.emptySortedSet();
             while (ranges.isEmpty()) {
                 ranges = randomPopulateAndReads(cacheFile, (channel, from, to) -> {
                     try {
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java
index c15c5c885e384..e2fed30beee35 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java
@@ -104,7 +104,7 @@ public void testCacheSynchronization() throws Exception {
             final CacheFile.EvictionListener listener = evictedCacheFile -> {};
             cacheFile.acquire(listener);
-            final SortedSet<Tuple<Long, Long>> newCacheRanges = randomPopulateAndReads(cacheFile);
+            final SortedSet<ByteRange> newCacheRanges = randomPopulateAndReads(cacheFile);
             assertThat(cacheService.isCacheFileToSync(cacheFile), is(newCacheRanges.isEmpty() == false));
             if (newCacheRanges.isEmpty() == false) {
                 final int numberOfWrites = cacheEntry.getValue().v2() + 1;
@@ -123,7 +123,7 @@ final CacheFile.EvictionListener listener = evictedCacheFile -> {};
             cacheFile.acquire(listener);
-            final SortedSet<Tuple<Long, Long>> newRanges = randomPopulateAndReads(cacheFile);
+            final SortedSet<ByteRange> newRanges = randomPopulateAndReads(cacheFile);
             assertThat(cacheService.isCacheFileToSync(cacheFile), is(newRanges.isEmpty() == false));
             updates.put(cacheKey, Tuple.tuple(cacheFile, newRanges.isEmpty() ? 0 : 1));
             cacheFile.release(listener);
@@ -198,7 +198,7 @@ public void testPut() throws Exception {
             resolveSnapshotCache(randomShardPath(cacheKey.getShardId())).resolve(cacheKey.getSnapshotUUID())
         );
         final String cacheFileUuid = UUIDs.randomBase64UUID(random());
-        final SortedSet<Tuple<Long, Long>> cacheFileRanges = randomBoolean() ? randomRanges(fileLength) : emptySortedSet();
+        final SortedSet<ByteRange> cacheFileRanges = randomBoolean() ? randomRanges(fileLength) : emptySortedSet();
         if (randomBoolean()) {
             final Path cacheFilePath = cacheDir.resolve(cacheFileUuid);
@@ -213,8 +213,8 @@ public void testPut() throws Exception {
             assertThat(cacheFile.getCacheKey(), equalTo(cacheKey));
             assertThat(cacheFile.getLength(), equalTo(fileLength));
-            for (Tuple<Long, Long> cacheFileRange : cacheFileRanges) {
-                assertThat(cacheFile.getAbsentRangeWithin(cacheFileRange.v1(), cacheFileRange.v2()), nullValue());
+            for (ByteRange cacheFileRange : cacheFileRanges) {
+                assertThat(cacheFile.getAbsentRangeWithin(cacheFileRange), nullValue());
             }
         } else {
             final FileNotFoundException exception = expectThrows(
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheServiceTests.java
index a03c3efdabd33..769b924a24238 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheServiceTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheServiceTests.java
@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.searchablesnapshots.cache;
 import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.TestEnvironment;
@@ -56,8 +55,8 @@ public void testBasicEviction() throws IOException {
         assertFalse(region1.tryEvict());
         assertEquals(3, cacheService.freeRegionCount());
         region0.populateAndRead(
-            Tuple.tuple(0L, 1L),
-            Tuple.tuple(0L, 1L),
+            ByteRange.of(0L, 1L),
+            ByteRange.of(0L, 1L),
             (channel, channelPos, relativePos, length) -> 1,
             (channel, channelPos, relativePos, length, progressUpdater) -> progressUpdater.accept(length),
             taskQueue.getThreadPool().executor(ThreadPool.Names.GENERIC)
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java
index 86b750ca474c8..d75f727988f55 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java
@@ -13,7 +13,6 @@ import org.apache.lucene.mockfile.FilterFileChannel;
 import org.apache.lucene.mockfile.FilterFileSystemProvider;
 import org.apache.lucene.mockfile.FilterPath;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.common.io.PathUtilsForTesting;
 import org.elasticsearch.common.settings.Settings;
@@ -225,7 +224,7 @@ public void testFSyncDoesNotAddDocumentsBackInPersistentCacheWhenShardIsEvicted(
         final SnapshotId snapshotId = new SnapshotId("_ignored_", cacheKey.getSnapshotUUID());
         if (randomBoolean()) {
-            final Tuple<Long, Long> absent = randomCacheFile.getAbsentRangeWithin(0L, randomCacheFile.getLength());
+            final ByteRange absent = randomCacheFile.getAbsentRangeWithin(ByteRange.of(0L, randomCacheFile.getLength()));
             if (absent != null) {
                 assertThat(
                     "Persistent cache should not contain any cached data",
@@ -249,7 +248,7 @@ public void testFSyncDoesNotAddDocumentsBackInPersistentCacheWhenShardIsEvicted(
         final CacheFile.EvictionListener listener = evictedCacheFile -> {};
         randomCacheFile.acquire(listener);
         try {
-            SortedSet<Tuple<Long, Long>> ranges = null;
+            SortedSet<ByteRange> ranges = null;
             while (ranges == null || ranges.isEmpty()) {
                 ranges = randomPopulateAndReads(randomCacheFile);
             }