@@ -10,12 +10,12 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
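
Review note: the hunks in this diff lean on the new ByteRange type, whose definition is not part of the changes shown here. The sketch below is a minimal reconstruction consistent with how the diff uses it (of(), start(), end(), length(), isSubRangeOf(), minEnvelope(), and an ordering so it can live in a SortedSet); the real class in org.elasticsearch.xpack.searchablesnapshots.cache may differ in detail.

// Minimal sketch of ByteRange as used in this diff; equals/hashCode/toString
// omitted, and the null-handling of minEnvelope is an assumption (see below).
public final class ByteRange implements Comparable<ByteRange> {

    private final long start; // inclusive
    private final long end;   // exclusive

    private ByteRange(long start, long end) {
        assert start >= 0L && start <= end : "invalid byte range [" + start + "-" + end + "]";
        this.start = start;
        this.end = end;
    }

    public static ByteRange of(long start, long end) {
        return new ByteRange(start, end);
    }

    public long start() {
        return start;
    }

    public long end() {
        return end;
    }

    public long length() {
        return end - start;
    }

    // True if this range lies entirely within the given range.
    public boolean isSubRangeOf(ByteRange range) {
        return start >= range.start() && end <= range.end();
    }

    // Smallest range enclosing both this range and the argument. Treating a
    // null argument as identity is an assumption, but it is what the
    // readInternal hunk further down relies on when indexCacheMiss is null.
    public ByteRange minEnvelope(ByteRange other) {
        if (other == null) {
            return this;
        }
        return of(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public int compareTo(ByteRange other) {
        final int cmp = Long.compare(start, other.start);
        return cmp != 0 ? cmp : Long.compare(end, other.end);
    }
}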
@@ -138,7 +138,7 @@ public CacheFile(CacheKey cacheKey, long length, Path file, ModificationListener
         this(cacheKey, new SparseFileTracker(file.toString(), length), file, listener);
     }
 
-    public CacheFile(CacheKey cacheKey, long length, Path file, SortedSet<Tuple<Long, Long>> ranges, ModificationListener listener) {
+    public CacheFile(CacheKey cacheKey, long length, Path file, SortedSet<ByteRange> ranges, ModificationListener listener) {
         this(cacheKey, new SparseFileTracker(file.toString(), length, ranges), file, listener);
     }
 
@@ -170,7 +170,7 @@ FileChannel getChannel() {
     }
 
     // Only used in tests
-    SortedSet<Tuple<Long, Long>> getCompletedRanges() {
+    SortedSet<ByteRange> getCompletedRanges() {
         return tracker.getCompletedRanges();
     }
 
@@ -343,8 +343,8 @@ interface RangeMissingHandler {
      * @return a future which returns the result of the {@link RangeAvailableHandler} once it has completed.
      */
     Future<Integer> populateAndRead(
-        final Tuple<Long, Long> rangeToWrite,
-        final Tuple<Long, Long> rangeToRead,
+        final ByteRange rangeToWrite,
+        final ByteRange rangeToRead,
         final RangeAvailableHandler reader,
         final RangeMissingHandler writer,
         final Executor executor
@@ -401,7 +401,7 @@ public void onFailure(Exception e) {
      * target range is neither available nor pending.
      */
     @Nullable
-    Future<Integer> readIfAvailableOrPending(final Tuple<Long, Long> rangeToRead, final RangeAvailableHandler reader) {
+    Future<Integer> readIfAvailableOrPending(final ByteRange rangeToRead, final RangeAvailableHandler reader) {
         final PlainActionFuture<Integer> future = PlainActionFuture.newFuture();
         Releasable decrementRef = null;
         try {
@@ -429,20 +429,20 @@ private static void releaseAndFail(PlainActionFuture<Integer> future, Releasable
     }
 
     private static ActionListener<Void> rangeListener(
-        Tuple<Long, Long> rangeToRead,
+        ByteRange rangeToRead,
         RangeAvailableHandler reader,
         PlainActionFuture<Integer> future,
         FileChannelReference reference,
         Releasable releasable
     ) {
         return ActionListener.runAfter(ActionListener.wrap(success -> {
             final int read = reader.onRangeAvailable(reference.fileChannel);
-            assert read == rangeToRead.v2() - rangeToRead.v1() : "partial read ["
+            assert read == rangeToRead.length() : "partial read ["
                 + read
                 + "] does not match the range to read ["
-                + rangeToRead.v2()
+                + rangeToRead.end()
                 + '-'
-                + rangeToRead.v1()
+                + rangeToRead.start()
                 + ']';
             future.onResponse(read);
         }, future::onFailure), releasable::close);
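
Review note: unchanged in substance by this diff, but the assertion message interpolates the range end before the start on either side of the '-' separator (previously v2() then v1(), now end() then start()), so a failing read prints the range reversed, e.g. [192-64]. Harmless, but worth a follow-up.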
@@ -466,9 +466,9 @@ private FileChannelReference acquireFileChannelReference() {
         return reference;
     }
 
-    public Tuple<Long, Long> getAbsentRangeWithin(long start, long end) {
+    public ByteRange getAbsentRangeWithin(ByteRange range) {
         ensureOpen();
-        return tracker.getAbsentRangeWithin(start, end);
+        return tracker.getAbsentRangeWithin(range);
     }
 
     // used in tests
@@ -497,7 +497,7 @@ private void markAsNeedsFSync() {
      * @throws IOException if the cache file failed to be fsync
      * @throws java.nio.file.NoSuchFileException if the cache file does not exist
      */
-    public SortedSet<Tuple<Long, Long>> fsync() throws IOException {
+    public SortedSet<ByteRange> fsync() throws IOException {
         if (refCounter.tryIncRef()) {
             try {
                 if (needsFsync.compareAndSet(true, false)) {
@@ -506,7 +506,7 @@ public SortedSet<ByteRange> fsync() throws IOException {
                     // Capture the completed ranges before fsyncing; ranges that are completed after this point won't be considered as
                     // persisted on disk by the caller of this method, even if they are fully written to disk at the time the file
                     // fsync is effectively executed
-                    final SortedSet<Tuple<Long, Long>> completedRanges = tracker.getCompletedRanges();
+                    final SortedSet<ByteRange> completedRanges = tracker.getCompletedRanges();
                     assert completedRanges != null;
                     assert completedRanges.isEmpty() == false;
 
@@ -30,6 +30,7 @@
 import org.elasticsearch.index.store.IndexInputStats;
 import org.elasticsearch.index.store.SearchableSnapshotDirectory;
 import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
+import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -134,11 +135,11 @@ private long getDefaultRangeSize() {
             : fileInfo.partSize().getBytes();
     }
 
-    private Tuple<Long, Long> computeRange(long position) {
+    private ByteRange computeRange(long position) {
         final long rangeSize = getDefaultRangeSize();
         long start = (position / rangeSize) * rangeSize;
         long end = Math.min(start + rangeSize, fileInfo.length());
-        return Tuple.tuple(start, end);
+        return ByteRange.of(start, end);
     }
 
     @Override
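
Review note: computeRange aligns a read position down to the enclosing rangeSize-wide window and clamps the window's end to the file length. A quick trace of the arithmetic with hypothetical values; the helper below just mirrors the method body with rangeSize and fileLength passed in explicitly.

// For tracing only; hypothetical values, not part of the change.
static ByteRange computeRangeExample(long position, long rangeSize, long fileLength) {
    final long start = (position / rangeSize) * rangeSize;
    final long end = Math.min(start + rangeSize, fileLength);
    return ByteRange.of(start, end);
}

// computeRangeExample(70, 32, 100) -> [64-96]   position aligned down to the 32-byte grid
// computeRangeExample(99, 32, 100) -> [96-100]  last window clamped to the file length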
@@ -165,8 +166,8 @@ protected void readInternal(ByteBuffer b) throws IOException {
 
         // Can we serve the read directly from disk? If so, do so and don't worry about anything else.
 
-        final Future<Integer> waitingForRead = cacheFile.readIfAvailableOrPending(Tuple.tuple(position, position + length), channel -> {
-            final int read = readCacheFile(channel, position, b);
+        final Future<Integer> waitingForRead = cacheFile.readIfAvailableOrPending(ByteRange.of(position, position + length), chan -> {
+            final int read = readCacheFile(chan, position, b);
             assert read == length : read + " vs " + length;
             return read;
         });
@@ -180,7 +181,7 @@ protected void readInternal(ByteBuffer b) throws IOException {
 
         // Requested data is not on disk, so try the cache index next.
 
-        final Tuple<Long, Long> indexCacheMiss; // null if not a miss
+        final ByteRange indexCacheMiss; // null if not a miss
 
         // We try to use the cache index if:
         // - the file is small enough to be fully cached
@@ -198,10 +199,10 @@ protected void readInternal(ByteBuffer b) throws IOException {
 
             if (canBeFullyCached) {
                 // if the index input is smaller than twice the size of the blob cache, it will be fully indexed
-                indexCacheMiss = Tuple.tuple(0L, fileInfo.length());
+                indexCacheMiss = ByteRange.of(0L, fileInfo.length());
             } else {
                 // the index input is too large to fully cache, so just cache the initial range
-                indexCacheMiss = Tuple.tuple(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE);
+                indexCacheMiss = ByteRange.of(0L, BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE);
             }
 
             // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put.
@@ -223,7 +224,7 @@ protected void readInternal(ByteBuffer b) throws IOException {
                 assert b.position() == length : "copied " + b.position() + " but expected " + length;
 
                 try {
-                    final Tuple<Long, Long> cachedRange = Tuple.tuple(cachedBlob.from(), cachedBlob.to());
+                    final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to());
                     cacheFile.populateAndRead(
                         cachedRange,
                         cachedRange,
@@ -274,23 +275,14 @@ protected void readInternal(ByteBuffer b) throws IOException {
         // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any
         // miss in the cache index.
 
-        final Tuple<Long, Long> startRangeToWrite = computeRange(position);
-        final Tuple<Long, Long> endRangeToWrite = computeRange(position + length - 1);
-        assert startRangeToWrite.v2() <= endRangeToWrite.v2() : startRangeToWrite + " vs " + endRangeToWrite;
-        final Tuple<Long, Long> rangeToWrite = Tuple.tuple(
-            Math.min(startRangeToWrite.v1(), indexCacheMiss == null ? Long.MAX_VALUE : indexCacheMiss.v1()),
-            Math.max(endRangeToWrite.v2(), indexCacheMiss == null ? Long.MIN_VALUE : indexCacheMiss.v2())
-        );
-
-        assert rangeToWrite.v1() <= position && position + length <= rangeToWrite.v2() : "["
-            + position
-            + "-"
-            + (position + length)
-            + "] vs "
-            + rangeToWrite;
+        final ByteRange startRangeToWrite = computeRange(position);
+        final ByteRange endRangeToWrite = computeRange(position + length - 1);
+        assert startRangeToWrite.end() <= endRangeToWrite.end() : startRangeToWrite + " vs " + endRangeToWrite;
+        final ByteRange rangeToWrite = startRangeToWrite.minEnvelope(endRangeToWrite).minEnvelope(indexCacheMiss);
 
-        final Tuple<Long, Long> rangeToRead = Tuple.tuple(position, position + length);
-        assert rangeToRead.v2() - rangeToRead.v1() == b.remaining() : b.remaining() + " vs " + rangeToRead;
+        final ByteRange rangeToRead = ByteRange.of(position, position + length);
+        assert rangeToRead.isSubRangeOf(rangeToWrite) : rangeToRead + " vs " + rangeToWrite;
+        assert rangeToRead.length() == b.remaining() : b.remaining() + " vs " + rangeToRead;
 
         final Future<Integer> populateCacheFuture = cacheFile.populateAndRead(
             rangeToWrite,
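
Review note: the minEnvelope chain collapses the removed Math.min/Math.max sentinel arithmetic into one expression. This only works because indexCacheMiss may be null here ("null if not a miss" above) and minEnvelope is assumed to treat a null argument as identity. Tracing both cases with hypothetical values:

// rangeSize = 64, position = 100, length = 50 (hypothetical values).
final ByteRange startRangeToWrite = ByteRange.of(64, 128);  // computeRange(100)
final ByteRange endRangeToWrite = ByteRange.of(128, 192);   // computeRange(149)

// Index cache miss of [0-512]: the write envelope widens to cover it.
final ByteRange withMiss = startRangeToWrite.minEnvelope(endRangeToWrite)  // [64-192]
    .minEnvelope(ByteRange.of(0, 512));                                    // [0-512]

// indexCacheMiss == null: the second minEnvelope is a no-op and the envelope
// stays [64-192], the same result the old Long.MAX_VALUE / Long.MIN_VALUE
// sentinels produced.
final ByteRange withoutMiss = startRangeToWrite.minEnvelope(endRangeToWrite).minEnvelope(null);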
@@ -303,7 +295,7 @@ protected void readInternal(ByteBuffer b) throws IOException {
         if (indexCacheMiss != null) {
             final Releasable onCacheFillComplete = stats.addIndexCacheFill();
             final Future<Integer> readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> {
-                final int indexCacheMissLength = toIntBytes(indexCacheMiss.v2() - indexCacheMiss.v1());
+                final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());
 
                 // We assume that we only cache small portions of blobs so that we do not need to:
                 // - use a BigArrays for allocation
@@ -312,11 +304,11 @@ protected void readInternal(ByteBuffer b) throws IOException {
                 assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss;
 
                 final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
-                Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.v1(), byteBuffer);
+                Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.start(), byteBuffer);
                 // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
                 byteBuffer.flip();
                 final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
-                directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content, new ActionListener<>() {
+                directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.start(), content, new ActionListener<>() {
                     @Override
                     public void onResponse(Void response) {
                         onCacheFillComplete.close();
@@ -438,56 +430,52 @@ public Tuple<Long, Long> prefetchPart(final int part) throws IOException {
         if (part >= fileInfo.numberOfParts()) {
             throw new IllegalArgumentException("Unexpected part number [" + part + "]");
         }
-        final Tuple<Long, Long> partRange = computeRange(IntStream.range(0, part).mapToLong(fileInfo::partBytes).sum());
+        final ByteRange partRange = computeRange(IntStream.range(0, part).mapToLong(fileInfo::partBytes).sum());
         assert assertRangeIsAlignedWithPart(partRange);
 
         try {
             final CacheFile cacheFile = cacheFileReference.get();
 
-            final Tuple<Long, Long> range = cacheFile.getAbsentRangeWithin(partRange.v1(), partRange.v2());
+            final ByteRange range = cacheFile.getAbsentRangeWithin(partRange);
             if (range == null) {
                 logger.trace(
                     "prefetchPart: part [{}] bytes [{}-{}] is already fully available for cache file [{}]",
                     part,
-                    partRange.v1(),
-                    partRange.v2(),
+                    partRange.start(),
+                    partRange.end(),
                     cacheFileReference
                 );
                 return Tuple.tuple(cacheFile.getInitialLength(), 0L);
             }
 
-            final long rangeStart = range.v1();
-            final long rangeEnd = range.v2();
-            final long rangeLength = rangeEnd - rangeStart;
-
             logger.trace(
                 "prefetchPart: prewarming part [{}] bytes [{}-{}] by fetching bytes [{}-{}] for cache file [{}]",
                 part,
-                partRange.v1(),
-                partRange.v2(),
-                rangeStart,
-                rangeEnd,
+                partRange.start(),
+                partRange.end(),
+                range.start(),
+                range.end(),
                 cacheFileReference
             );
 
-            final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, rangeLength))];
+            final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, range.length()))];
 
             long totalBytesRead = 0L;
             final AtomicLong totalBytesWritten = new AtomicLong();
-            long remainingBytes = rangeEnd - rangeStart;
+            long remainingBytes = range.length();
             final long startTimeNanos = stats.currentTimeNanos();
-            try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) {
+            try (InputStream input = openInputStreamFromBlobStore(range.start(), range.length())) {
                 while (remainingBytes > 0L) {
-                    assert totalBytesRead + remainingBytes == rangeLength;
-                    final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference);
+                    assert totalBytesRead + remainingBytes == range.length();
+                    final int bytesRead = readSafe(input, copyBuffer, range.start(), range.end(), remainingBytes, cacheFileReference);
 
                     // The range to prewarm in cache
-                    final long readStart = rangeStart + totalBytesRead;
-                    final Tuple<Long, Long> rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead);
+                    final long readStart = range.start() + totalBytesRead;
+                    final ByteRange rangeToWrite = ByteRange.of(readStart, readStart + bytesRead);
 
                     // We do not actually read anything, but we want to wait for the write to complete before proceeding.
                     // noinspection UnnecessaryLocalVariable
-                    final Tuple<Long, Long> rangeToRead = rangeToWrite;
+                    final ByteRange rangeToRead = rangeToWrite;
                     cacheFile.populateAndRead(rangeToWrite, rangeToRead, (channel) -> bytesRead, (channel, start, end, progressUpdater) -> {
                         final ByteBuffer byteBuffer = ByteBuffer.wrap(copyBuffer, toIntBytes(start - readStart), toIntBytes(end - start));
                         final int writtenBytes = positionalWrite(channel, start, byteBuffer);
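
Review note: for reference while reading the prewarming loop above, the shape of the two callbacks populateAndRead accepts. CacheFile defines the real RangeAvailableHandler and RangeMissingHandler interfaces; onRangeAvailable appears verbatim in the rangeListener hunk, while the writer's method name and the exact type of progressUpdater are inferred from usage here, not confirmed by this diff.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.function.Consumer;

@FunctionalInterface
interface RangeAvailableHandler {
    // Invoked once the requested range is present in the cache file; returns
    // the number of bytes read (the prewarming loop simply returns bytesRead).
    int onRangeAvailable(FileChannel channel) throws IOException;
}

@FunctionalInterface
interface RangeMissingHandler {
    // Invoked to fill [start, end) of the cache file; progress is reported so
    // waiting readers can be released as soon as their sub-range is written.
    void fillCacheRange(FileChannel channel, long start, long end, Consumer<Long> progressUpdater) throws IOException;
}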
@@ -507,8 +495,8 @@ public Tuple<Long, Long> prefetchPart(final int part) throws IOException {
                 final long endTimeNanos = stats.currentTimeNanos();
                 stats.addCachedBytesWritten(totalBytesWritten.get(), endTimeNanos - startTimeNanos);
             }
-            assert totalBytesRead == rangeLength;
-            return Tuple.tuple(cacheFile.getInitialLength(), rangeLength);
+            assert totalBytesRead == range.length();
+            return Tuple.tuple(cacheFile.getInitialLength(), range.length());
         } catch (final Exception e) {
             throw new IOException("Failed to prefetch file part in cache", e);
         }
@@ -555,16 +543,16 @@ private static int readSafe(
     /**
      * Asserts that the range of bytes to warm in cache is aligned with {@link #fileInfo}'s part size.
      */
-    private boolean assertRangeIsAlignedWithPart(Tuple<Long, Long> range) {
+    private boolean assertRangeIsAlignedWithPart(ByteRange range) {
         if (fileInfo.numberOfParts() == 1L) {
             final long length = fileInfo.length();
-            assert range.v1() == 0L : "start of range [" + range.v1() + "] is not aligned with zero";
-            assert range.v2() == length : "end of range [" + range.v2() + "] is not aligned with file length [" + length + ']';
+            assert range.start() == 0L : "start of range [" + range.start() + "] is not aligned with zero";
+            assert range.end() == length : "end of range [" + range.end() + "] is not aligned with file length [" + length + ']';
         } else {
             final long length = fileInfo.partSize().getBytes();
-            assert range.v1() % length == 0L : "start of range [" + range.v1() + "] is not aligned with part start";
-            assert range.v2() % length == 0L || (range.v2() == fileInfo.length()) : "end of range ["
-                + range.v2()
+            assert range.start() % length == 0L : "start of range [" + range.start() + "] is not aligned with part start";
+            assert range.end() % length == 0L || (range.end() == fileInfo.length()) : "end of range ["
+                + range.end()
                 + "] is not aligned with part end or with file length";
         }
         return true;