Skip to content

Commit d006988

Browse files
committed
add the replication test
1 parent d6e64ea commit d006988

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,23 @@
4646
import org.hamcrest.Matcher;
4747

4848
import java.io.IOException;
49+
import java.util.ArrayList;
4950
import java.util.Collections;
5051
import java.util.List;
5152
import java.util.Map;
5253
import java.util.concurrent.CountDownLatch;
5354
import java.util.concurrent.Future;
5455

56+
import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
5557
import static org.hamcrest.Matchers.anyOf;
5658
import static org.hamcrest.Matchers.containsString;
5759
import static org.hamcrest.Matchers.equalTo;
60+
import static org.hamcrest.Matchers.greaterThan;
61+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5862
import static org.hamcrest.Matchers.instanceOf;
63+
import static org.hamcrest.Matchers.notNullValue;
64+
import static org.hamcrest.Matchers.nullValue;
65+
import static org.hamcrest.core.Is.is;
5966

6067
public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {
6168

@@ -299,6 +306,65 @@ public void testRequestFailureReplication() throws Exception {
299306
}
300307
}
301308

309+
public void testTranslogDedupOperations() throws Exception {
310+
try (ReplicationGroup shards = createGroup(2)) {
311+
shards.startAll();
312+
int initDocs = shards.indexDocs(randomInt(10));
313+
List<IndexShard> replicas = shards.getReplicas();
314+
IndexShard replica1 = replicas.get(0);
315+
IndexShard replica2 = replicas.get(1);
316+
317+
logger.info("--> Isolate replica1");
318+
IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON);
319+
BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary());
320+
for (int i = 1; i < replicas.size(); i++) {
321+
indexOnReplica(replicationRequest, replicas.get(i));
322+
}
323+
324+
final Translog.Operation op1;
325+
final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);
326+
try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) {
327+
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
328+
for (int i = 0; i < initDocs; i++) {
329+
Translog.Operation op = snapshot.next();
330+
assertThat(op, is(notNullValue()));
331+
initOperations.add(op);
332+
}
333+
op1 = snapshot.next();
334+
assertThat(op1, notNullValue());
335+
assertThat(snapshot.next(), nullValue());
336+
assertThat(snapshot.skippedOperations(), equalTo(0));
337+
}
338+
339+
// Make sure that replica2 receives translog from replica1 and overwrites its stale operation (op1).
340+
logger.info("--> Promote replica1 as the primary");
341+
shards.promoteReplicaToPrimary(replica1);
342+
shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON));
343+
final Translog.Operation op2;
344+
try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) {
345+
assertThat(snapshot.totalOperations(), greaterThanOrEqualTo(initDocs + 2));
346+
op2 = snapshot.next();
347+
assertThat(op2.seqNo(), equalTo(op1.seqNo()));
348+
assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
349+
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
350+
assertThat(snapshot.skippedOperations(), greaterThanOrEqualTo(1));
351+
}
352+
353+
// Make sure that peer-recovery transfers all but non-duplicated operations.
354+
IndexShard replica3 = shards.addReplica();
355+
logger.info("--> Promote replica2 as the primary");
356+
shards.promoteReplicaToPrimary(replica2);
357+
logger.info("--> Recover replica3 from replica2");
358+
recoverReplica(replica3, replica2);
359+
try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) {
360+
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
361+
assertThat(snapshot.next(), equalTo(op2));
362+
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
363+
assertThat(snapshot.skippedOperations(), equalTo(0));
364+
}
365+
}
366+
}
367+
302368
/** Throws <code>documentFailure</code> on every indexing operation */
303369
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
304370
final String documentFailureMessage;

0 commit comments

Comments
 (0)