From d6e64ea926bc9ee6f3298d39e1489cbb9dfbed0c Mon Sep 17 00:00:00 2001 From: Nhat Date: Sat, 4 Nov 2017 10:12:44 -0400 Subject: [PATCH 01/15] Dedup translog operations by reading in reverse Currently, translog operations are read and processed one by one. This may be a problem as stale operations in translogs may suddenly reappear in recoveries. To make sure that stale operations won't be processed, we read the translog files in a reverse order (eg. from the most recent file to the oldest file) and only process an operation if its sequence number was not seen before. --- .../index/translog/MultiSnapshot.java | 92 ++++++++++++- .../index/translog/Translog.java | 10 +- .../recovery/RecoverySourceHandler.java | 3 + .../index/translog/MultiSnapshotTests.java | 82 +++++++++++ .../index/translog/SnapshotMatchers.java | 57 +++++++- .../index/translog/TranslogTests.java | 129 +++++++++++++++--- 6 files changed, 344 insertions(+), 29 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index 668633e07ef3b..11e78540b029d 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -19,6 +19,11 @@ package org.elasticsearch.index.translog; +import com.carrotsearch.hppc.LongHashSet; +import com.carrotsearch.hppc.LongObjectHashMap; +import com.carrotsearch.hppc.LongSet; +import org.apache.lucene.util.FixedBitSet; + import java.io.Closeable; import java.io.IOException; import java.util.Arrays; @@ -30,32 +35,44 @@ final class MultiSnapshot implements Translog.Snapshot { private final TranslogSnapshot[] translogs; private final int totalOperations; + private int skippedOperations; private final Closeable onClose; private int index; + private final SeqNumSet seenSeqNo; /** * Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order. */ MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) { this.translogs = translogs; - totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); + this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); + this.skippedOperations = 0; this.onClose = onClose; - index = 0; + this.seenSeqNo = new SeqNumSet(); + this.index = translogs.length - 1; } - @Override public int totalOperations() { return totalOperations; } + @Override + public int skippedOperations() { + return skippedOperations; + } + @Override public Translog.Operation next() throws IOException { - for (; index < translogs.length; index++) { + for (; index >= 0; index--) { final TranslogSnapshot current = translogs[index]; - Translog.Operation op = current.next(); - if (op != null) { // if we are null we move to the next snapshot - return op; + Translog.Operation op; + while ((op = current.next()) != null) { + if (op.seqNo() < 0 || seenSeqNo.getAndSet(op.seqNo()) == false) { + return op; + }else { + skippedOperations++; + } } } return null; @@ -65,4 +82,65 @@ public Translog.Operation next() throws IOException { public void close() throws IOException { onClose.close(); } + + /** + * A wrapper of {@link FixedBitSet} but allows to check if all bits are set in O(1). + */ + private static final class CountedBitSet { + private short onBits; + private final FixedBitSet bitset; + + CountedBitSet(short numBits) { + assert numBits > 0; + this.onBits = 0; + this.bitset = new FixedBitSet(numBits); + } + + boolean getAndSet(int index) { + assert index >= 0; + boolean wasOn = bitset.getAndSet(index); + if (wasOn == false) { + onBits++; + } + return wasOn; + } + + boolean hasAllBitsOn() { + return onBits == bitset.length(); + } + } + + /** + * Sequence numbers from translog are likely to form contiguous ranges, thus using two tiers can reduce memory usage. + */ + static final class SeqNumSet { + static final short BIT_SET_SIZE = 1024; + private final LongSet topTier = new LongHashSet(); + private final LongObjectHashMap bottomTier = new LongObjectHashMap<>(); + + /** + * Marks this sequence number and returns true if it is seen before. + */ + boolean getAndSet(long value) { + assert value >= 0; + final long key = value / BIT_SET_SIZE; + + if (topTier.contains(key)) { + return true; + } + + CountedBitSet bitset = bottomTier.get(key); + if (bitset == null) { + bitset = new CountedBitSet(BIT_SET_SIZE); + bottomTier.put(key, bitset); + } + + final boolean wasOn = bitset.getAndSet(Math.toIntExact(value % BIT_SET_SIZE)); + if (bitset.hasAllBitsOn()) { + bottomTier.remove(key); + topTier.add(key); + } + return wasOn; + } + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 20c428960f747..95c10e3c96037 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -832,10 +832,18 @@ public int hashCode() { public interface Snapshot extends Closeable { /** - * The total number of operations in the translog. + * The total estimated number of operations in the snapshot. */ int totalOperations(); + /** + * The number of operations has been skipped in the snapshot so far. + * Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()}) is called. + */ + default int skippedOperations() { + return 0; + } + /** * Returns the next operation in the snapshot or null if we reached the end. */ diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 5f692d8e8f5fa..f6271c8afcc15 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -562,6 +562,9 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl } } + logger.trace("Translog skipped [{}] operations", snapshot.skippedOperations()); + skippedOps += snapshot.skippedOperations(); + if (!operations.isEmpty() || totalSentOps == 0) { // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint cancellableThreads.executeIO(sendBatch); diff --git a/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java new file mode 100644 index 0000000000000..2a2ed85c456c1 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java @@ -0,0 +1,82 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import com.carrotsearch.hppc.LongHashSet; +import com.carrotsearch.hppc.LongSet; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class MultiSnapshotTests extends ESTestCase { + public void testTrackSeqNumRandomRanges() throws Exception { + final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); + final LongSet normalSet = new LongHashSet(); + IntStream.range(0, between(20_000, 50_000)).forEach(i -> { + long seq = randomNonNegativeLong(); + boolean existed = normalSet.add(seq) == false; + assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed)); + }); + } + + public void testTrackSeqNumDenseRanges() throws Exception { + final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); + final LongSet normalSet = new LongHashSet(); + IntStream.range(0, between(20_000, 50_000)).forEach(i -> { + long seq = between(0, 5000); + boolean existed = normalSet.add(seq) == false; + assertThat("SeqNumSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed)); + }); + } + + public void testTrackSeqNumSparseRanges() throws Exception { + final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); + final LongSet normalSet = new LongHashSet(); + IntStream.range(0, between(20_000, 50_000)).forEach(i -> { + long seq = between(i * 10_000, i * 30_000); + boolean existed = normalSet.add(seq) == false; + assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed)); + }); + } + + public void testSequenceNumMimicTranslog() throws Exception { + final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); + final LongSet normalSet = new LongHashSet(); + long currentSeq = between(10_000_000, 1_000_000_000); + final int iterations = between(100, 2000); + for (long i = 0; i < iterations; i++) { + List batch = LongStream.range(currentSeq, currentSeq + between(1, 1000)) + .boxed() + .collect(Collectors.toList()); + Randomness.shuffle(batch); + batch.forEach(seq -> { + boolean existed = normalSet.add(seq) == false; + assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed)); + }); + currentSeq -= batch.size(); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java b/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java index c45da660b0030..b9ce6fea21f5f 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java +++ b/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java @@ -22,10 +22,14 @@ import org.elasticsearch.ElasticsearchException; import org.hamcrest.Description; import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; public final class SnapshotMatchers { @@ -50,10 +54,14 @@ public static Matcher equalsTo(Translog.Operation... ops) { /** * Consumes a snapshot and make sure it's content is as expected */ - public static Matcher equalsTo(ArrayList ops) { + public static Matcher equalsTo(List ops) { return new EqualMatcher(ops.toArray(new Translog.Operation[ops.size()])); } + public static Matcher containsOperationsInAnyOrder(Collection expectedOperations) { + return new ContainingInAnyOrderMatcher(expectedOperations); + } + public static class SizeMatcher extends TypeSafeMatcher { private final int size; @@ -128,4 +136,51 @@ public void describeTo(Description description) { } + public static class ContainingInAnyOrderMatcher extends TypeSafeDiagnosingMatcher { + private final Collection expectedOps; + + static List drainAll(Translog.Snapshot snapshot) throws IOException { + final List actualOps = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + actualOps.add(op); + } + return actualOps; + } + + public ContainingInAnyOrderMatcher(Collection expectedOps) { + this.expectedOps = expectedOps; + } + + @Override + protected boolean matchesSafely(Translog.Snapshot snapshot, Description mismatchDescription) { + try { + List actualOps = drainAll(snapshot); + + List notFound = expectedOps.stream() + .filter(o -> actualOps.contains(o) == false) + .collect(Collectors.toList()); + if (notFound.isEmpty() == false) { + mismatchDescription + .appendText(" Operations not found").appendValueList("[", ", ", "]", notFound); + } + + List notExpected = actualOps.stream() + .filter(o -> expectedOps.contains(o) == false) + .collect(Collectors.toList()); + if (notExpected.isEmpty() == false) { + mismatchDescription + .appendText(" Operations not expected ").appendValueList("[", ", ", "]", notExpected); + } + return notFound.isEmpty() && notExpected.isEmpty(); + } catch (IOException ex) { + throw new ElasticsearchException("failed to read snapshot content", ex); + } + } + + @Override + public void describeTo(Description description) { + + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 771302a903f32..81cbab04651c5 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -83,10 +83,13 @@ import java.nio.file.InvalidPathException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -107,6 +110,7 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -217,7 +221,7 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); } - private void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) throws IOException { + private void addToTranslogAndList(Translog translog, List list, Translog.Operation op) throws IOException { list.add(op); translog.add(op); } @@ -520,7 +524,7 @@ public void testSnapshotWithNewTranslog() throws IOException { Translog.Snapshot snapshot2 = translog.newSnapshot(); toClose.add(snapshot2); markCurrentGenAsCommitted(translog); - assertThat(snapshot2, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot2, containsOperationsInAnyOrder(ops)); assertThat(snapshot2.totalOperations(), equalTo(ops.size())); } finally { IOUtils.closeWhileHandlingException(toClose); @@ -1028,7 +1032,7 @@ public void testLocationComparison() throws IOException { } assertEquals(max.generation, translog.currentFileGeneration()); - try (Translog.Snapshot snap = translog.newSnapshot()) { + try (Translog.Snapshot snap = new SortedSnapshot(translog.newSnapshot())) { Translog.Operation next; Translog.Operation maxOp = null; while ((next = snap.next()) != null) { @@ -1252,7 +1256,7 @@ public void testRecoveryUncommitted() throws IOException { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1266,7 +1270,7 @@ public void testRecoveryUncommitted() throws IOException { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1310,7 +1314,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1325,7 +1329,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -1374,7 +1378,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) { int upTo = sync ? translogOperations : prepareOp; for (int i = 0; i < upTo; i++) { Translog.Operation next = snapshot.next(); @@ -2061,7 +2065,7 @@ protected TranslogWriter createWriter(long fileGeneration) throws IOException { } public void testRecoverWithUnbackedNextGen() throws IOException { - translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); translog.close(); TranslogConfig config = translog.getConfig(); @@ -2072,21 +2076,25 @@ public void testRecoverWithUnbackedNextGen() throws IOException { try (Translog tlog = createTranslog(config, translog.getTranslogUUID()); Translog.Snapshot snapshot = tlog.newSnapshot()) { assertFalse(tlog.syncNeeded()); - for (int i = 0; i < 1; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); - } - tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + + Translog.Operation op = snapshot.next(); + assertNotNull("operation 1 must be non-null", op); + assertEquals("payload mismatch for operation 1", 1, Integer.parseInt(op.getSource().source.utf8ToString())); + + tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(2).getBytes(Charset.forName("UTF-8")))); } + try (Translog tlog = createTranslog(config, translog.getTranslogUUID()); Translog.Snapshot snapshot = tlog.newSnapshot()) { assertFalse(tlog.syncNeeded()); - for (int i = 0; i < 2; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); - } + + Translog.Operation secondOp = snapshot.next(); + assertNotNull("operation 2 must be non-null", secondOp); + assertEquals("payload mismatch for operation 2", Integer.parseInt(secondOp.getSource().source.utf8ToString()), 2); + + Translog.Operation firstOp = snapshot.next(); + assertNotNull("operation 1 must be non-null", firstOp); + assertEquals("payload mismatch for operation 1", Integer.parseInt(firstOp.getSource().source.utf8ToString()), 1); } } @@ -2488,6 +2496,7 @@ public void testMinSeqNoBasedAPI() throws IOException { assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos)); readFromSnapshot++; } + readFromSnapshot += snapshot.skippedOperations(); } assertThat(readFromSnapshot, equalTo(expectedSnapshotOps)); final long seqNoLowerBound = seqNo; @@ -2533,4 +2542,84 @@ public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { } } } + + public void testSnapshotReadOperationInReverse() throws Exception { + final Deque> views = new ArrayDeque<>(); + views.push(new ArrayList<>()); + final AtomicLong seqNo = new AtomicLong(); + + final int generations = randomIntBetween(2, 20); + for (int gen = 0; gen < generations; gen++) { + final int operations = randomIntBetween(1, 100); + for (int i = 0; i < operations; i++) { + Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo.getAndIncrement(), new byte[]{1}); + translog.add(op); + views.peek().add(op); + } + if (frequently()) { + translog.rollGeneration(); + views.push(new ArrayList<>()); + } + } + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + final List expectedSeqNo = new ArrayList<>(); + while (views.isEmpty() == false) { + expectedSeqNo.addAll(views.pop()); + } + assertThat(snapshot, SnapshotMatchers.equalsTo(expectedSeqNo)); + } + } + + public void testSnapshotDedupOperations() throws Exception { + final Map latestOperations = new HashMap<>(); + final int generations = between(2, 20); + for (int gen = 0; gen < generations; gen++) { + List batch = LongStream.rangeClosed(0, between(0, 500)).boxed().collect(Collectors.toList()); + Randomness.shuffle(batch); + for (Long seqNo : batch) { + Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, new byte[]{1}); + translog.add(op); + latestOperations.put(op.seqNo(), op); + } + translog.rollGeneration(); + } + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, containsOperationsInAnyOrder(latestOperations.values())); + } + } + + static class SortedSnapshot implements Translog.Snapshot { + private final Translog.Snapshot snapshot; + private List operations = null; + + SortedSnapshot(Translog.Snapshot snapshot) { + this.snapshot = snapshot; + } + + @Override + public int totalOperations() { + return snapshot.totalOperations(); + } + + @Override + public Translog.Operation next() throws IOException { + if (operations == null) { + operations = new ArrayList<>(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + operations.add(op); + } + operations.sort(Comparator.comparing(Translog.Operation::seqNo)); + } + if (operations.isEmpty()) { + return null; + } + return operations.remove(0); + } + + @Override + public void close() throws IOException { + snapshot.close(); + } + } } From d006988edf9b7a78996f1cc1a4f6577fa6d2981f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Nov 2017 14:06:58 -0500 Subject: [PATCH 02/15] add the replication test --- .../IndexLevelReplicationTests.java | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index cf4dab733f237..a9810b09c5f8c 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -46,16 +46,23 @@ import org.hamcrest.Matcher; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase { @@ -299,6 +306,65 @@ public void testRequestFailureReplication() throws Exception { } } + public void testTranslogDedupOperations() throws Exception { + try (ReplicationGroup shards = createGroup(2)) { + shards.startAll(); + int initDocs = shards.indexDocs(randomInt(10)); + List replicas = shards.getReplicas(); + IndexShard replica1 = replicas.get(0); + IndexShard replica2 = replicas.get(1); + + logger.info("--> Isolate replica1"); + IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON); + BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary()); + for (int i = 1; i < replicas.size(); i++) { + indexOnReplica(replicationRequest, replicas.get(i)); + } + + final Translog.Operation op1; + final List initOperations = new ArrayList<>(initDocs); + try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + for (int i = 0; i < initDocs; i++) { + Translog.Operation op = snapshot.next(); + assertThat(op, is(notNullValue())); + initOperations.add(op); + } + op1 = snapshot.next(); + assertThat(op1, notNullValue()); + assertThat(snapshot.next(), nullValue()); + assertThat(snapshot.skippedOperations(), equalTo(0)); + } + + // Make sure that replica2 receives translog from replica1 and overwrites its stale operation (op1). + logger.info("--> Promote replica1 as the primary"); + shards.promoteReplicaToPrimary(replica1); + shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON)); + final Translog.Operation op2; + try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), greaterThanOrEqualTo(initDocs + 2)); + op2 = snapshot.next(); + assertThat(op2.seqNo(), equalTo(op1.seqNo())); + assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm())); + assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); + assertThat(snapshot.skippedOperations(), greaterThanOrEqualTo(1)); + } + + // Make sure that peer-recovery transfers all but non-duplicated operations. + IndexShard replica3 = shards.addReplica(); + logger.info("--> Promote replica2 as the primary"); + shards.promoteReplicaToPrimary(replica2); + logger.info("--> Recover replica3 from replica2"); + recoverReplica(replica3, replica2); + try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); + assertThat(snapshot.next(), equalTo(op2)); + assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); + assertThat(snapshot.skippedOperations(), equalTo(0)); + } + } + } + /** Throws documentFailure on every indexing operation */ static class ThrowingDocumentFailureEngineFactory implements EngineFactory { final String documentFailureMessage; From 59350092ea20dc6e7221409fe50633177e94f12b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Nov 2017 14:07:53 -0500 Subject: [PATCH 03/15] remove tier in name and comment --- .../index/translog/MultiSnapshot.java | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index 11e78540b029d..0983bfdb32c83 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -70,7 +70,7 @@ public Translog.Operation next() throws IOException { while ((op = current.next()) != null) { if (op.seqNo() < 0 || seenSeqNo.getAndSet(op.seqNo()) == false) { return op; - }else { + } else { skippedOperations++; } } @@ -111,12 +111,13 @@ boolean hasAllBitsOn() { } /** - * Sequence numbers from translog are likely to form contiguous ranges, thus using two tiers can reduce memory usage. + * Sequence numbers from translog are likely to form contiguous ranges, + * thus collapsing a completed bitset into a single entry will reduce memory usage. */ static final class SeqNumSet { static final short BIT_SET_SIZE = 1024; - private final LongSet topTier = new LongHashSet(); - private final LongObjectHashMap bottomTier = new LongObjectHashMap<>(); + private final LongSet completedSets = new LongHashSet(); + private final LongObjectHashMap ongoingSets = new LongObjectHashMap<>(); /** * Marks this sequence number and returns true if it is seen before. @@ -125,22 +126,32 @@ boolean getAndSet(long value) { assert value >= 0; final long key = value / BIT_SET_SIZE; - if (topTier.contains(key)) { + if (completedSets.contains(key)) { return true; } - CountedBitSet bitset = bottomTier.get(key); + CountedBitSet bitset = ongoingSets.get(key); if (bitset == null) { bitset = new CountedBitSet(BIT_SET_SIZE); - bottomTier.put(key, bitset); + ongoingSets.put(key, bitset); } final boolean wasOn = bitset.getAndSet(Math.toIntExact(value % BIT_SET_SIZE)); if (bitset.hasAllBitsOn()) { - bottomTier.remove(key); - topTier.add(key); + ongoingSets.remove(key); + completedSets.add(key); } return wasOn; } + + // For testing + long completeSetsSize() { + return completedSets.size(); + } + + // For testing + long ongoingSetsSize() { + return ongoingSets.size(); + } } } From ea11a2adc28b6c5b8b6498df8efb1661698f7c3f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Nov 2017 14:08:28 -0500 Subject: [PATCH 04/15] improve the SeqNumSet tests --- .../index/translog/MultiSnapshotTests.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java index 2a2ed85c456c1..7ee0782a5158e 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java @@ -30,32 +30,24 @@ import java.util.stream.LongStream; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public class MultiSnapshotTests extends ESTestCase { - public void testTrackSeqNumRandomRanges() throws Exception { - final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); - final LongSet normalSet = new LongHashSet(); - IntStream.range(0, between(20_000, 50_000)).forEach(i -> { - long seq = randomNonNegativeLong(); - boolean existed = normalSet.add(seq) == false; - assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed)); - }); - } - public void testTrackSeqNumDenseRanges() throws Exception { final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); final LongSet normalSet = new LongHashSet(); - IntStream.range(0, between(20_000, 50_000)).forEach(i -> { + IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> { long seq = between(0, 5000); boolean existed = normalSet.add(seq) == false; assertThat("SeqNumSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed)); + assertThat(bitSet.ongoingSetsSize() + bitSet.completeSetsSize(), lessThanOrEqualTo(5L)); }); } public void testTrackSeqNumSparseRanges() throws Exception { final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); final LongSet normalSet = new LongHashSet(); - IntStream.range(0, between(20_000, 50_000)).forEach(i -> { + IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> { long seq = between(i * 10_000, i * 30_000); boolean existed = normalSet.add(seq) == false; assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed)); @@ -66,17 +58,26 @@ public void testSequenceNumMimicTranslog() throws Exception { final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); final LongSet normalSet = new LongHashSet(); long currentSeq = between(10_000_000, 1_000_000_000); - final int iterations = between(100, 2000); + final int iterations = scaledRandomIntBetween(100, 2000); + assertThat(bitSet.completeSetsSize(), equalTo(0L)); + assertThat(bitSet.ongoingSetsSize(), equalTo(0L)); + long totalDocs = 0; for (long i = 0; i < iterations; i++) { - List batch = LongStream.range(currentSeq, currentSeq + between(1, 1000)) + int batchSize = between(1, 1500); + totalDocs += batchSize; + currentSeq -= batchSize; + List batch = LongStream.range(currentSeq, currentSeq + batchSize) .boxed() .collect(Collectors.toList()); Randomness.shuffle(batch); batch.forEach(seq -> { boolean existed = normalSet.add(seq) == false; assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed)); + assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(4L)); }); - currentSeq -= batch.size(); + assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L)); } + assertThat(bitSet.completeSetsSize(), lessThanOrEqualTo(totalDocs / 1024)); + assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L)); } } From ebe92b665dfbfda3f355cfa83f619b8e835d9d5d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Nov 2017 14:08:37 -0500 Subject: [PATCH 05/15] add description for the matcher --- .../index/translog/SnapshotMatchers.java | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java b/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java index b9ce6fea21f5f..4ca6057bd6bc9 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java +++ b/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java @@ -22,7 +22,6 @@ import org.elasticsearch.ElasticsearchException; import org.hamcrest.Description; import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; import java.io.IOException; @@ -135,9 +134,10 @@ public void describeTo(Description description) { } } - - public static class ContainingInAnyOrderMatcher extends TypeSafeDiagnosingMatcher { + public static class ContainingInAnyOrderMatcher extends TypeSafeMatcher { private final Collection expectedOps; + private List notFoundOps; + private List notExpectedOps; static List drainAll(Translog.Snapshot snapshot) throws IOException { final List actualOps = new ArrayList<>(); @@ -153,34 +153,41 @@ public ContainingInAnyOrderMatcher(Collection expectedOps) { } @Override - protected boolean matchesSafely(Translog.Snapshot snapshot, Description mismatchDescription) { + protected boolean matchesSafely(Translog.Snapshot snapshot) { try { List actualOps = drainAll(snapshot); - - List notFound = expectedOps.stream() + notFoundOps = expectedOps.stream() .filter(o -> actualOps.contains(o) == false) .collect(Collectors.toList()); - if (notFound.isEmpty() == false) { - mismatchDescription - .appendText(" Operations not found").appendValueList("[", ", ", "]", notFound); - } - - List notExpected = actualOps.stream() + notExpectedOps = actualOps.stream() .filter(o -> expectedOps.contains(o) == false) .collect(Collectors.toList()); - if (notExpected.isEmpty() == false) { - mismatchDescription - .appendText(" Operations not expected ").appendValueList("[", ", ", "]", notExpected); - } - return notFound.isEmpty() && notExpected.isEmpty(); + return notFoundOps.isEmpty() && notExpectedOps.isEmpty(); } catch (IOException ex) { throw new ElasticsearchException("failed to read snapshot content", ex); } } @Override - public void describeTo(Description description) { + protected void describeMismatchSafely(Translog.Snapshot snapshot, Description mismatchDescription) { + if (notFoundOps.isEmpty() == false) { + mismatchDescription + .appendText("not found ").appendValueList("[", ", ", "]", notFoundOps); + } + if (notExpectedOps.isEmpty() == false) { + if (notFoundOps.isEmpty() == false) { + mismatchDescription.appendText("; "); + } + mismatchDescription + .appendText("not expected ").appendValueList("[", ", ", "]", notExpectedOps); + } + } + @Override + public void describeTo(Description description) { + description.appendText("snapshot contains ") + .appendValueList("[", ", ", "]", expectedOps) + .appendText(" in any order."); } } } From 5cc33bbc2505fd94eeaa4bd1f4834e55f3d79335 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Nov 2017 16:49:31 -0500 Subject: [PATCH 06/15] skippedOperations -> overriddenOperations --- .../elasticsearch/index/translog/MultiSnapshot.java | 10 +++++----- .../org/elasticsearch/index/translog/Translog.java | 7 ++++--- .../indices/recovery/RecoverySourceHandler.java | 9 ++++----- .../index/replication/IndexLevelReplicationTests.java | 10 +++++----- .../elasticsearch/index/translog/TranslogTests.java | 2 +- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index 0983bfdb32c83..d29b2a798a23c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -35,7 +35,7 @@ final class MultiSnapshot implements Translog.Snapshot { private final TranslogSnapshot[] translogs; private final int totalOperations; - private int skippedOperations; + private int overriddenOperations; private final Closeable onClose; private int index; private final SeqNumSet seenSeqNo; @@ -46,7 +46,7 @@ final class MultiSnapshot implements Translog.Snapshot { MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) { this.translogs = translogs; this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); - this.skippedOperations = 0; + this.overriddenOperations = 0; this.onClose = onClose; this.seenSeqNo = new SeqNumSet(); this.index = translogs.length - 1; @@ -58,8 +58,8 @@ public int totalOperations() { } @Override - public int skippedOperations() { - return skippedOperations; + public int overriddenOperations() { + return overriddenOperations; } @Override @@ -71,7 +71,7 @@ public Translog.Operation next() throws IOException { if (op.seqNo() < 0 || seenSeqNo.getAndSet(op.seqNo()) == false) { return op; } else { - skippedOperations++; + overriddenOperations++; } } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 95c10e3c96037..5493830aa4460 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -837,10 +837,11 @@ public interface Snapshot extends Closeable { int totalOperations(); /** - * The number of operations has been skipped in the snapshot so far. - * Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()}) is called. + * The number of operations has been overridden (eg. superseded) in the snapshot so far. + * If two operations have the same sequence number, the operation with a lower term will be overridden by the operation + * with a higher term. Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()}) is called. */ - default int skippedOperations() { + default int overriddenOperations() { return 0; } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index f6271c8afcc15..71ad21c14d7ec 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -66,6 +66,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Locale; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; @@ -562,16 +563,14 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl } } - logger.trace("Translog skipped [{}] operations", snapshot.skippedOperations()); - skippedOps += snapshot.skippedOperations(); - if (!operations.isEmpty() || totalSentOps == 0) { // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint cancellableThreads.executeIO(sendBatch); } - assert expectedTotalOps == skippedOps + totalSentOps - : "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]"; + assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps + : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", + expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps); logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index a9810b09c5f8c..e2489bc64f5d3 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -333,10 +333,10 @@ public void testTranslogDedupOperations() throws Exception { op1 = snapshot.next(); assertThat(op1, notNullValue()); assertThat(snapshot.next(), nullValue()); - assertThat(snapshot.skippedOperations(), equalTo(0)); + assertThat(snapshot.overriddenOperations(), equalTo(0)); } - // Make sure that replica2 receives translog from replica1 and overwrites its stale operation (op1). + // Make sure that replica2 receives translog ops (eg. op2) from replica1 and overwrites its stale operation (op1). logger.info("--> Promote replica1 as the primary"); shards.promoteReplicaToPrimary(replica1); shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON)); @@ -347,10 +347,10 @@ public void testTranslogDedupOperations() throws Exception { assertThat(op2.seqNo(), equalTo(op1.seqNo())); assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm())); assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); - assertThat(snapshot.skippedOperations(), greaterThanOrEqualTo(1)); + assertThat(snapshot.overriddenOperations(), greaterThanOrEqualTo(1)); } - // Make sure that peer-recovery transfers all but non-duplicated operations. + // Make sure that peer-recovery transfers all but non-overridden operations. IndexShard replica3 = shards.addReplica(); logger.info("--> Promote replica2 as the primary"); shards.promoteReplicaToPrimary(replica2); @@ -360,7 +360,7 @@ public void testTranslogDedupOperations() throws Exception { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); assertThat(snapshot.next(), equalTo(op2)); assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); - assertThat(snapshot.skippedOperations(), equalTo(0)); + assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0)); } } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 7386300bb32a1..01193815e9ac4 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2497,7 +2497,7 @@ public void testMinSeqNoBasedAPI() throws IOException { assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos)); readFromSnapshot++; } - readFromSnapshot += snapshot.skippedOperations(); + readFromSnapshot += snapshot.overriddenOperations(); } assertThat(readFromSnapshot, equalTo(expectedSnapshotOps)); final long seqNoLowerBound = seqNo; From aa21d8731a30e877aed084b42c849e895ec338c9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Nov 2017 17:49:30 -0500 Subject: [PATCH 07/15] check against unassigned --- .../java/org/elasticsearch/index/translog/MultiSnapshot.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index d29b2a798a23c..954647d8860f0 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.LongObjectHashMap; import com.carrotsearch.hppc.LongSet; import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.Closeable; import java.io.IOException; @@ -68,7 +69,7 @@ public Translog.Operation next() throws IOException { final TranslogSnapshot current = translogs[index]; Translog.Operation op; while ((op = current.next()) != null) { - if (op.seqNo() < 0 || seenSeqNo.getAndSet(op.seqNo()) == false) { + if (op.seqNo() < SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) { return op; } else { overriddenOperations++; From 9bdb50794a48bf12eb48c49d94b41e15e8fbd98f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Nov 2017 17:50:09 -0500 Subject: [PATCH 08/15] correct condition --- .../java/org/elasticsearch/index/translog/MultiSnapshot.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index 954647d8860f0..21dbee5720b7c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -69,7 +69,7 @@ public Translog.Operation next() throws IOException { final TranslogSnapshot current = translogs[index]; Translog.Operation op; while ((op = current.next()) != null) { - if (op.seqNo() < SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) { + if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) { return op; } else { overriddenOperations++; From b3e0b6d1bb5ba3c7fd329b4ab7e1561e4a100364 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 8 Nov 2017 18:09:31 -0500 Subject: [PATCH 09/15] fix compilation --- .../java/org/elasticsearch/index/translog/TranslogTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 01193815e9ac4..bc4faf3254c83 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -111,7 +111,6 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; -import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; From 7d3482ba5a0b6bdb3cf56c7e847842c3679e242f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 22 Nov 2017 08:49:49 -0500 Subject: [PATCH 10/15] simple test --- .../index/translog/MultiSnapshotTests.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java index 7ee0782a5158e..a7fcdd46f0289 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java @@ -33,6 +33,25 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; public class MultiSnapshotTests extends ESTestCase { + + public void testTrackSimpleSeqNoRanges() throws Exception { + final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); + final List values = LongStream.range(0, 1024).boxed().collect(Collectors.toList()); + Randomness.shuffle(values); + for (int i = 0; i < 1023; i++) { + assertThat(bitSet.getAndSet(values.get(i)), equalTo(false)); + assertThat(bitSet.ongoingSetsSize(), equalTo(1L)); + assertThat(bitSet.completeSetsSize(), equalTo(0L)); + } + + assertThat(bitSet.getAndSet(values.get(1023)), equalTo(false)); + assertThat(bitSet.ongoingSetsSize(), equalTo(0L)); + assertThat(bitSet.completeSetsSize(), equalTo(1L)); + + assertThat(bitSet.getAndSet(between(0, 1023)), equalTo(true)); + assertThat(bitSet.getAndSet(between(1024, Integer.MAX_VALUE)), equalTo(false)); + } + public void testTrackSeqNumDenseRanges() throws Exception { final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); final LongSet normalSet = new LongHashSet(); From a3994f86d84790051df796868f9fa99ca19a63b6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 22 Nov 2017 09:19:07 -0500 Subject: [PATCH 11/15] SeqNum -> SeqNo --- .../index/translog/MultiSnapshot.java | 6 ++--- .../index/translog/MultiSnapshotTests.java | 22 +++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index 21dbee5720b7c..cc9dbdeb63f1d 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -39,7 +39,7 @@ final class MultiSnapshot implements Translog.Snapshot { private int overriddenOperations; private final Closeable onClose; private int index; - private final SeqNumSet seenSeqNo; + private final SeqNoSet seenSeqNo; /** * Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order. @@ -49,7 +49,7 @@ final class MultiSnapshot implements Translog.Snapshot { this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); this.overriddenOperations = 0; this.onClose = onClose; - this.seenSeqNo = new SeqNumSet(); + this.seenSeqNo = new SeqNoSet(); this.index = translogs.length - 1; } @@ -115,7 +115,7 @@ boolean hasAllBitsOn() { * Sequence numbers from translog are likely to form contiguous ranges, * thus collapsing a completed bitset into a single entry will reduce memory usage. */ - static final class SeqNumSet { + static final class SeqNoSet { static final short BIT_SET_SIZE = 1024; private final LongSet completedSets = new LongHashSet(); private final LongObjectHashMap ongoingSets = new LongObjectHashMap<>(); diff --git a/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java index a7fcdd46f0289..7ee2a6c3366e3 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java @@ -34,8 +34,8 @@ public class MultiSnapshotTests extends ESTestCase { - public void testTrackSimpleSeqNoRanges() throws Exception { - final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); + public void testTrackSeqNoSimpleRange() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); final List values = LongStream.range(0, 1024).boxed().collect(Collectors.toList()); Randomness.shuffle(values); for (int i = 0; i < 1023; i++) { @@ -52,29 +52,29 @@ public void testTrackSimpleSeqNoRanges() throws Exception { assertThat(bitSet.getAndSet(between(1024, Integer.MAX_VALUE)), equalTo(false)); } - public void testTrackSeqNumDenseRanges() throws Exception { - final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); + public void testTrackSeqNoDenseRanges() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); final LongSet normalSet = new LongHashSet(); IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> { long seq = between(0, 5000); boolean existed = normalSet.add(seq) == false; - assertThat("SeqNumSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed)); + assertThat("SeqNoSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed)); assertThat(bitSet.ongoingSetsSize() + bitSet.completeSetsSize(), lessThanOrEqualTo(5L)); }); } - public void testTrackSeqNumSparseRanges() throws Exception { - final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); + public void testTrackSeqNoSparseRanges() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); final LongSet normalSet = new LongHashSet(); IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> { long seq = between(i * 10_000, i * 30_000); boolean existed = normalSet.add(seq) == false; - assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed)); + assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed)); }); } - public void testSequenceNumMimicTranslog() throws Exception { - final MultiSnapshot.SeqNumSet bitSet = new MultiSnapshot.SeqNumSet(); + public void testTrackSeqNoMimicTranslogRanges() throws Exception { + final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet(); final LongSet normalSet = new LongHashSet(); long currentSeq = between(10_000_000, 1_000_000_000); final int iterations = scaledRandomIntBetween(100, 2000); @@ -91,7 +91,7 @@ public void testSequenceNumMimicTranslog() throws Exception { Randomness.shuffle(batch); batch.forEach(seq -> { boolean existed = normalSet.add(seq) == false; - assertThat("SeqNumSet != Set", bitSet.getAndSet(seq), equalTo(existed)); + assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed)); assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(4L)); }); assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L)); From 48b0ff95c7b5aa24f183c7e0d99d5412ebda6978 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 22 Nov 2017 11:15:39 -0500 Subject: [PATCH 12/15] address feedback --- .../main/java/org/elasticsearch/index/translog/Translog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 48069322add9d..80033833899ec 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -836,7 +836,7 @@ public interface Snapshot extends Closeable { int totalOperations(); /** - * The number of operations has been overridden (eg. superseded) in the snapshot so far. + * The number of operations have been overridden (eg. superseded) in the snapshot so far. * If two operations have the same sequence number, the operation with a lower term will be overridden by the operation * with a higher term. Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()}) is called. */ From 97aa4d7aa14e28dc26b380373ad79aa1278b1579 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 22 Nov 2017 12:53:27 -0500 Subject: [PATCH 13/15] more restrict asserts --- .../index/replication/IndexLevelReplicationTests.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index e2489bc64f5d3..9e4ee578a7ad6 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -306,13 +306,14 @@ public void testRequestFailureReplication() throws Exception { } } - public void testTranslogDedupOperations() throws Exception { + public void testSeqNoCollision() throws Exception { try (ReplicationGroup shards = createGroup(2)) { shards.startAll(); int initDocs = shards.indexDocs(randomInt(10)); List replicas = shards.getReplicas(); IndexShard replica1 = replicas.get(0); IndexShard replica2 = replicas.get(1); + shards.syncGlobalCheckpoint(); logger.info("--> Isolate replica1"); IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON); @@ -335,19 +336,18 @@ public void testTranslogDedupOperations() throws Exception { assertThat(snapshot.next(), nullValue()); assertThat(snapshot.overriddenOperations(), equalTo(0)); } - // Make sure that replica2 receives translog ops (eg. op2) from replica1 and overwrites its stale operation (op1). logger.info("--> Promote replica1 as the primary"); - shards.promoteReplicaToPrimary(replica1); + shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed. shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON)); final Translog.Operation op2; try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) { - assertThat(snapshot.totalOperations(), greaterThanOrEqualTo(initDocs + 2)); + assertThat(snapshot.totalOperations(), equalTo(initDocs + 2)); op2 = snapshot.next(); assertThat(op2.seqNo(), equalTo(op1.seqNo())); assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm())); assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); - assertThat(snapshot.overriddenOperations(), greaterThanOrEqualTo(1)); + assertThat(snapshot.overriddenOperations(), equalTo(1)); } // Make sure that peer-recovery transfers all but non-overridden operations. From a3ff8c87db6fbc1b9364b40de7a43c0697a49988 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 22 Nov 2017 17:00:03 -0500 Subject: [PATCH 14/15] another address --- .../index/replication/IndexLevelReplicationTests.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 9e4ee578a7ad6..cbfaa8453db0c 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -318,9 +318,7 @@ public void testSeqNoCollision() throws Exception { logger.info("--> Isolate replica1"); IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON); BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary()); - for (int i = 1; i < replicas.size(); i++) { - indexOnReplica(replicationRequest, replicas.get(i)); - } + indexOnReplica(replicationRequest, replica2); final Translog.Operation op1; final List initOperations = new ArrayList<>(initDocs); From f79e3ef05c5eaead785e2985438b0eb3dc05672b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 23 Nov 2017 09:23:14 -0500 Subject: [PATCH 15/15] add todo --- .../index/replication/IndexLevelReplicationTests.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index cbfaa8453db0c..8c15a2a84ddb8 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -360,6 +360,11 @@ public void testSeqNoCollision() throws Exception { assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0)); } + // TODO: We should assert the content of shards in the ReplicationGroup. + // Without rollback replicas(current implementation), we don't have the same content across shards: + // - replica1 has {doc1} + // - replica2 has {doc1, doc2} + // - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery } }