Skip to content

Commit d6e64ea

Browse files
committed
Dedup translog operations by reading in reverse
Currently, translog operations are read and processed one by one. This can be a problem because stale operations in the translog may suddenly reappear during recoveries. To make sure that stale operations are not processed, we read the translog files in reverse order (i.e., from the most recent file to the oldest file) and only process an operation if its sequence number has not been seen before.
1 parent 749c3ec commit d6e64ea

File tree

6 files changed

+344
-29
lines changed

6 files changed

+344
-29
lines changed

core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919

2020
package org.elasticsearch.index.translog;
2121

22+
import com.carrotsearch.hppc.LongHashSet;
23+
import com.carrotsearch.hppc.LongObjectHashMap;
24+
import com.carrotsearch.hppc.LongSet;
25+
import org.apache.lucene.util.FixedBitSet;
26+
2227
import java.io.Closeable;
2328
import java.io.IOException;
2429
import java.util.Arrays;
@@ -30,32 +35,44 @@ final class MultiSnapshot implements Translog.Snapshot {
3035

3136
private final TranslogSnapshot[] translogs;
3237
private final int totalOperations;
38+
private int skippedOperations;
3339
private final Closeable onClose;
3440
private int index;
41+
private final SeqNumSet seenSeqNo;
3542

3643
/**
3744
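* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in reverse order, from the last element of the array to the first, and an operation whose sequence number was already returned is skipped.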
3845
*/
3946
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
4047
this.translogs = translogs;
41-
totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
48+
this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
49+
this.skippedOperations = 0;
4250
this.onClose = onClose;
43-
index = 0;
51+
this.seenSeqNo = new SeqNumSet();
52+
this.index = translogs.length - 1;
4453
}
4554

46-
4755
@Override
4856
public int totalOperations() {
4957
return totalOperations;
5058
}
5159

60+
@Override
61+
public int skippedOperations() {
62+
return skippedOperations;
63+
}
64+
5265
@Override
5366
public Translog.Operation next() throws IOException {
54-
for (; index < translogs.length; index++) {
67+
for (; index >= 0; index--) {
5568
final TranslogSnapshot current = translogs[index];
56-
Translog.Operation op = current.next();
57-
if (op != null) { // if we are null we move to the next snapshot
58-
return op;
69+
Translog.Operation op;
70+
while ((op = current.next()) != null) {
71+
if (op.seqNo() < 0 || seenSeqNo.getAndSet(op.seqNo()) == false) {
72+
return op;
73+
}else {
74+
skippedOperations++;
75+
}
5976
}
6077
}
6178
return null;
@@ -65,4 +82,65 @@ public Translog.Operation next() throws IOException {
6582
public void close() throws IOException {
6683
onClose.close();
6784
}
85+
86+
/**
87+
* A wrapper of {@link FixedBitSet} but allows to check if all bits are set in O(1).
88+
*/
89+
private static final class CountedBitSet {
90+
private short onBits;
91+
private final FixedBitSet bitset;
92+
93+
CountedBitSet(short numBits) {
94+
assert numBits > 0;
95+
this.onBits = 0;
96+
this.bitset = new FixedBitSet(numBits);
97+
}
98+
99+
boolean getAndSet(int index) {
100+
assert index >= 0;
101+
boolean wasOn = bitset.getAndSet(index);
102+
if (wasOn == false) {
103+
onBits++;
104+
}
105+
return wasOn;
106+
}
107+
108+
boolean hasAllBitsOn() {
109+
return onBits == bitset.length();
110+
}
111+
}
112+
113+
/**
114+
* Sequence numbers from translog are likely to form contiguous ranges, thus using two tiers can reduce memory usage.
115+
*/
116+
static final class SeqNumSet {
117+
static final short BIT_SET_SIZE = 1024;
118+
private final LongSet topTier = new LongHashSet();
119+
private final LongObjectHashMap<CountedBitSet> bottomTier = new LongObjectHashMap<>();
120+
121+
/**
122+
* Marks this sequence number and returns <tt>true</tt> if it is seen before.
123+
*/
124+
boolean getAndSet(long value) {
125+
assert value >= 0;
126+
final long key = value / BIT_SET_SIZE;
127+
128+
if (topTier.contains(key)) {
129+
return true;
130+
}
131+
132+
CountedBitSet bitset = bottomTier.get(key);
133+
if (bitset == null) {
134+
bitset = new CountedBitSet(BIT_SET_SIZE);
135+
bottomTier.put(key, bitset);
136+
}
137+
138+
final boolean wasOn = bitset.getAndSet(Math.toIntExact(value % BIT_SET_SIZE));
139+
if (bitset.hasAllBitsOn()) {
140+
bottomTier.remove(key);
141+
topTier.add(key);
142+
}
143+
return wasOn;
144+
}
145+
}
68146
}

core/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,10 +832,18 @@ public int hashCode() {
832832
public interface Snapshot extends Closeable {
833833

834834
/**
835-
* The total number of operations in the translog.
835+
* The total estimated number of operations in the snapshot.
836836
*/
837837
int totalOperations();
838838

839+
/**
840+
* The number of operations that have been skipped in the snapshot so far.
841+
* Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()} is called.
842+
*/
843+
default int skippedOperations() {
844+
return 0;
845+
}
846+
839847
/**
840848
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
841849
*/

core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,9 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
562562
}
563563
}
564564

565+
logger.trace("Translog skipped [{}] operations", snapshot.skippedOperations());
566+
skippedOps += snapshot.skippedOperations();
567+
565568
if (!operations.isEmpty() || totalSentOps == 0) {
566569
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
567570
cancellableThreads.executeIO(sendBatch);
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.translog;
21+
22+
import com.carrotsearch.hppc.LongHashSet;
23+
import com.carrotsearch.hppc.LongSet;
24+
import org.elasticsearch.common.Randomness;
25+
import org.elasticsearch.test.ESTestCase;
26+
27+
import java.util.List;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.IntStream;
30+
import java.util.stream.LongStream;
31+
32+
import static org.hamcrest.CoreMatchers.equalTo;
33+
34+
public class MultiSnapshotTests extends ESTestCase {
35+
public void testTrackSeqNumRandomRanges() throws Exception {
36+
final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet();
37+
final LongSet normalSet = new LongHashSet();
38+
IntStream.range(0, between(20_000, 50_000)).forEach(i -> {
39+
long seq = randomNonNegativeLong();
40+
boolean existed = normalSet.add(seq) == false;
41+
assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed));
42+
});
43+
}
44+
45+
public void testTrackSeqNumDenseRanges() throws Exception {
46+
final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet();
47+
final LongSet normalSet = new LongHashSet();
48+
IntStream.range(0, between(20_000, 50_000)).forEach(i -> {
49+
long seq = between(0, 5000);
50+
boolean existed = normalSet.add(seq) == false;
51+
assertThat("SeqNumSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed));
52+
});
53+
}
54+
55+
public void testTrackSeqNumSparseRanges() throws Exception {
56+
final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet();
57+
final LongSet normalSet = new LongHashSet();
58+
IntStream.range(0, between(20_000, 50_000)).forEach(i -> {
59+
long seq = between(i * 10_000, i * 30_000);
60+
boolean existed = normalSet.add(seq) == false;
61+
assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed));
62+
});
63+
}
64+
65+
public void testSequenceNumMimicTranslog() throws Exception {
66+
final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet();
67+
final LongSet normalSet = new LongHashSet();
68+
long currentSeq = between(10_000_000, 1_000_000_000);
69+
final int iterations = between(100, 2000);
70+
for (long i = 0; i < iterations; i++) {
71+
List<Long> batch = LongStream.range(currentSeq, currentSeq + between(1, 1000))
72+
.boxed()
73+
.collect(Collectors.toList());
74+
Randomness.shuffle(batch);
75+
batch.forEach(seq -> {
76+
boolean existed = normalSet.add(seq) == false;
77+
assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed));
78+
});
79+
currentSeq -= batch.size();
80+
}
81+
}
82+
}

core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@
2222
import org.elasticsearch.ElasticsearchException;
2323
import org.hamcrest.Description;
2424
import org.hamcrest.Matcher;
25+
import org.hamcrest.TypeSafeDiagnosingMatcher;
2526
import org.hamcrest.TypeSafeMatcher;
2627

2728
import java.io.IOException;
2829
import java.util.ArrayList;
30+
import java.util.Collection;
31+
import java.util.List;
32+
import java.util.stream.Collectors;
2933

3034

3135
public final class SnapshotMatchers {
@@ -50,10 +54,14 @@ public static Matcher<Translog.Snapshot> equalsTo(Translog.Operation... ops) {
5054
/**
5155
* Consumes a snapshot and makes sure its content is as expected
5256
*/
53-
public static Matcher<Translog.Snapshot> equalsTo(ArrayList<Translog.Operation> ops) {
57+
public static Matcher<Translog.Snapshot> equalsTo(List<Translog.Operation> ops) {
5458
return new EqualMatcher(ops.toArray(new Translog.Operation[ops.size()]));
5559
}
5660

61+
public static Matcher<Translog.Snapshot> containsOperationsInAnyOrder(Collection<Translog.Operation> expectedOperations) {
62+
return new ContainingInAnyOrderMatcher(expectedOperations);
63+
}
64+
5765
public static class SizeMatcher extends TypeSafeMatcher<Translog.Snapshot> {
5866

5967
private final int size;
@@ -128,4 +136,51 @@ public void describeTo(Description description) {
128136
}
129137

130138

139+
public static class ContainingInAnyOrderMatcher extends TypeSafeDiagnosingMatcher<Translog.Snapshot> {
140+
private final Collection<Translog.Operation> expectedOps;
141+
142+
static List<Translog.Operation> drainAll(Translog.Snapshot snapshot) throws IOException {
143+
final List<Translog.Operation> actualOps = new ArrayList<>();
144+
Translog.Operation op;
145+
while ((op = snapshot.next()) != null) {
146+
actualOps.add(op);
147+
}
148+
return actualOps;
149+
}
150+
151+
public ContainingInAnyOrderMatcher(Collection<Translog.Operation> expectedOps) {
152+
this.expectedOps = expectedOps;
153+
}
154+
155+
@Override
156+
protected boolean matchesSafely(Translog.Snapshot snapshot, Description mismatchDescription) {
157+
try {
158+
List<Translog.Operation> actualOps = drainAll(snapshot);
159+
160+
List<Translog.Operation> notFound = expectedOps.stream()
161+
.filter(o -> actualOps.contains(o) == false)
162+
.collect(Collectors.toList());
163+
if (notFound.isEmpty() == false) {
164+
mismatchDescription
165+
.appendText(" Operations not found").appendValueList("[", ", ", "]", notFound);
166+
}
167+
168+
List<Translog.Operation> notExpected = actualOps.stream()
169+
.filter(o -> expectedOps.contains(o) == false)
170+
.collect(Collectors.toList());
171+
if (notExpected.isEmpty() == false) {
172+
mismatchDescription
173+
.appendText(" Operations not expected ").appendValueList("[", ", ", "]", notExpected);
174+
}
175+
return notFound.isEmpty() && notExpected.isEmpty();
176+
} catch (IOException ex) {
177+
throw new ElasticsearchException("failed to read snapshot content", ex);
178+
}
179+
}
180+
181+
@Override
182+
public void describeTo(Description description) {
183+
184+
}
185+
}
131186
}

0 commit comments

Comments
 (0)