Merged
@@ -49,6 +49,23 @@ public long getLength() {
        return length;
    }

    public List<Tuple<Long, Long>> getCompletedRanges() {
        List<Tuple<Long, Long>> completedRanges = null;
        synchronized (mutex) {
            assert invariant();
            for (Range range : ranges) {
                if (range.isPending()) {
                    continue;
                }
                if (completedRanges == null) {
Review comment (Contributor):
Random NIT/question: why use a LinkedList for this? If we're actually worried about this running often enough to be hard on the GC, an ArrayList would perform more predictably, and it's what we generally use in this sort of spot.

Reply (Member, Author):
Agreed, that's what we usually use; I changed it.
                    completedRanges = new ArrayList<>();
                }
                completedRanges.add(Tuple.tuple(range.start, range.end));
            }
        }
        return completedRanges == null ? Collections.emptyList() : completedRanges;
    }

    /**
     * @return the sum of the length of the ranges
     */
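For context, here is a minimal sketch (not part of this change) of how a caller might consume the new getCompletedRanges() accessor. The persistRange helper, the method name, and the tracker parameter are hypothetical; each Tuple carries the start offset (v1) and the end offset (v2, exclusive, matching the fill loop in processGap further down) of a range whose bytes are fully available.

    // Hedged sketch only: iterate over the completed (non-pending) ranges reported by the
    // tracker and hand each one to a hypothetical persistRange(start, end) helper.
    static void persistCompletedRanges(SparseFileTracker tracker) {
        for (Tuple<Long, Long> range : tracker.getCompletedRanges()) {
            final long start = range.v1(); // start offset of a fully written range
            final long end = range.v2();   // end offset (exclusive), as tracked by Range.end
            persistRange(start, end);      // hypothetical consumer of the completed range
        }
    }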
@@ -12,9 +12,13 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,6 +30,7 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@@ -408,6 +413,58 @@ public void testThreadSafety() throws InterruptedException {
        checkThread.join();
    }

    public void testCompletedRanges() {
        final byte[] fileContents = new byte[between(0, 1000)];
        final SparseFileTracker sparseFileTracker = new SparseFileTracker("test", fileContents.length);

        final Set<AtomicBoolean> listenersCalled = new HashSet<>();
        final Set<SparseFileTracker.Gap> gapsProcessed = Collections.synchronizedSet(
            new TreeSet<>(Comparator.comparingLong(SparseFileTracker.Gap::start))
        );
        for (int i = between(0, 10); i > 0; i--) {
            waitForRandomRange(fileContents, sparseFileTracker, listenersCalled::add, gap -> {
                if (processGap(fileContents, gap)) {
                    gapsProcessed.add(gap);
                }
            });
            assertTrue(listenersCalled.stream().allMatch(AtomicBoolean::get));
        }

        // merge adjacent processed ranges as the SparseFileTracker does internally when a gap is completed
        // in order to check that SparseFileTracker.getCompletedRanges() returns the expected values
        final List<Tuple<Long, Long>> expectedCompletedRanges = gapsProcessed.stream()
            .map(gap -> Tuple.tuple(gap.start(), gap.end()))
            .collect(LinkedList::new, (gaps, gap) -> {
                if (gaps.isEmpty()) {
                    gaps.add(gap);
                } else {
                    final Tuple<Long, Long> previous = gaps.removeLast();
                    if (previous.v2().equals(gap.v1())) {
                        gaps.add(Tuple.tuple(previous.v1(), gap.v2()));
                    } else {
                        gaps.add(previous);
                        gaps.add(gap);
                    }
                }
            }, (gaps1, gaps2) -> {
                if (gaps1.isEmpty() == false && gaps2.isEmpty() == false) {
                    final Tuple<Long, Long> last = gaps1.removeLast();
                    final Tuple<Long, Long> first = gaps2.removeFirst();
                    if (last.v2().equals(first.v1())) {
                        gaps1.add(Tuple.tuple(last.v1(), first.v2()));
                    } else {
                        gaps1.add(last);
                        gaps2.add(first);
                    }
                }
                gaps1.addAll(gaps2);
            });

        final List<Tuple<Long, Long>> completedRanges = sparseFileTracker.getCompletedRanges();
        assertThat(completedRanges, hasSize(expectedCompletedRanges.size()));
        assertThat(completedRanges, equalTo(expectedCompletedRanges));
    }
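The collector above folds adjacent gaps into contiguous expected ranges. As a plainer illustration of the same merge (a sketch, not code from this PR), the fold over gaps already sorted by start offset could be written as a simple loop; the method name is illustrative.

    // Sketch assuming the input is sorted by start offset, as the TreeSet above guarantees.
    static List<Tuple<Long, Long>> mergeAdjacentGaps(Collection<SparseFileTracker.Gap> sortedGaps) {
        final List<Tuple<Long, Long>> merged = new ArrayList<>();
        for (SparseFileTracker.Gap gap : sortedGaps) {
            final int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).v2().longValue() == gap.start()) {
                // the previous range ends exactly where this gap starts, so extend it
                merged.set(last, Tuple.tuple(merged.get(last).v1(), gap.end()));
            } else {
                merged.add(Tuple.tuple(gap.start(), gap.end()));
            }
        }
        return merged;
    }

This mirrors the merging that SparseFileTracker performs internally when a completed gap joins its neighbours, which is why the test can compare the result of getCompletedRanges() with a plain equalTo.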

    private static void checkRandomAbsentRange(byte[] fileContents, SparseFileTracker sparseFileTracker, boolean expectExact) {
        final long checkStart = randomLongBetween(0, fileContents.length - 1);
        final long checkEnd = randomLongBetween(0, fileContents.length);
@@ -487,19 +544,21 @@ public void onFailure(Exception e) {
        }
    }

    private static void processGap(byte[] fileContents, SparseFileTracker.Gap gap) {
    private static boolean processGap(byte[] fileContents, SparseFileTracker.Gap gap) {
        for (long i = gap.start(); i < gap.end(); i++) {
            assertThat(fileContents[toIntBytes(i)], equalTo(UNAVAILABLE));
        }

        if (randomBoolean()) {
            gap.onFailure(new ElasticsearchException("simulated"));
            return false;
        } else {
            for (long i = gap.start(); i < gap.end(); i++) {
                fileContents[toIntBytes(i)] = AVAILABLE;
                gap.onProgress(i + 1L);
            }
            gap.onCompletion();
            return true;
        }
    }
}