Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,10 @@ protected int sendSnapshot(final long startingSeqNo, final Translog.Snapshot sna
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
// we have to send older ops for which no sequence number was assigned, and any ops after the starting sequence number
if (operation.seqNo() == SequenceNumbersService.UNASSIGNED_SEQ_NO || operation.seqNo() < startingSeqNo) continue;
// if we are doing a sequence-number-based recovery, we have to skip older ops for which no sequence number was assigned, and
// any ops before the starting sequence number
final long seqNo = operation.seqNo();
if (startingSeqNo >= 0 && (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) continue;
operations.add(operation);
ops++;
size += operation.estimateSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
Expand All @@ -35,13 +37,23 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -60,6 +72,7 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -136,6 +149,72 @@ public void close() throws IOException {
IOUtils.close(reader, store, targetStore);
}

public void testSendSnapshotSendsOps() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
final long startingSeqNo = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomIntBetween(0, 16);
final StartRecoveryRequest request = new StartRecoveryRequest(
shardId,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
null,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong());
final IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
final RecoveryTargetHandler recoveryTarget = mock(RecoveryTargetHandler.class);
final RecoverySourceHandler handler =
new RecoverySourceHandler(shard, recoveryTarget, request, () -> 0L, e -> () -> {}, fileChunkSizeInBytes, Settings.EMPTY);
final List<Translog.Operation> operations = new ArrayList<>();
final int initialNumberOfDocs = randomIntBetween(16, 64);
for (int i = 0; i < initialNumberOfDocs; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
operations.add(new Translog.Index(index, new Engine.IndexResult(1, SequenceNumbersService.UNASSIGNED_SEQ_NO, true)));
}
final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(16, 64);
for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
operations.add(new Translog.Index(index, new Engine.IndexResult(1, i - initialNumberOfDocs, true)));
}
operations.add(null);
int totalOperations = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() {
private int counter = 0;

@Override
public int totalOperations() {
return operations.size() - 1;
}

@Override
public Translog.Operation next() throws IOException {
return operations.get(counter++);
}
});
if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
assertThat(totalOperations, equalTo(initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers));
} else {
assertThat(totalOperations, equalTo(Math.toIntExact(numberOfDocsWithValidSequenceNumbers - startingSeqNo)));
}
}

private Engine.Index getIndex(final String id) {
final String type = "test";
final ParseContext.Document document = new ParseContext.Document();
document.add(new TextField("test", "test", Field.Store.YES));
final Field uidField = new Field("_uid", Uid.createUid(type, id), UidFieldMapper.Defaults.FIELD_TYPE);
final Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY);
final SeqNoFieldMapper.SequenceID seqID = SeqNoFieldMapper.SequenceID.emptySeqID();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);
document.add(seqID.seqNoDocValue);
document.add(seqID.primaryTerm);
final BytesReference source = new BytesArray(new byte[] { 1 });
final ParsedDocument doc = new ParsedDocument(versionField, seqID, id, type, null, Arrays.asList(document), source, null);
return new Engine.Index(new Term("_uid", doc.uid()), doc);
}

public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
put("indices.recovery.concurrent_small_file_streams", 1).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -93,7 +96,8 @@ protected int indexDocs(String index, final int idStart, final int numDocs) thro
public void testSeqNoCheckpoints() throws Exception {
Nodes nodes = buildNodeAndVersions();
logger.info("cluster discovered: {}", nodes.toString());
final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
Expand All @@ -109,26 +113,60 @@ public void testSeqNoCheckpoints() throws Exception {
createIndex(index, settings.build());
try (RestClient newNodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
int numDocs = indexDocs(index, 0, randomInt(5));
int numDocs = 0;
final int numberOfInitialDocs = 1 + randomInt(5);
logger.info("indexing [{}] docs initially", numberOfInitialDocs);
numDocs += indexDocs(index, 0, numberOfInitialDocs);
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient);

logger.info("allowing shards on all nodes");
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
ensureGreen();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we add assert counts here too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 0112f90.

logger.info("indexing some more docs");
numDocs += indexDocs(index, numDocs, randomInt(5));
assertOK(client().performRequest("POST", index + "/_refresh"));
for (final String bwcName : bwcNamesList) {
assertCount(index, "_only_nodes:" + bwcName, numDocs);
}
final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5);
logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes);
numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes);
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient);
logger.info("moving primary to new node");
Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get();
logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
ensureGreen();
logger.info("indexing some more docs");
int numDocsOnNewPrimary = indexDocs(index, numDocs, randomInt(5));
numDocs += numDocsOnNewPrimary;
int numDocsOnNewPrimary = 0;
final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5);
logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary);
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary);
numDocs += numberOfDocsAfterMovingPrimary;
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient);
/*
* Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in
* the recovery code.
*/
logger.info("setting number of replicas to 0");
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 0));
final int numberOfDocsAfterDroppingReplicas = 1 + randomInt(5);
logger.info("indexing [{}] docs after setting number of replicas to 0", numberOfDocsAfterDroppingReplicas);
numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterDroppingReplicas);
numDocs += numberOfDocsAfterDroppingReplicas;
logger.info("setting number of replicas to 1");
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
ensureGreen();
assertOK(client().performRequest("POST", index + "/_refresh"));
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
assertCount(index, "_primary", numDocs);
assertCount(index, "_replica", numDocs);
assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient);
}
}

private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
assertOK(response);
final int actualCount = Integer.parseInt(objectPath(response).evaluate("count").toString());
assertThat(actualCount, equalTo(expectedCount));
}

private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception {
assertBusy(() -> {
try {
Expand Down