Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ public IndexResult index(Index index) throws IOException {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog with the generated seq_no
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().toString()));
} else {
location = null;
}
Expand Down Expand Up @@ -1111,7 +1111,7 @@ public DeleteResult delete(Delete delete) throws IOException {
location = translog.add(new Translog.Delete(delete, deleteResult));
} else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(),
delete.primaryTerm(), deleteResult.getFailure().getMessage()));
delete.primaryTerm(), deleteResult.getFailure().toString()));
} else {
location = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineTests;
Expand All @@ -47,13 +46,15 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTests;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matcher;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -338,38 +339,73 @@ public void testReplicaOperationWithConcurrentPrimaryPromotion() throws Exceptio
* for primary and replica shards
*/
public void testDocumentFailureReplication() throws Exception {
final String failureMessage = "simulated document failure";
final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory =
new ThrowingDocumentFailureEngineFactory(failureMessage);
final IOException indexException = new IOException("simulated indexing failure");
final IOException deleteException = new IOException("simulated deleting failure");
final EngineFactory engineFactory = config -> InternalEngineTests.createInternalEngine((dir, iwc) ->
new IndexWriter(dir, iwc) {
final AtomicBoolean throwAfterIndexedOneDoc = new AtomicBoolean(); // need one document to trigger delete in IW.
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
if (throwAfterIndexedOneDoc.getAndSet(true)) {
throw indexException;
} else {
return super.addDocument(doc);
}
}
@Override
public long deleteDocuments(Term... terms) throws IOException {
throw deleteException;
}
}, null, null, config);
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
return throwingDocumentFailureEngineFactory;
}}) {
protected EngineFactory getEngineFactory(ShardRouting routing) { return engineFactory; }}) {

// test only primary
// start with the primary only so two first failures are replicated to replicas via recovery from the translog of the primary.
shards.startPrimary();
BulkItemResponse response = shards.index(
new IndexRequest(index.getName(), "type", "1")
.source("{}", XContentType.JSON)
);
assertTrue(response.isFailed());
assertNoOpTranslogOperationForDocumentFailure(shards, 1, shards.getPrimary().getPendingPrimaryTerm(), failureMessage);
shards.assertAllEqual(0);
long primaryTerm = shards.getPrimary().getPendingPrimaryTerm();
List<Translog.Operation> expectedTranslogOps = new ArrayList<>();
BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1").source("{}", XContentType.JSON));
assertThat(indexResp.isFailed(), equalTo(false));
expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, "{}".getBytes(StandardCharsets.UTF_8), null, -1));
try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}

indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString()));

BulkItemResponse deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
expectedTranslogOps.add(new Translog.NoOp(2, primaryTerm, deleteException.toString()));
shards.assertAllEqual(1);

// add some replicas
int nReplica = randomIntBetween(1, 3);
for (int i = 0; i < nReplica; i++) {
shards.addReplica();
}
shards.startReplicas(nReplica);
response = shards.index(
new IndexRequest(index.getName(), "type", "1")
.source("{}", XContentType.JSON)
);
assertTrue(response.isFailed());
assertNoOpTranslogOperationForDocumentFailure(shards, 2, shards.getPrimary().getPendingPrimaryTerm(), failureMessage);
shards.assertAllEqual(0);
for (IndexShard shard : shards) {
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
}
// unlike previous failures, these two failures replicated directly from the replication channel.
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
expectedTranslogOps.add(new Translog.NoOp(3, primaryTerm, indexException.toString()));

deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
expectedTranslogOps.add(new Translog.NoOp(4, primaryTerm, deleteException.toString()));

for (IndexShard shard : shards) {
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
}
shards.assertAllEqual(1);
}
}

Expand Down Expand Up @@ -541,47 +577,4 @@ public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception {
shards.assertAllEqual(0);
}
}

/** Throws <code>documentFailure</code> on every indexing operation */
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
final String documentFailureMessage;

ThrowingDocumentFailureEngineFactory(String documentFailureMessage) {
this.documentFailureMessage = documentFailureMessage;
}

@Override
public Engine newReadWriteEngine(EngineConfig config) {
return InternalEngineTests.createInternalEngine((directory, writerConfig) ->
new IndexWriter(directory, writerConfig) {
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
assert documentFailureMessage != null;
throw new IOException(documentFailureMessage);
}
}, null, null, config);
}
}

private static void assertNoOpTranslogOperationForDocumentFailure(
Iterable<IndexShard> replicationGroup,
int expectedOperation,
long expectedPrimaryTerm,
String failureMessage) throws IOException {
for (IndexShard indexShard : replicationGroup) {
try(Translog.Snapshot snapshot = getTranslog(indexShard).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(expectedOperation));
long expectedSeqNo = 0L;
Translog.Operation op = snapshot.next();
do {
assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP));
assertThat(op.seqNo(), equalTo(expectedSeqNo));
assertThat(op.primaryTerm(), equalTo(expectedPrimaryTerm));
assertThat(((Translog.NoOp) op).reason(), containsString(failureMessage));
op = snapshot.next();
expectedSeqNo++;
} while (op != null);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.action.resync.ResyncReplicationResponse;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
Expand Down Expand Up @@ -193,14 +194,23 @@ public int appendDocs(final int numOfDoc) throws Exception {
}

public BulkItemResponse index(IndexRequest indexRequest) throws Exception {
return executeWriteRequest(indexRequest, indexRequest.getRefreshPolicy());
}

public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception {
return executeWriteRequest(deleteRequest, deleteRequest.getRefreshPolicy());
}

private BulkItemResponse executeWriteRequest(
DocWriteRequest<?> writeRequest, WriteRequest.RefreshPolicy refreshPolicy) throws Exception {
PlainActionFuture<BulkItemResponse> listener = new PlainActionFuture<>();
final ActionListener<BulkShardResponse> wrapBulkListener = ActionListener.wrap(
bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
listener::onFailure);
bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
listener::onFailure);
BulkItemRequest[] items = new BulkItemRequest[1];
items[0] = new BulkItemRequest(0, indexRequest);
BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items);
new IndexingAction(request, wrapBulkListener, this).execute();
items[0] = new BulkItemRequest(0, writeRequest);
BulkShardRequest request = new BulkShardRequest(shardId, refreshPolicy, items);
new WriteReplicationAction(request, wrapBulkListener, this).execute();
return listener.get();
}

Expand Down Expand Up @@ -598,9 +608,9 @@ public void respond(ActionListener<Response> listener) {

}

class IndexingAction extends ReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
class WriteReplicationAction extends ReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {

IndexingAction(BulkShardRequest request, ActionListener<BulkShardResponse> listener, ReplicationGroup replicationGroup) {
WriteReplicationAction(BulkShardRequest request, ActionListener<BulkShardResponse> listener, ReplicationGroup replicationGroup) {
super(request, listener, replicationGroup, "indexing");
}

Expand Down