From fe003bd4fff21f98d84de594943f675bf508fc25 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 24 Aug 2018 17:36:19 -0400 Subject: [PATCH 1/2] Ensure to generate identical NoOp for the same failure We generate NoOps in two places: in InternalEngine and in TransportShardBulkAction after a write operation failed. However, we generate slightly different messages for the same failure: 1. InternalEngine uses Exception#getFailure to generate a message without the class name. > newOp [NoOp{seqNo=1, primaryTerm=1, reason='Contexts are mandatory in > context enabled completion field [suggest_context]'}] 2. TransportShardBulkAction uses Exception#toString to generate a message with the class name. > NoOp{seqNo=1, primaryTerm=1, > reason='java.lang.IllegalArgumentException: Contexts are mandatory in > context enabled completion field [suggest_context]'} If a write operation fails while a replica is recovering, that replica will possibly receive two different NoOps: one from InternalEngine and one from TransportShardBulkAction. Two different NoOps will trip TranslogWriter#assertNoSeqNumberConflic assertion. This commit makes sure that we generate the same No-Ops for the same failure. Closes #32986 --- .../index/engine/InternalEngine.java | 4 +- .../IndexLevelReplicationTests.java | 124 ++++++++---------- .../ESIndexLevelReplicationTestCase.java | 24 +++- 3 files changed, 77 insertions(+), 75 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index c4c6792bf46a5..023e659ffabeb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -802,7 +802,7 @@ public IndexResult index(Index index) throws IOException { location = translog.add(new Translog.Index(index, indexResult)); } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { // if we have document failure, record it as a no-op in the translog with the generated seq_no - location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage())); + location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().toString())); } else { location = null; } @@ -1111,7 +1111,7 @@ public DeleteResult delete(Delete delete) throws IOException { location = translog.add(new Translog.Delete(delete, deleteResult)); } else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(), - delete.primaryTerm(), deleteResult.getFailure().getMessage())); + delete.primaryTerm(), deleteResult.getFailure().toString())); } else { location = null; } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index f38550d70413b..0397bb55bd3ed 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineTests; @@ -47,6 +46,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTests; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.threadpool.TestThreadPool; @@ -338,38 +338,73 @@ public void testReplicaOperationWithConcurrentPrimaryPromotion() throws Exceptio * for primary and replica shards */ public void testDocumentFailureReplication() throws Exception { - final String failureMessage = "simulated document failure"; - final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory = - new ThrowingDocumentFailureEngineFactory(failureMessage); + final IOException indexException = new IOException("simulated indexing failure"); + final IOException deleteException = new IOException("simulated deleting failure"); + final EngineFactory engineFactory = config -> InternalEngineTests.createInternalEngine((dir, iwc) -> + new IndexWriter(dir, iwc) { + final AtomicBoolean throwAfterIndexedOneDoc = new AtomicBoolean(); // need one document to trigger delete in IW. + @Override + public long addDocument(Iterable doc) throws IOException { + if (throwAfterIndexedOneDoc.getAndSet(true)) { + throw indexException; + } else { + return super.addDocument(doc); + } + } + @Override + public long deleteDocuments(Term... terms) throws IOException { + throw deleteException; + } + }, null, null, config); try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) { @Override - protected EngineFactory getEngineFactory(ShardRouting routing) { - return throwingDocumentFailureEngineFactory; - }}) { + protected EngineFactory getEngineFactory(ShardRouting routing) { return engineFactory; }}) { - // test only primary + // start with the primary only so two first failures are replicated to replicas via recovery from the translog of the primary. shards.startPrimary(); - BulkItemResponse response = shards.index( - new IndexRequest(index.getName(), "type", "1") - .source("{}", XContentType.JSON) - ); - assertTrue(response.isFailed()); - assertNoOpTranslogOperationForDocumentFailure(shards, 1, shards.getPrimary().getPendingPrimaryTerm(), failureMessage); - shards.assertAllEqual(0); + long primaryTerm = shards.getPrimary().getPendingPrimaryTerm(); + List expectedTranslogOps = new ArrayList<>(); + BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1").source("{}", XContentType.JSON)); + assertThat(indexResp.isFailed(), equalTo(false)); + expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, new byte[]{123, 125}, null, -1)); + try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); + } + + indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON)); + assertThat(indexResp.getFailure().getCause(), equalTo(indexException)); + expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString())); + + BulkItemResponse deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1")); + assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException)); + expectedTranslogOps.add(new Translog.NoOp(2, primaryTerm, deleteException.toString())); + shards.assertAllEqual(1); - // add some replicas int nReplica = randomIntBetween(1, 3); for (int i = 0; i < nReplica; i++) { shards.addReplica(); } shards.startReplicas(nReplica); - response = shards.index( - new IndexRequest(index.getName(), "type", "1") - .source("{}", XContentType.JSON) - ); - assertTrue(response.isFailed()); - assertNoOpTranslogOperationForDocumentFailure(shards, 2, shards.getPrimary().getPendingPrimaryTerm(), failureMessage); - shards.assertAllEqual(0); + for (IndexShard shard : shards) { + try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); + } + } + // unlike previous failures, these two failures replicated directly from the replication channel. + indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON)); + assertThat(indexResp.getFailure().getCause(), equalTo(indexException)); + expectedTranslogOps.add(new Translog.NoOp(3, primaryTerm, indexException.toString())); + + deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1")); + assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException)); + expectedTranslogOps.add(new Translog.NoOp(4, primaryTerm, deleteException.toString())); + + for (IndexShard shard : shards) { + try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); + } + } + shards.assertAllEqual(1); } } @@ -541,47 +576,4 @@ public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception { shards.assertAllEqual(0); } } - - /** Throws documentFailure on every indexing operation */ - static class ThrowingDocumentFailureEngineFactory implements EngineFactory { - final String documentFailureMessage; - - ThrowingDocumentFailureEngineFactory(String documentFailureMessage) { - this.documentFailureMessage = documentFailureMessage; - } - - @Override - public Engine newReadWriteEngine(EngineConfig config) { - return InternalEngineTests.createInternalEngine((directory, writerConfig) -> - new IndexWriter(directory, writerConfig) { - @Override - public long addDocument(Iterable doc) throws IOException { - assert documentFailureMessage != null; - throw new IOException(documentFailureMessage); - } - }, null, null, config); - } - } - - private static void assertNoOpTranslogOperationForDocumentFailure( - Iterable replicationGroup, - int expectedOperation, - long expectedPrimaryTerm, - String failureMessage) throws IOException { - for (IndexShard indexShard : replicationGroup) { - try(Translog.Snapshot snapshot = getTranslog(indexShard).newSnapshot()) { - assertThat(snapshot.totalOperations(), equalTo(expectedOperation)); - long expectedSeqNo = 0L; - Translog.Operation op = snapshot.next(); - do { - assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP)); - assertThat(op.seqNo(), equalTo(expectedSeqNo)); - assertThat(op.primaryTerm(), equalTo(expectedPrimaryTerm)); - assertThat(((Translog.NoOp) op).reason(), containsString(failureMessage)); - op = snapshot.next(); - expectedSeqNo++; - } while (op != null); - } - } - } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 77bc644909ab8..37229430bc21a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -37,6 +37,7 @@ import org.elasticsearch.action.resync.ResyncReplicationResponse; import org.elasticsearch.action.resync.TransportResyncReplicationAction; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; @@ -193,14 +194,23 @@ public int appendDocs(final int numOfDoc) throws Exception { } public BulkItemResponse index(IndexRequest indexRequest) throws Exception { + return executeWriteRequest(indexRequest, indexRequest.getRefreshPolicy()); + } + + public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception { + return executeWriteRequest(deleteRequest, deleteRequest.getRefreshPolicy()); + } + + private BulkItemResponse executeWriteRequest( + DocWriteRequest writeRequest, WriteRequest.RefreshPolicy refreshPolicy) throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); final ActionListener wrapBulkListener = ActionListener.wrap( - bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]), - listener::onFailure); + bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]), + listener::onFailure); BulkItemRequest[] items = new BulkItemRequest[1]; - items[0] = new BulkItemRequest(0, indexRequest); - BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items); - new IndexingAction(request, wrapBulkListener, this).execute(); + items[0] = new BulkItemRequest(0, writeRequest); + BulkShardRequest request = new BulkShardRequest(shardId, refreshPolicy, items); + new WriteReplicationAction(request, wrapBulkListener, this).execute(); return listener.get(); } @@ -598,9 +608,9 @@ public void respond(ActionListener listener) { } - class IndexingAction extends ReplicationAction { + class WriteReplicationAction extends ReplicationAction { - IndexingAction(BulkShardRequest request, ActionListener listener, ReplicationGroup replicationGroup) { + WriteReplicationAction(BulkShardRequest request, ActionListener listener, ReplicationGroup replicationGroup) { super(request, listener, replicationGroup, "indexing"); } From 66b8b61efb247e26547bfe5162082fa9659ae80c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 24 Aug 2018 18:00:41 -0400 Subject: [PATCH 2/2] replace hard-code bytes with string --- .../index/replication/IndexLevelReplicationTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 0397bb55bd3ed..1d1e423afc1b6 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -54,6 +54,7 @@ import org.hamcrest.Matcher; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -366,7 +367,7 @@ public long deleteDocuments(Term... terms) throws IOException { List expectedTranslogOps = new ArrayList<>(); BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1").source("{}", XContentType.JSON)); assertThat(indexResp.isFailed(), equalTo(false)); - expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, new byte[]{123, 125}, null, -1)); + expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, "{}".getBytes(StandardCharsets.UTF_8), null, -1)); try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); }