Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
404b8dc
Introduce sequence number-based recovery
jasontedor Jan 3, 2017
d360a23
Simplify sequence number-based recovery
jasontedor Jan 6, 2017
6ec0ef6
Merge branch 'master' into replica-sequence-number-recovery
jasontedor Jan 8, 2017
b9200cf
Remove obsolete field from RecoverySourceHandler
jasontedor Jan 8, 2017
91e1ff0
Handle translog missing while preparing recovery
jasontedor Jan 8, 2017
1c14260
Skip adding operations without sequence number
jasontedor Jan 8, 2017
8b0e501
Revert adding no-ops on version confict in replica
jasontedor Jan 15, 2017
dac513a
Revert whitespace change in MultiSnapshot.java
jasontedor Jan 15, 2017
81a1e1c
Bubble up translog I/O exceptions during recovery
jasontedor Jan 15, 2017
8960522
Add assertion on number of recovered ops
jasontedor Jan 16, 2017
d71aa16
Merge branch 'master' into replica-sequence-number-recovery
jasontedor Jan 16, 2017
b6e6cc3
Restore logger specific to GlobalCheckpointTracker
jasontedor Jan 16, 2017
ab18bde
Use readLong/writeLong for serializing seq. no.
jasontedor Jan 16, 2017
cea70f4
Remove unneeded assertion disjunction in engine
jasontedor Jan 16, 2017
34fbb37
Explicitly prepare shard for peer recovery
jasontedor Jan 16, 2017
1929c03
Load sequence number statistics from the store
jasontedor Jan 16, 2017
c0169c2
Rename replication test case method
jasontedor Jan 16, 2017
999ca91
Iteration
jasontedor Jan 17, 2017
7281b75
Add in-flight ops recovery test
jasontedor Jan 18, 2017
320301c
Add assertion on recoveries targeting old replicas
jasontedor Jan 18, 2017
50d3191
Add defensive assertion on starting seq. no.
jasontedor Jan 18, 2017
2596dcd
Store#loadSeqNoStats iteration
jasontedor Jan 18, 2017
cc2002c
Simplify preparing start recovery request
jasontedor Jan 19, 2017
1e59f84
Only mark assigned sequence numbers as completed
jasontedor Jan 19, 2017
cc30114
Iteration
jasontedor Jan 19, 2017
781f276
Add comment regarding starting seq. no.
jasontedor Jan 19, 2017
f64b26f
Merge branch 'master' into replica-sequence-number-recovery
jasontedor Jan 20, 2017
3a70bd7
Add missing license header
jasontedor Jan 20, 2017
88fc801
Fix start recovery request serialization test
jasontedor Jan 20, 2017
6a0b70e
Revert start recovery request assertion
jasontedor Jan 20, 2017
4e35141
Merge branch 'master' into replica-sequence-number-recovery
jasontedor Jan 23, 2017
3571036
Add note about snapshot restore impact on recovery
jasontedor Jan 23, 2017
da94b28
Rename EvilRecoveryIT to EvilPeerRecoveryIT
jasontedor Jan 23, 2017
e801c5b
Use bulk thread pool size for number of docs
jasontedor Jan 23, 2017
a4cf33e
Remove extraneous blank line
jasontedor Jan 23, 2017
c16295a
Temporarily enable trace logging on test
jasontedor Jan 23, 2017
32c6702
More trace logging for test
jasontedor Jan 24, 2017
76f8807
Fix shard ID in logging statement
jasontedor Jan 24, 2017
adafa21
Fix RFGIT#testReusePeerRecovery test bug
jasontedor Jan 25, 2017
270a68a
Cleanup
jasontedor Jan 25, 2017
2e67a0b
Revert "Fix RFGIT#testReusePeerRecovery test bug"
jasontedor Jan 26, 2017
06a3785
Rewrite reuse peer recovery test
jasontedor Jan 26, 2017
137ffb4
Merge branch 'master' into replica-sequence-number-recovery
jasontedor Jan 26, 2017
eeaa4f9
Remove unused imports
jasontedor Jan 26, 2017
62aabb0
Cleanup test
jasontedor Jan 26, 2017
97e0b20
More cleanup
jasontedor Jan 26, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoverySettings.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]PeerRecoverySourceService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]StartRecoveryRequest.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStore.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]TransportNodesListShardStoreMetaData.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]ttl[/\\]IndicesTTLService.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointService;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -115,7 +115,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.ALLOW_UNMAPPED,
IndexSettings.INDEX_CHECK_ON_STARTUP,
IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL,
LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE,
LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE,
IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD,
IndexSettings.MAX_SLICES_PER_SCROLL,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a typo here

package org.elasticsearch.common.util.concurrent;

import org.apache.lucene.store.AlreadyClosedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ void setTook(long took) {
void freeze() {
freeze.set(true);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

}

public static class IndexResult extends Result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -119,8 +120,6 @@ public class InternalEngine extends Engine {
private final IndexThrottle throttle;

private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no";

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
Expand Down Expand Up @@ -159,11 +158,12 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
seqNoStats = loadSeqNoStatsFromLuceneAndTranslog(engineConfig.getTranslogConfig(), writer);
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
seqNoStats = loadSeqNoStatsFromLucene(SequenceNumbersService.UNASSIGNED_SEQ_NO, writer);
seqNoStats = store.loadSeqNoStats(SequenceNumbersService.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
Expand Down Expand Up @@ -353,47 +353,6 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
return null;
}

/**
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the translog
* checkpoint (global checkpoint).
*
* @param translogConfig the translog config (for the global checkpoint)
* @param indexWriter the index writer (for the Lucene commit point)
* @return the sequence number stats
* @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint
*/
private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
final TranslogConfig translogConfig,
final IndexWriter indexWriter) throws IOException {
long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath());
return loadSeqNoStatsFromLucene(globalCheckpoint, indexWriter);
}

/**
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and uses the
* specified global checkpoint.
*
* @param globalCheckpoint the global checkpoint to use
* @param indexWriter the index writer (for the Lucene commit point)
* @return the sequence number stats
*/
private static SeqNoStats loadSeqNoStatsFromLucene(final long globalCheckpoint, final IndexWriter indexWriter) {
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) {
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint;
maxSeqNo = Long.parseLong(entry.getValue());
}
}

return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}

private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
Expand Down Expand Up @@ -793,7 +752,6 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
if (delete.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService().generateSeqNo();
}

updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
Expand Down Expand Up @@ -1532,11 +1490,11 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,27 @@
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;

import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;

/**
* A shard component that is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which
* all lower (or equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the
* master starts them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery.
* These shards have received all old operations via the recovery mechanism and are kept up to date by the various replications actions.
* The set of shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
* This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
* equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the master starts
* them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These shards
* have received all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set of
* shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
* <p>
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
*/
public class GlobalCheckpointService extends AbstractIndexShardComponent {
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {

/*
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed
Expand All @@ -63,14 +65,14 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
private long globalCheckpoint;

/**
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint for this
* shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard this service is tracking local checkpoints for
* @param shardId the shard ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
super(shardId, indexSettings);
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
Expand Down Expand Up @@ -127,8 +129,9 @@ synchronized boolean updateCheckpointOnPrimary() {
minCheckpoint = Math.min(cp.value, minCheckpoint);
}
if (minCheckpoint < globalCheckpoint) {
throw new IllegalStateException(shardId + " new global checkpoint [" + minCheckpoint
+ "] is lower than previous one [" + globalCheckpoint + "]");
final String message =
String.format(Locale.ROOT, "new global checkpoint [%d] is lower than previous one [%d]", minCheckpoint, globalCheckpoint);
throw new IllegalStateException(message);
}
if (globalCheckpoint != minCheckpoint) {
logger.trace("global checkpoint updated to [{}]", minCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@
package org.elasticsearch.index.seqno;

import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;

import java.util.LinkedList;

/**
* This class generates sequences numbers and keeps track of the so called local checkpoint - the highest number for which all previous
* sequence numbers have been processed (inclusive).
* This class generates sequences numbers and keeps track of the so-called "local checkpoint" which is the highest number for which all
* previous sequence numbers have been processed (inclusive).
*/
public class LocalCheckpointService extends AbstractIndexShardComponent {
public class LocalCheckpointTracker {

/**
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on
Expand Down Expand Up @@ -67,17 +66,15 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
private volatile long nextSeqNo;

/**
* Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint for this
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
* Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint,
* or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
*
* @param shardId the shard this service is providing tracking local checkpoints for
* @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param maxSeqNo the last sequence number assigned, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
*/
LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
super(shardId, indexSettings);
public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
if (localCheckpoint < 0 && localCheckpoint != SequenceNumbersService.NO_OPS_PERFORMED) {
throw new IllegalArgumentException(
"local checkpoint must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] "
Expand Down Expand Up @@ -107,7 +104,7 @@ synchronized long generateSeqNo() {
*
* @param seqNo the sequence number to mark as completed
*/
synchronized void markSeqNoAsCompleted(final long seqNo) {
public synchronized void markSeqNoAsCompleted(final long seqNo) {
// make sure we track highest seen sequence number
if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1;
Expand Down Expand Up @@ -142,10 +139,25 @@ long getMaxSeqNo() {
return nextSeqNo - 1;
}

/**
* Waits for all operations up to the provided sequence number to complete.
*
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
@SuppressForbidden(reason = "Object#wait")
synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedException {
while (checkpoint < seqNo) {
// notified by updateCheckpoint
this.wait();
}
}

/**
* Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number following the
* current checkpoint is processed.
*/
@SuppressForbidden(reason = "Object#notifyAll")
private void updateCheckpoint() {
assert Thread.holdsLock(this);
assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 :
Expand All @@ -154,19 +166,24 @@ assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() :
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) :
"updateCheckpoint is called but the bit following the checkpoint is not set";
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
FixedBitSet current = processedSeqNo.getFirst();
do {
checkpoint++;
// the checkpoint always falls in the first bit set or just before. If it falls
// on the last bit of the current bit set, we can clean it.
if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) {
processedSeqNo.removeFirst();
firstProcessedSeqNo += bitArraysSize;
assert checkpoint - firstProcessedSeqNo < bitArraysSize;
current = processedSeqNo.peekFirst();
}
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
try {
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
FixedBitSet current = processedSeqNo.getFirst();
do {
checkpoint++;
// the checkpoint always falls in the first bit set or just before. If it falls
// on the last bit of the current bit set, we can clean it.
if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) {
processedSeqNo.removeFirst();
firstProcessedSeqNo += bitArraysSize;
assert checkpoint - firstProcessedSeqNo < bitArraysSize;
current = processedSeqNo.peekFirst();
}
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
} finally {
// notifies waiters in waitForOpsToComplete
this.notifyAll();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class SeqNoStats implements ToXContent, Writeable {

private static final String SEQ_NO = "seq_no";
private static final String MAX_SEQ_NO = "max";
private static final String MAX_SEQ_NO = "max_seq_no";
private static final String LOCAL_CHECKPOINT = "local_checkpoint";
private static final String GLOBAL_CHECKPOINT = "global_checkpoint";

Expand Down
Loading