diff --git a/core/src/main/java/org/elasticsearch/action/fieldstats/FieldStats.java b/core/src/main/java/org/elasticsearch/action/fieldstats/FieldStats.java index a6978ad2a395b..4c7072e9b4112 100644 --- a/core/src/main/java/org/elasticsearch/action/fieldstats/FieldStats.java +++ b/core/src/main/java/org/elasticsearch/action/fieldstats/FieldStats.java @@ -36,6 +36,7 @@ import java.net.InetAddress; public abstract class FieldStats implements Writeable, ToXContent { + private final byte type; private long maxDoc; private long docCount; @@ -628,4 +629,5 @@ private final static class Fields { final static String MAX_VALUE = new String("max_value"); final static String MAX_VALUE_AS_STRING = new String("max_value_as_string"); } + } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f89e3f939c4f2..4830b373e3bf9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -41,6 +41,7 @@ import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.logging.ESLogger; @@ -54,8 +55,10 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; @@ -78,10 +81,13 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED; + /** * */ public class InternalEngine extends Engine { + /** * When we last pruned expired tombstones from versionMap.deletes: */ @@ -111,6 +117,8 @@ public class InternalEngine extends Engine { private final IndexThrottle throttle; private final SequenceNumbersService seqNoService; + final static String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; + final static String GLOBAL_CHECKPOINT_KEY = "global_checkpoint"; // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling @@ -131,12 +139,27 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { boolean success = false; try { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis(); - seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings()); + mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); throttle = new IndexThrottle(); this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); try { writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG); + final SeqNoStats seqNoStats = loadSeqNoStatsFromCommit(writer); + if (logger.isTraceEnabled()) { + logger.trace( + "recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]", + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()); + } + seqNoService = + new SequenceNumbersService( + shardId, + engineConfig.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()); indexWriter = writer; translog = openTranslog(engineConfig, writer); assert translog.getGeneration() != null; @@ -287,6 +310,36 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) return null; } + private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException { + final long maxSeqNo; + try (IndexReader reader = DirectoryReader.open(writer)) { + final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader); + if (stats != null) { + maxSeqNo = (long) stats.getMaxValue(); + } else { + maxSeqNo = NO_OPS_PERFORMED; + } + } + + final Map commitUserData = writer.getCommitData(); + + final long localCheckpoint; + if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) { + localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY)); + } else { + localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + } + + final long globalCheckpoint; + if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) { + globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY)); + } else { + globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } + + return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); + } + private SearcherManager createSearcherManager() throws EngineException { boolean success = false; SearcherManager searcherManager = null; @@ -1132,13 +1185,22 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn ensureCanFlush(); try { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); - logger.trace("committing writer with translog id [{}] and sync id [{}] ", translogGeneration.translogFileGeneration, syncId); - Map commitData = new HashMap<>(2); + final Map commitData = new HashMap<>(5); + commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration)); commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID); + + commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService().getLocalCheckpoint())); + commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService().getGlobalCheckpoint())); + if (syncId != null) { commitData.put(Engine.SYNC_COMMIT_ID, syncId); } + + if (logger.isTraceEnabled()) { + logger.trace("committing writer with commit data [{}]", commitData); + } + indexWriter.setCommitData(commitData); writer.commit(); } catch (Throwable ex) { diff --git a/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java index c7e2226ac92d0..479e5f23527c4 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java @@ -22,7 +22,13 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.search.Query; +import org.apache.lucene.util.Bits; +import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -108,6 +114,36 @@ public Query termQuery(Object value, @Nullable QueryShardContext context) { throw new QueryShardException(context, "SeqNoField field [" + name() + "] is not searchable"); } + @Override + public FieldStats stats(IndexReader reader) throws IOException { + // nocommit remove implementation when late-binding commits + // are possible + final List leaves = reader.leaves(); + if (leaves.isEmpty()) { + return null; + } + + long currentMin = Long.MAX_VALUE; + long currentMax = Long.MIN_VALUE; + boolean found = false; + for (int i = 0; i < leaves.size(); i++) { + final LeafReader leaf = leaves.get(i).reader(); + final NumericDocValues values = leaf.getNumericDocValues(name()); + if (values == null) continue; + final Bits bits = leaf.getLiveDocs(); + for (int docID = 0; docID < leaf.maxDoc(); docID++) { + if (bits == null || bits.get(docID)) { + found = true; + final long value = values.get(docID); + currentMin = Math.min(currentMin, value); + currentMax = Math.max(currentMax, value); + } + } + } + + return found ? new FieldStats.Long(reader.maxDoc(), 0, -1, -1, false, true, currentMin, currentMax) : null; + } + } public SeqNoFieldMapper(Settings indexSettings) { @@ -129,7 +165,7 @@ protected void parseCreateField(ParseContext context, List fields) throws @Override public Mapper parse(ParseContext context) throws IOException { - // _seqno added in preparse + // _seq_no added in pre-parse return null; } @@ -157,4 +193,5 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws protected void doMerge(Mapper mergeWith, boolean updateAllTypes) { // nothing to do } + } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java index 1bc34089238b4..65f9afe78f75e 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java @@ -67,16 +67,28 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent { */ final private ObjectLongMap trackingLocalCheckpoint; - private long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + private long globalCheckpoint; - public GlobalCheckpointService(ShardId shardId, IndexSettings indexSettings) { + /** + * Initialize the global checkpoint service. The {@code globalCheckpoint} + * should be set to the last known global checkpoint for this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED}. + * + * @param shardId the shard this service is providing tracking + * local checkpoints for + * @param indexSettings the index settings + * @param globalCheckpoint the last known global checkpoint for this shard, + * or + * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} + */ + GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) { super(shardId, indexSettings); activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas()); inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); trackingLocalCheckpoint = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); + this.globalCheckpoint = globalCheckpoint; } - /** * notifies the service of a local checkpoint. if the checkpoint is lower than the currently known one, * this is a noop. Last, if the allocation id is not yet known, it is ignored. This to prevent late @@ -124,7 +136,7 @@ private boolean updateLocalCheckpointInMap(String allocationId, long localCheckp * @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints * of one of the active allocations is not known. */ - synchronized public boolean updateCheckpointOnPrimary() { + synchronized boolean updateCheckpointOnPrimary() { long minCheckpoint = Long.MAX_VALUE; if (activeLocalCheckpoints.isEmpty() && inSyncLocalCheckpoints.isEmpty()) { return false; @@ -164,7 +176,7 @@ synchronized public long getCheckpoint() { /** * updates the global checkpoint on a replica shard (after it has been updated by the primary). */ - synchronized public void updateCheckpointOnReplica(long globalCheckpoint) { + synchronized void updateCheckpointOnReplica(long globalCheckpoint) { if (this.globalCheckpoint <= globalCheckpoint) { this.globalCheckpoint = globalCheckpoint; logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint); @@ -241,4 +253,5 @@ synchronized long getLocalCheckpointForAllocation(String allocationId) { } return SequenceNumbersService.UNASSIGNED_SEQ_NO; } + } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index d20432e5071e7..32beb5cb16386 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -23,6 +23,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.SnapshotStatus; import java.util.LinkedList; @@ -39,39 +40,64 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { public static Setting SETTINGS_BIT_ARRAYS_SIZE = Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, 4, Setting.Property.IndexScope); - /** * an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstProcessedSeqNo} * which marks the seqNo the fist bit in the first array corresponds to. */ final LinkedList processedSeqNo; - final int bitArraysSize; - long firstProcessedSeqNo = 0; + private final int bitArraysSize; + long firstProcessedSeqNo; /** the current local checkpoint, i.e., all seqNo lower (<=) than this number have been completed */ - volatile long checkpoint = SequenceNumbersService.NO_OPS_PERFORMED; + volatile long checkpoint; /** the next available seqNo - used for seqNo generation */ - volatile long nextSeqNo = 0; - + private volatile long nextSeqNo; - public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { + /** + * Initialize the local checkpoint service. The {@code maxSeqNo} should be + * set to the last sequence number assigned by this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED} and + * {@code localCheckpoint} should be set to the last known local checkpoint + * for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}. + * + * @param shardId the shard this service is providing tracking + * local checkpoints for + * @param indexSettings the index settings + * @param maxSeqNo the last sequence number assigned by this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED} + * @param localCheckpoint the last known local checkpoint for this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED} + */ + LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) { super(shardId, indexSettings); + if (localCheckpoint < 0 && localCheckpoint != SequenceNumbersService.NO_OPS_PERFORMED) { + throw new IllegalArgumentException( + "local checkpoint must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] " + + "but was [" + localCheckpoint + "]"); + } + if (maxSeqNo < 0 && maxSeqNo != SequenceNumbersService.NO_OPS_PERFORMED) { + throw new IllegalArgumentException( + "max seq. no. must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]"); + } bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); processedSeqNo = new LinkedList<>(); + firstProcessedSeqNo = localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1; + this.nextSeqNo = maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1; + this.checkpoint = localCheckpoint; } /** * issue the next sequence number **/ - public synchronized long generateSeqNo() { + synchronized long generateSeqNo() { return nextSeqNo++; } /** * marks the processing of the given seqNo have been completed **/ - public synchronized void markSeqNoAsCompleted(long seqNo) { + synchronized void markSeqNoAsCompleted(long seqNo) { // make sure we track highest seen seqNo if (seqNo >= nextSeqNo) { nextSeqNo = seqNo + 1; @@ -94,7 +120,7 @@ public long getCheckpoint() { } /** gets the maximum seqno seen so far */ - public long getMaxSeqNo() { + long getMaxSeqNo() { return nextSeqNo - 1; } @@ -130,7 +156,7 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1) */ private FixedBitSet getBitSetForSeqNo(long seqNo) { assert Thread.holdsLock(this); - assert seqNo >= firstProcessedSeqNo; + assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo; int bitSetOffset = ((int) (seqNo - firstProcessedSeqNo)) / bitArraysSize; while (bitSetOffset >= processedSeqNo.size()) { processedSeqNo.add(new FixedBitSet(bitArraysSize)); @@ -138,11 +164,11 @@ private FixedBitSet getBitSetForSeqNo(long seqNo) { return processedSeqNo.get(bitSetOffset); } - /** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */ private int seqNoToBitSetOffset(long seqNo) { assert Thread.holdsLock(this); assert seqNo >= firstProcessedSeqNo; return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize; } + } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index fe85172657014..b745194f17c4f 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -30,14 +30,46 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { public final static long UNASSIGNED_SEQ_NO = -2L; + + /** + * Represents no operations have been performed on the shard. + */ public final static long NO_OPS_PERFORMED = -1L; + final LocalCheckpointService localCheckpointService; final GlobalCheckpointService globalCheckpointService; - public SequenceNumbersService(ShardId shardId, IndexSettings indexSettings) { + /** + * Initialize the sequence number service. The {@code maxSeqNo} + * should be set to the last sequence number assigned by this + * shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, + * {@code localCheckpoint} should be set to the last known local + * checkpoint for this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED}, and + * {@code globalCheckpoint} should be set to the last known global + * checkpoint for this shard, or + * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. + * + * @param shardId the shard this service is providing tracking + * local checkpoints for + * @param indexSettings the index settings + * @param maxSeqNo the last sequence number assigned by this + * shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED} + * @param localCheckpoint the last known local checkpoint for this shard, + * or {@link SequenceNumbersService#NO_OPS_PERFORMED} + * @param globalCheckpoint the last known global checkpoint for this shard, + * or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} + */ + public SequenceNumbersService( + final ShardId shardId, + final IndexSettings indexSettings, + final long maxSeqNo, + final long localCheckpoint, + final long globalCheckpoint) { super(shardId, indexSettings); - localCheckpointService = new LocalCheckpointService(shardId, indexSettings); - globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings); + localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint); + globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint); } /** diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c9c1cb315d3cd..634ddb61f0feb 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -115,6 +115,7 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -123,6 +124,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; @@ -138,6 +140,7 @@ public class InternalEngineTests extends ESTestCase { + protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 1); private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); @@ -235,6 +238,7 @@ private ParsedDocument testParsedDocument(String uid, String id, String type, St Field seqNoField = new NumericDocValuesField("_seq_no", 0); document.add(uidField); document.add(versionField); + document.add(seqNoField); return new ParsedDocument(versionField, seqNoField, id, type, routing, timestamp, ttl, Arrays.asList(document), source, mappingUpdate); } @@ -310,9 +314,10 @@ public void onFailedEngine(String reason, @Nullable Throwable t) { return config; } - protected static final BytesReference B_1 = new BytesArray(new byte[]{1}); - protected static final BytesReference B_2 = new BytesArray(new byte[]{2}); - protected static final BytesReference B_3 = new BytesArray(new byte[]{3}); + private static final BytesReference B_1 = new BytesArray(new byte[]{1}); + private static final BytesReference B_2 = new BytesArray(new byte[]{2}); + private static final BytesReference B_3 = new BytesArray(new byte[]{3}); + private static final BytesArray SOURCE = new BytesArray("{}".getBytes(Charset.defaultCharset())); public void testSegments() throws Exception { try (Store store = createStore(); @@ -538,26 +543,66 @@ public void testSegmentsStatsIncludingFileSizes() throws Exception { } } - public void testCommitStats() { - Document document = testDocumentWithTextField(); - document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE)); - ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null); - engine.index(new Engine.Index(newUid("1"), doc)); + public void testCommitStats() throws IOException { + InternalEngine engine = null; + try { + this.engine.close(); - CommitStats stats1 = engine.commitStats(); - assertThat(stats1.getGeneration(), greaterThan(0L)); - assertThat(stats1.getId(), notNullValue()); - assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); + final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); + final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); - engine.flush(true, true); - CommitStats stats2 = engine.commitStats(); - assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration())); - assertThat(stats2.getId(), notNullValue()); - assertThat(stats2.getId(), not(equalTo(stats1.getId()))); - assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); - assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); - assertThat(stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); - assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); + engine = new InternalEngine(copy(this.engine.config(), this.engine.config().getOpenMode())) { + @Override + public SequenceNumbersService seqNoService() { + return new SequenceNumbersService( + this.config().getShardId(), + this.config().getIndexSettings(), + maxSeqNo.get(), + localCheckpoint.get(), + globalCheckpoint.get()); + } + }; + CommitStats stats1 = engine.commitStats(); + assertThat(stats1.getGeneration(), greaterThan(0L)); + assertThat(stats1.getId(), notNullValue()); + assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); + assertThat(stats1.getUserData(), hasKey(InternalEngine.LOCAL_CHECKPOINT_KEY)); + assertThat( + Long.parseLong(stats1.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), + equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); + assertThat(stats1.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); + assertThat( + Long.parseLong(stats1.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), + equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + + maxSeqNo.set(rarely() ? SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024)); + localCheckpoint.set( + rarely() || maxSeqNo.get() == SequenceNumbersService.NO_OPS_PERFORMED ? + SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024)); + globalCheckpoint.set(rarely() || localCheckpoint.get() == SequenceNumbersService.NO_OPS_PERFORMED ? + SequenceNumbersService.UNASSIGNED_SEQ_NO : randomIntBetween(0, (int) localCheckpoint.get())); + + engine.flush(true, true); + + CommitStats stats2 = engine.commitStats(); + assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration())); + assertThat(stats2.getId(), notNullValue()); + assertThat(stats2.getId(), not(equalTo(stats1.getId()))); + assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); + assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); + assertThat( + stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), + not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); + assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); + assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get())); + assertThat(stats2.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); + assertThat( + Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), + equalTo(globalCheckpoint.get())); + } finally { + IOUtils.close(engine); + } } public void testIndexSearcherWrapper() throws Exception { @@ -609,7 +654,7 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException { try { initialEngine = engine; for (int i = 0; i < ops; i++) { - final ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + final ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null); if (randomBoolean()) { final Engine.Index operation = new Engine.Index(newUid("test#1"), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()); operations.add(operation); @@ -645,7 +690,7 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException { initialEngine = engine; for (int i = 0; i < docs; i++) { final String id = Integer.toString(i); - final ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + final ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null); initialEngine.index(new Engine.Index(newUid(id), doc)); } } finally { @@ -655,7 +700,7 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException { Engine recoveringEngine = null; try { final AtomicBoolean flushed = new AtomicBoolean(); - recoveringEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { + recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { @Override public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { assertThat(getTranslog().totalOperations(), equalTo(docs)); @@ -1558,51 +1603,82 @@ public void testIndexWriterInfoStream() { } } - public void testSeqNoAndLocalCheckpoint() { - int opCount = randomIntBetween(1, 10); - long seqNoCount = -1; - for (int op = 0; op < opCount; op++) { - final String id = randomFrom("1", "2", "3"); - ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocumentWithTextField(), B_1, null); - if (randomBoolean()) { - final Engine.Index index = new Engine.Index(newUid(id), doc, - SequenceNumbersService.UNASSIGNED_SEQ_NO, - rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, - PRIMARY, System.currentTimeMillis()); + public void testSeqNoAndCheckpoints() throws IOException { + // nocommit: does not test deletes + final int opCount = randomIntBetween(1, 256); + long primarySeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + final String[] ids = new String[]{"1", "2", "3"}; + long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + long replicaLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + InternalEngine initialEngine = null; + try { + initialEngine = engine; + initialEngine + .seqNoService() + .updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet()); + for (int op = 0; op < opCount; op++) { + final String id = randomFrom(ids); + ParsedDocument doc = testParsedDocument("test#" + id, id, "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null); + final Engine.Index index = new Engine.Index(newUid("test#" + id), doc, + SequenceNumbersService.UNASSIGNED_SEQ_NO, + rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, + PRIMARY, System.currentTimeMillis()); try { - engine.index(index); - } catch (VersionConflictEngineException e) { - // OK - } - if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoCount++; - Engine.Index replica = new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), - index.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis()); - replicaEngine.index(replica); - } - } else { - final Engine.Delete delete = new Engine.Delete("test", id, newUid(id), - SequenceNumbersService.UNASSIGNED_SEQ_NO, - rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, - PRIMARY, System.currentTimeMillis(), false); - try { - engine.delete(delete); + initialEngine.index(index); + primarySeqNo++; } catch (VersionConflictEngineException e) { - // OK + } - if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoCount++; - Engine.Delete replica = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis(), false); - replicaEngine.delete(replica); + + replicaLocalCheckpoint = + rarely() ? replicaLocalCheckpoint : randomIntBetween(Math.toIntExact(replicaLocalCheckpoint), Math.toIntExact(primarySeqNo)); + initialEngine.seqNoService().updateLocalCheckpointForShard("primary", initialEngine.seqNoService().getLocalCheckpoint()); + initialEngine.seqNoService().updateLocalCheckpointForShard("replica", replicaLocalCheckpoint); + + if (rarely()) { + localCheckpoint = primarySeqNo; + globalCheckpoint = replicaLocalCheckpoint; + initialEngine.seqNoService().updateGlobalCheckpointOnPrimary(); + initialEngine.flush(true, true); } } + + initialEngine.seqNoService().updateGlobalCheckpointOnPrimary(); + + assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo)); + assertThat(initialEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo)); + assertThat(initialEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint)); + + assertThat( + Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), + equalTo(localCheckpoint)); + assertThat( + Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), + equalTo(globalCheckpoint)); + + } finally { + IOUtils.close(initialEngine); + } + + InternalEngine recoveringEngine = null; + try { + recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + recoveringEngine.recoverFromTranslog(); + + assertThat( + Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), + equalTo(primarySeqNo)); + assertThat( + Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), + equalTo(globalCheckpoint)); + assertThat(recoveringEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo)); + assertThat(recoveringEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo)); + assertThat(recoveringEngine.seqNoService().generateSeqNo(), equalTo(primarySeqNo + 1)); + } finally { + IOUtils.close(recoveringEngine); } - assertThat(engine.seqNoService().stats().getMaxSeqNo(), equalTo(seqNoCount)); - assertThat(engine.seqNoService().stats().getLocalCheckpoint(), equalTo(seqNoCount)); - assertThat(replicaEngine.seqNoService().stats().getMaxSeqNo(), equalTo(seqNoCount)); - assertThat(replicaEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(seqNoCount)); } // #8603: make sure we can separately log IFD's messages @@ -1920,9 +1996,9 @@ public void testUpgradeOldIndex() throws IOException { } CommitStats commitStats = engine.commitStats(); Map userData = commitStats.getUserData(); - assertTrue("userdata dosn't contain uuid", userData.containsKey(Translog.TRANSLOG_UUID_KEY)); - assertTrue("userdata doesn't contain generation key", userData.containsKey(Translog.TRANSLOG_GENERATION_KEY)); - assertFalse("userdata contains legacy marker", userData.containsKey("translog_id")); + assertTrue("user data doesn't contain uuid", userData.containsKey(Translog.TRANSLOG_UUID_KEY)); + assertTrue("user data doesn't contain generation key", userData.containsKey(Translog.TRANSLOG_GENERATION_KEY)); + assertFalse("user data contains legacy marker", userData.containsKey("translog_id")); } } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java b/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java index cbdf93fdee807..b24f8828cb988 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java @@ -43,7 +43,7 @@ public void testCheckpointsAdvance() throws Exception { "index.number_of_shards", "1" // simplify things so we know how many ops goes to the shards ).get(); final List builders = new ArrayList<>(); - final int numDocs = scaledRandomIntBetween(0, 100); + final long numDocs = scaledRandomIntBetween(0, 100); logger.info("--> will index [{}] docs", numDocs); for (int i = 0; i < numDocs; i++) { builders.add(client().prepareIndex("test", "type", "id_" + i).setSource("{}")); @@ -60,23 +60,14 @@ public void testCheckpointsAdvance() throws Exception { logger.debug("seq_no stats for {}: {}", shardStats.getShardRouting(), XContentHelper.toString(shardStats.getSeqNoStats(), new ToXContent.MapParams(Collections.singletonMap("pretty", "false")))); - final Matcher localCheckpointRule; - final Matcher globalCheckpointRule; - if (shardStats.getShardRouting().primary()) { - localCheckpointRule = equalTo(numDocs - 1L); - globalCheckpointRule = equalTo(numDocs - 1L); - } else { - // nocommit: recovery doesn't transfer checkpoints yet (we don't persist them in lucene). - localCheckpointRule = anyOf(equalTo(numDocs - 1L), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); - globalCheckpointRule = anyOf(equalTo(numDocs - 1L), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); - } assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", - shardStats.getSeqNoStats().getLocalCheckpoint(), localCheckpointRule); + shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(numDocs - 1)); assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", - shardStats.getSeqNoStats().getGlobalCheckpoint(), globalCheckpointRule); + shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(numDocs - 1)); assertThat(shardStats.getShardRouting() + " max seq no mismatch", - shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(numDocs - 1L)); + shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(numDocs - 1)); } }); } + } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java index 557f0ca199602..62d833c99258b 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java @@ -41,7 +41,7 @@ public class GlobalCheckpointTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); checkpointService = new GlobalCheckpointService(new ShardId("test", "_na_", 0), - IndexSettingsModule.newIndexSettings("test", Settings.EMPTY)); + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), SequenceNumbersService.UNASSIGNED_SEQ_NO); } public void testEmptyShards() { diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java index e040d81393495..1456ed6aca264 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java @@ -53,12 +53,13 @@ public void setUp() throws Exception { private LocalCheckpointService getCheckpointService() { return new LocalCheckpointService( - new ShardId("test", "_na_", 0), - IndexSettingsModule.newIndexSettings("test", - Settings.builder() - .put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) - .build() - )); + new ShardId("test", "_na_", 0), + IndexSettingsModule.newIndexSettings("test", + Settings.builder() + .put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) + .build()), + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.NO_OPS_PERFORMED); } public void testSimplePrimary() {