From 9799d71bdc12d57ef467a3f46eafefab76a05bf2 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Jun 2016 09:18:57 -0400 Subject: [PATCH 01/12] Persist sequence number checkpoints This commit adds persistence for local and global sequence number checkpoints. We also recover the max sequence number in a shard, although there will be loss here today from delete operations. This will be addressed in a follow-up. --- .../action/fieldstats/FieldStats.java | 2 + .../index/engine/InternalEngine.java | 59 ++++- .../mapper/internal/SeqNoFieldMapper.java | 37 ++- .../index/seqno/GlobalCheckpointService.java | 13 +- .../index/seqno/LocalCheckpointService.java | 23 +- .../index/seqno/SequenceNumbersService.java | 11 +- .../index/engine/InternalEngineTests.java | 220 ++++++++++++++---- .../index/seqno/CheckpointsIT.java | 19 +- 8 files changed, 309 insertions(+), 75 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/fieldstats/FieldStats.java b/core/src/main/java/org/elasticsearch/action/fieldstats/FieldStats.java index a6978ad2a395b..4c7072e9b4112 100644 --- a/core/src/main/java/org/elasticsearch/action/fieldstats/FieldStats.java +++ b/core/src/main/java/org/elasticsearch/action/fieldstats/FieldStats.java @@ -36,6 +36,7 @@ import java.net.InetAddress; public abstract class FieldStats implements Writeable, ToXContent { + private final byte type; private long maxDoc; private long docCount; @@ -628,4 +629,5 @@ private final static class Fields { final static String MAX_VALUE = new String("max_value"); final static String MAX_VALUE_AS_STRING = new String("max_value_as_string"); } + } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f89e3f939c4f2..e206f1e85d5b9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -41,6 +41,7 @@ import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.logging.ESLogger; @@ -54,8 +55,11 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.index.seqno.GlobalCheckpointService; +import org.elasticsearch.index.seqno.LocalCheckpointService; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; @@ -131,12 +135,23 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { boolean success = false; try { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis(); - seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings()); + mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); throttle = new IndexThrottle(); this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); try { writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG); + final long localCheckpoint = loadLocalCheckpointFromCommit(writer); + final long globalCheckpoint = loadGlobalCheckpointFromCommit(writer); + final long maxSeqNo = loadMaxSeqNoFromCommit(writer); + if (logger.isTraceEnabled()) { + logger.trace( + "recovering local checkpoint: [{}], global checkpoint [{}], max sequence number [{}]", + localCheckpoint, + globalCheckpoint, + maxSeqNo); + } + seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings(), maxSeqNo, localCheckpoint, globalCheckpoint); indexWriter = writer; translog = openTranslog(engineConfig, writer); assert translog.getGeneration() != null; @@ -287,6 +302,35 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) return null; } + private long loadLocalCheckpointFromCommit(IndexWriter writer) { + final Map commitUserData = writer.getCommitData(); + if (commitUserData.containsKey(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)) { + return Long.parseLong(commitUserData.get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)); + } else { + return SequenceNumbersService.NO_OPS_PERFORMED; + } + } + + private long loadGlobalCheckpointFromCommit(IndexWriter writer) { + final Map commitUserData = writer.getCommitData(); + if (commitUserData.containsKey(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)) { + return Long.parseLong(commitUserData.get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)); + } else { + return SequenceNumbersService.UNASSIGNED_SEQ_NO; + } + } + + private long loadMaxSeqNoFromCommit(IndexWriter writer) throws IOException { + try (IndexReader reader = DirectoryReader.open(writer)) { + final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader); + if (stats != null) { + return (long) stats.getMaxValue(); + } else { + return SequenceNumbersService.NO_OPS_PERFORMED; + } + } + } + private SearcherManager createSearcherManager() throws EngineException { boolean success = false; SearcherManager searcherManager = null; @@ -1132,13 +1176,22 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn ensureCanFlush(); try { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); - logger.trace("committing writer with translog id [{}] and sync id [{}] ", translogGeneration.translogFileGeneration, syncId); - Map commitData = new HashMap<>(2); + final Map commitData = new HashMap<>(5); + commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration)); commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID); + + commitData.put(LocalCheckpointService.LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService.getLocalCheckpoint())); + commitData.put(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService.getGlobalCheckpoint())); + if (syncId != null) { commitData.put(Engine.SYNC_COMMIT_ID, syncId); } + + if (logger.isTraceEnabled()) { + logger.trace("committing writer with commit data [{}]", commitData); + } + indexWriter.setCommitData(commitData); writer.commit(); } catch (Throwable ex) { diff --git a/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java index c7e2226ac92d0..fac3ec9e28552 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java @@ -22,7 +22,13 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.search.Query; +import org.apache.lucene.util.Bits; +import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -108,6 +114,34 @@ public Query termQuery(Object value, @Nullable QueryShardContext context) { throw new QueryShardException(context, "SeqNoField field [" + name() + "] is not searchable"); } + @Override + public FieldStats stats(IndexReader reader) throws IOException { + final List leaves = reader.leaves(); + if (leaves.isEmpty()) { + return null; + } + + long currentMin = Long.MAX_VALUE; + long currentMax = Long.MIN_VALUE; + boolean found = false; + for (int i = 0; i < leaves.size(); i++) { + final LeafReader leaf = leaves.get(i).reader(); + final NumericDocValues values = leaf.getNumericDocValues(name()); + if (values == null) continue; + found = true; + final Bits bits = leaf.getLiveDocs(); + for (int docID = 0; docID < leaf.maxDoc(); docID++) { + if (bits == null || bits.get(docID)) { + final long value = values.get(docID); + currentMin = Math.min(currentMin, value); + currentMax = Math.max(currentMax, value); + } + } + } + + return found ? new FieldStats.Long(reader.maxDoc(), 0, -1, -1, false, true, currentMin, currentMax) : null; + } + } public SeqNoFieldMapper(Settings indexSettings) { @@ -129,7 +163,7 @@ protected void parseCreateField(ParseContext context, List fields) throws @Override public Mapper parse(ParseContext context) throws IOException { - // _seqno added in preparse + // _seq_no added in pre-parse return null; } @@ -157,4 +191,5 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws protected void doMerge(Mapper mergeWith, boolean updateAllTypes) { // nothing to do } + } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java index 1bc34089238b4..00bb18224a236 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java @@ -41,6 +41,8 @@ */ public class GlobalCheckpointService extends AbstractIndexShardComponent { + public static String GLOBAL_CHECKPOINT_KEY = "global_checkpoint"; + /** * This map holds the last known local checkpoint for every shard copy that's active. * All shard copies in this map participate in determining the global checkpoint @@ -67,16 +69,20 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent { */ final private ObjectLongMap trackingLocalCheckpoint; - private long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + private long globalCheckpoint; + + public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings) { + this(shardId, indexSettings, SequenceNumbersService.UNASSIGNED_SEQ_NO); + } - public GlobalCheckpointService(ShardId shardId, IndexSettings indexSettings) { + public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) { super(shardId, indexSettings); activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas()); inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); trackingLocalCheckpoint = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); + this.globalCheckpoint = globalCheckpoint; } - /** * notifies the service of a local checkpoint. if the checkpoint is lower than the currently known one, * this is a noop. Last, if the allocation id is not yet known, it is ignored. This to prevent late @@ -241,4 +247,5 @@ synchronized long getLocalCheckpointForAllocation(String allocationId) { } return SequenceNumbersService.UNASSIGNED_SEQ_NO; } + } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index d20432e5071e7..d47c1dbd38c68 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -23,6 +23,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.SnapshotStatus; import java.util.LinkedList; @@ -32,6 +33,9 @@ */ public class LocalCheckpointService extends AbstractIndexShardComponent { + public static String MAX_SEQ_NO = "max_seq_no"; + public static String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; + /** * we keep a bit for each seq No that is still pending. to optimize allocation, we do so in multiple arrays * allocating them on demand and cleaning up while completed. This setting controls the size of the arrays @@ -39,26 +43,31 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { public static Setting SETTINGS_BIT_ARRAYS_SIZE = Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, 4, Setting.Property.IndexScope); - /** * an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstProcessedSeqNo} * which marks the seqNo the fist bit in the first array corresponds to. */ final LinkedList processedSeqNo; final int bitArraysSize; - long firstProcessedSeqNo = 0; + long firstProcessedSeqNo; /** the current local checkpoint, i.e., all seqNo lower (<=) than this number have been completed */ - volatile long checkpoint = SequenceNumbersService.NO_OPS_PERFORMED; + volatile long checkpoint; /** the next available seqNo - used for seqNo generation */ - volatile long nextSeqNo = 0; + volatile long nextSeqNo; + public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings) { + this(shardId, indexSettings, SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.NO_OPS_PERFORMED); + } - public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { + public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long checkpoint) { super(shardId, indexSettings); bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); processedSeqNo = new LinkedList<>(); + firstProcessedSeqNo = checkpoint + 1; + this.nextSeqNo = maxSeqNo + 1; + this.checkpoint = checkpoint; } /** @@ -130,7 +139,7 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1) */ private FixedBitSet getBitSetForSeqNo(long seqNo) { assert Thread.holdsLock(this); - assert seqNo >= firstProcessedSeqNo; + assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo; int bitSetOffset = ((int) (seqNo - firstProcessedSeqNo)) / bitArraysSize; while (bitSetOffset >= processedSeqNo.size()) { processedSeqNo.add(new FixedBitSet(bitArraysSize)); @@ -138,11 +147,11 @@ private FixedBitSet getBitSetForSeqNo(long seqNo) { return processedSeqNo.get(bitSetOffset); } - /** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */ private int seqNoToBitSetOffset(long seqNo) { assert Thread.holdsLock(this); assert seqNo >= firstProcessedSeqNo; return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize; } + } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index fe85172657014..c4cdb8326317b 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -34,10 +34,15 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { final LocalCheckpointService localCheckpointService; final GlobalCheckpointService globalCheckpointService; - public SequenceNumbersService(ShardId shardId, IndexSettings indexSettings) { + public SequenceNumbersService( + final ShardId shardId, + final IndexSettings indexSettings, + final long maxSeqNo, + final long localCheckpoint, + final long globalCheckpoint) { super(shardId, indexSettings); - localCheckpointService = new LocalCheckpointService(shardId, indexSettings); - globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings); + localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint); + globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint); } /** diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c9c1cb315d3cd..e0c6f156981f3 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -82,6 +82,8 @@ import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.object.RootObjectMapper; +import org.elasticsearch.index.seqno.GlobalCheckpointService; +import org.elasticsearch.index.seqno.LocalCheckpointService; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.IndexSearcherWrapper; @@ -115,6 +117,8 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -235,6 +239,7 @@ private ParsedDocument testParsedDocument(String uid, String id, String type, St Field seqNoField = new NumericDocValuesField("_seq_no", 0); document.add(uidField); document.add(versionField); + document.add(seqNoField); return new ParsedDocument(versionField, seqNoField, id, type, routing, timestamp, ttl, Arrays.asList(document), source, mappingUpdate); } @@ -548,6 +553,14 @@ public void testCommitStats() { assertThat(stats1.getGeneration(), greaterThan(0L)); assertThat(stats1.getId(), notNullValue()); assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); + assertThat(stats1.getUserData(), hasKey(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)); + assertThat( + Long.parseLong(stats1.getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), + equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); + assertThat(stats1.getUserData(), hasKey(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)); + assertThat( + Long.parseLong(stats1.getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); engine.flush(true, true); CommitStats stats2 = engine.commitStats(); @@ -558,6 +571,11 @@ public void testCommitStats() { assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); assertThat(stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); + assertThat(Long.parseLong(stats2.getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), equalTo(0L)); + assertThat(stats2.getUserData(), hasKey(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)); + assertThat( + Long.parseLong(stats2.getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); } public void testIndexSearcherWrapper() throws Exception { @@ -655,7 +673,7 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException { Engine recoveringEngine = null; try { final AtomicBoolean flushed = new AtomicBoolean(); - recoveringEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { + recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { @Override public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { assertThat(getTranslog().totalOperations(), equalTo(docs)); @@ -1558,51 +1576,165 @@ public void testIndexWriterInfoStream() { } } - public void testSeqNoAndLocalCheckpoint() { - int opCount = randomIntBetween(1, 10); - long seqNoCount = -1; - for (int op = 0; op < opCount; op++) { - final String id = randomFrom("1", "2", "3"); - ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocumentWithTextField(), B_1, null); - if (randomBoolean()) { - final Engine.Index index = new Engine.Index(newUid(id), doc, - SequenceNumbersService.UNASSIGNED_SEQ_NO, - rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, - PRIMARY, System.currentTimeMillis()); - - try { - engine.index(index); - } catch (VersionConflictEngineException e) { - // OK - } - if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoCount++; - Engine.Index replica = new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), - index.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis()); - replicaEngine.index(replica); + public void testSeqNoAndCheckpoints() throws IOException { + int opCount = randomIntBetween(1, 256); + long primarySeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + long replicaSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + // we lose deletes when recovering + final String[] ids = new String[]{"1", "2", "3"}; + final Map primaryMaxNoDeleteSeqNo = new HashMap<>(); + final Map replicaMaxNoDeleteSeqNo = new HashMap<>(); + long replicaLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + InternalEngine initialEngine = null; + InternalEngine initialReplicaEngine = null; + try { + initialEngine = engine; + initialReplicaEngine = replicaEngine; + boolean broken = false; + for (int op = 0; op < opCount; op++) { + final String id = randomFrom(ids); + ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocumentWithTextField(), B_1, null); + if (randomBoolean()) { + final Engine.Index index = new Engine.Index(newUid(id), doc, + SequenceNumbersService.UNASSIGNED_SEQ_NO, + rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, + PRIMARY, System.currentTimeMillis()); + try { + initialEngine.index(index); + primarySeqNo++; + } catch (VersionConflictEngineException e) { + // OK + } + if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + if (rarely()) { + // put a hole in the sequence numbers on the replica + if (!broken) { + broken = true; + replicaLocalCheckpoint = replicaSeqNo; + } + } else { + Engine.Index replica = new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), + index.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis()); + initialReplicaEngine.index(replica); + replicaSeqNo = primarySeqNo; + replicaMaxNoDeleteSeqNo.put(id, replicaSeqNo); + } + primaryMaxNoDeleteSeqNo.put(id, primarySeqNo); + } + } else { + final Engine.Delete delete = new Engine.Delete("test", id, newUid(id), + SequenceNumbersService.UNASSIGNED_SEQ_NO, + rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, + PRIMARY, System.currentTimeMillis(), false); + try { + initialEngine.delete(delete); + primarySeqNo++; + } catch (VersionConflictEngineException e) { + // OK + } + if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + if (rarely()) { + // put a hole in the sequence numbers on the replica + if (!broken) { + broken = true; + replicaLocalCheckpoint = replicaSeqNo; + } + } else { + Engine.Delete replica = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), + delete.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis(), false); + initialReplicaEngine.delete(replica); + replicaSeqNo = primarySeqNo; + replicaMaxNoDeleteSeqNo.remove(id); + } + primaryMaxNoDeleteSeqNo.remove(id); + } } - } else { - final Engine.Delete delete = new Engine.Delete("test", id, newUid(id), - SequenceNumbersService.UNASSIGNED_SEQ_NO, - rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, - PRIMARY, System.currentTimeMillis(), false); - try { - engine.delete(delete); - } catch (VersionConflictEngineException e) { - // OK + } + + if (!broken) { + replicaLocalCheckpoint = primarySeqNo; + } + + initialEngine + .seqNoService() + .updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet()); + initialEngine.seqNoService().updateLocalCheckpointForShard("primary", initialEngine.seqNoService().getLocalCheckpoint()); + initialEngine.seqNoService().updateLocalCheckpointForShard("replica", initialReplicaEngine.seqNoService().getLocalCheckpoint()); + initialEngine.seqNoService().updateGlobalCheckpointOnPrimary(); + + assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo)); + assertThat(initialEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo)); + assertThat(initialEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint)); + assertThat(initialReplicaEngine.seqNoService().stats().getMaxSeqNo(), equalTo(replicaSeqNo)); + assertThat(initialReplicaEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(replicaLocalCheckpoint)); + assertThat( + initialReplicaEngine.seqNoService().stats().getGlobalCheckpoint(), + equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + + initialEngine.flush(true, true); + assertThat( + Long.parseLong(initialEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), + equalTo(primarySeqNo)); + assertThat( + Long.parseLong(initialEngine.commitStats().getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + equalTo(replicaLocalCheckpoint)); + initialReplicaEngine.flush(true, true); + assertThat( + Long.parseLong(initialReplicaEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), + equalTo(replicaLocalCheckpoint)); + assertThat( + Long.parseLong(initialReplicaEngine.commitStats().getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + } finally { + IOUtils.close(initialEngine, initialReplicaEngine); + } + + InternalEngine recoveringEngine = null; + InternalEngine recoveringReplicaEngine = null; + try { + recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + recoveringReplicaEngine = + new InternalEngine(copy(initialReplicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); + + long primaryMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + for (String id : ids) { + if (primaryMaxNoDeleteSeqNo.containsKey(id)) { + primaryMaxSeqNo = Math.max(primaryMaxSeqNo, primaryMaxNoDeleteSeqNo.get(id)); } - if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoCount++; - Engine.Delete replica = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis(), false); - replicaEngine.delete(replica); + } + + long replicaMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + for (String id : ids) { + if (replicaMaxNoDeleteSeqNo.containsKey(id)) { + replicaMaxSeqNo = Math.max(replicaMaxSeqNo, replicaMaxNoDeleteSeqNo.get(id)); } } + + assertThat( + Long.parseLong(recoveringEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), + equalTo(primarySeqNo)); + assertThat( + Long.parseLong(recoveringEngine.commitStats().getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + equalTo(replicaLocalCheckpoint)); + assertThat(recoveringEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo)); + assertThat(recoveringEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint)); + assertThat(recoveringEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primaryMaxSeqNo)); + assertThat(recoveringEngine.seqNoService().generateSeqNo(), equalTo(primaryMaxSeqNo + 1)); + assertThat( + Long.parseLong(recoveringReplicaEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), + equalTo(replicaLocalCheckpoint)); + assertThat( + Long.parseLong(recoveringReplicaEngine.commitStats().getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(recoveringReplicaEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(replicaLocalCheckpoint)); + assertThat( + recoveringReplicaEngine.seqNoService().stats().getGlobalCheckpoint(), + equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(recoveringReplicaEngine.seqNoService().stats().getMaxSeqNo(), equalTo(replicaMaxSeqNo)); + assertThat(recoveringReplicaEngine.seqNoService().generateSeqNo(), equalTo(replicaMaxSeqNo + 1)); + } finally { + IOUtils.close(recoveringEngine, recoveringReplicaEngine); } - assertThat(engine.seqNoService().stats().getMaxSeqNo(), equalTo(seqNoCount)); - assertThat(engine.seqNoService().stats().getLocalCheckpoint(), equalTo(seqNoCount)); - assertThat(replicaEngine.seqNoService().stats().getMaxSeqNo(), equalTo(seqNoCount)); - assertThat(replicaEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(seqNoCount)); } // #8603: make sure we can separately log IFD's messages @@ -1920,9 +2052,9 @@ public void testUpgradeOldIndex() throws IOException { } CommitStats commitStats = engine.commitStats(); Map userData = commitStats.getUserData(); - assertTrue("userdata dosn't contain uuid", userData.containsKey(Translog.TRANSLOG_UUID_KEY)); - assertTrue("userdata doesn't contain generation key", userData.containsKey(Translog.TRANSLOG_GENERATION_KEY)); - assertFalse("userdata contains legacy marker", userData.containsKey("translog_id")); + assertTrue("user data doesn't contain uuid", userData.containsKey(Translog.TRANSLOG_UUID_KEY)); + assertTrue("user data doesn't contain generation key", userData.containsKey(Translog.TRANSLOG_GENERATION_KEY)); + assertFalse("user data contains legacy marker", userData.containsKey("translog_id")); } } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java b/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java index cbdf93fdee807..b24f8828cb988 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/CheckpointsIT.java @@ -43,7 +43,7 @@ public void testCheckpointsAdvance() throws Exception { "index.number_of_shards", "1" // simplify things so we know how many ops goes to the shards ).get(); final List builders = new ArrayList<>(); - final int numDocs = scaledRandomIntBetween(0, 100); + final long numDocs = scaledRandomIntBetween(0, 100); logger.info("--> will index [{}] docs", numDocs); for (int i = 0; i < numDocs; i++) { builders.add(client().prepareIndex("test", "type", "id_" + i).setSource("{}")); @@ -60,23 +60,14 @@ public void testCheckpointsAdvance() throws Exception { logger.debug("seq_no stats for {}: {}", shardStats.getShardRouting(), XContentHelper.toString(shardStats.getSeqNoStats(), new ToXContent.MapParams(Collections.singletonMap("pretty", "false")))); - final Matcher localCheckpointRule; - final Matcher globalCheckpointRule; - if (shardStats.getShardRouting().primary()) { - localCheckpointRule = equalTo(numDocs - 1L); - globalCheckpointRule = equalTo(numDocs - 1L); - } else { - // nocommit: recovery doesn't transfer checkpoints yet (we don't persist them in lucene). - localCheckpointRule = anyOf(equalTo(numDocs - 1L), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); - globalCheckpointRule = anyOf(equalTo(numDocs - 1L), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); - } assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", - shardStats.getSeqNoStats().getLocalCheckpoint(), localCheckpointRule); + shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(numDocs - 1)); assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", - shardStats.getSeqNoStats().getGlobalCheckpoint(), globalCheckpointRule); + shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(numDocs - 1)); assertThat(shardStats.getShardRouting() + " max seq no mismatch", - shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(numDocs - 1L)); + shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(numDocs - 1)); } }); } + } From 689618bfdf85e2bbd1239517e15c69f50c89e85b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Jun 2016 07:37:43 -0400 Subject: [PATCH 02/12] Add nocommit to sequence number stats method This commit adds a nocommit to the SeqNoFieldType#stats method as the implementation is temporary until late-binding commits are available. --- .../elasticsearch/index/mapper/internal/SeqNoFieldMapper.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java index fac3ec9e28552..f1b11956f40bc 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java @@ -116,6 +116,8 @@ public Query termQuery(Object value, @Nullable QueryShardContext context) { @Override public FieldStats stats(IndexReader reader) throws IOException { + // nocommit remove implementation when late-binding commits + // are possible final List leaves = reader.leaves(); if (leaves.isEmpty()) { return null; From f61b57139de26f1771fea16c031ad3bffac054e6 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Jun 2016 07:44:57 -0400 Subject: [PATCH 03/12] Move global checkpoint commit key This commit moves the global checkpoint commit key from the global checkpoint service to the internal engine where it is better placed for clarity. This commit also enables marking this field as having default access. --- .../index/engine/InternalEngine.java | 9 +++++---- .../index/seqno/GlobalCheckpointService.java | 2 -- .../index/engine/InternalEngineTests.java | 17 ++++++++--------- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e206f1e85d5b9..47da0ab9da0cd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -58,7 +58,6 @@ import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; -import org.elasticsearch.index.seqno.GlobalCheckpointService; import org.elasticsearch.index.seqno.LocalCheckpointService; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.DocsStats; @@ -86,6 +85,7 @@ * */ public class InternalEngine extends Engine { + /** * When we last pruned expired tombstones from versionMap.deletes: */ @@ -115,6 +115,7 @@ public class InternalEngine extends Engine { private final IndexThrottle throttle; private final SequenceNumbersService seqNoService; + final static String GLOBAL_CHECKPOINT_KEY = "global_checkpoint"; // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling @@ -313,8 +314,8 @@ private long loadLocalCheckpointFromCommit(IndexWriter writer) { private long loadGlobalCheckpointFromCommit(IndexWriter writer) { final Map commitUserData = writer.getCommitData(); - if (commitUserData.containsKey(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)) { - return Long.parseLong(commitUserData.get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)); + if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) { + return Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY)); } else { return SequenceNumbersService.UNASSIGNED_SEQ_NO; } @@ -1182,7 +1183,7 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID); commitData.put(LocalCheckpointService.LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService.getLocalCheckpoint())); - commitData.put(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService.getGlobalCheckpoint())); + commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService.getGlobalCheckpoint())); if (syncId != null) { commitData.put(Engine.SYNC_COMMIT_ID, syncId); diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java index 00bb18224a236..9fbee3ba90e20 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java @@ -41,8 +41,6 @@ */ public class GlobalCheckpointService extends AbstractIndexShardComponent { - public static String GLOBAL_CHECKPOINT_KEY = "global_checkpoint"; - /** * This map holds the last known local checkpoint for every shard copy that's active. * All shard copies in this map participate in determining the global checkpoint diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e0c6f156981f3..a8cf4a802ff41 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -82,7 +82,6 @@ import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.object.RootObjectMapper; -import org.elasticsearch.index.seqno.GlobalCheckpointService; import org.elasticsearch.index.seqno.LocalCheckpointService; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.DocsStats; @@ -557,9 +556,9 @@ public void testCommitStats() { assertThat( Long.parseLong(stats1.getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); - assertThat(stats1.getUserData(), hasKey(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)); + assertThat(stats1.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); assertThat( - Long.parseLong(stats1.getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + Long.parseLong(stats1.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); engine.flush(true, true); @@ -572,9 +571,9 @@ public void testCommitStats() { assertThat(stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); assertThat(Long.parseLong(stats2.getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), equalTo(0L)); - assertThat(stats2.getUserData(), hasKey(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)); + assertThat(stats2.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); assertThat( - Long.parseLong(stats2.getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); } @@ -1676,14 +1675,14 @@ public void testSeqNoAndCheckpoints() throws IOException { Long.parseLong(initialEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo)); assertThat( - Long.parseLong(initialEngine.commitStats().getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), equalTo(replicaLocalCheckpoint)); initialReplicaEngine.flush(true, true); assertThat( Long.parseLong(initialReplicaEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), equalTo(replicaLocalCheckpoint)); assertThat( - Long.parseLong(initialReplicaEngine.commitStats().getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + Long.parseLong(initialReplicaEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); } finally { IOUtils.close(initialEngine, initialReplicaEngine); @@ -1714,7 +1713,7 @@ public void testSeqNoAndCheckpoints() throws IOException { Long.parseLong(recoveringEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo)); assertThat( - Long.parseLong(recoveringEngine.commitStats().getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), equalTo(replicaLocalCheckpoint)); assertThat(recoveringEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo)); assertThat(recoveringEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint)); @@ -1724,7 +1723,7 @@ public void testSeqNoAndCheckpoints() throws IOException { Long.parseLong(recoveringReplicaEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), equalTo(replicaLocalCheckpoint)); assertThat( - Long.parseLong(recoveringReplicaEngine.commitStats().getUserData().get(GlobalCheckpointService.GLOBAL_CHECKPOINT_KEY)), + Long.parseLong(recoveringReplicaEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); assertThat(recoveringReplicaEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(replicaLocalCheckpoint)); assertThat( From c851db6c6c3bf3b3d12130f1349c7020abf2d775 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Jun 2016 07:48:19 -0400 Subject: [PATCH 04/12] Move local checkpoint commit key This commit moves the local checkpoint commit key from the local checkpoint service to the internal engine where it is better placed for clarity. This commit also enables marking this field as having default access. --- .../index/engine/InternalEngine.java | 8 ++++---- .../index/seqno/LocalCheckpointService.java | 3 --- .../index/engine/InternalEngineTests.java | 15 +++++++-------- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 47da0ab9da0cd..2b6e6992a4862 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -58,7 +58,6 @@ import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; -import org.elasticsearch.index.seqno.LocalCheckpointService; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; @@ -115,6 +114,7 @@ public class InternalEngine extends Engine { private final IndexThrottle throttle; private final SequenceNumbersService seqNoService; + final static String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; final static String GLOBAL_CHECKPOINT_KEY = "global_checkpoint"; // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges @@ -305,8 +305,8 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) private long loadLocalCheckpointFromCommit(IndexWriter writer) { final Map commitUserData = writer.getCommitData(); - if (commitUserData.containsKey(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)) { - return Long.parseLong(commitUserData.get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)); + if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) { + return Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY)); } else { return SequenceNumbersService.NO_OPS_PERFORMED; } @@ -1182,7 +1182,7 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration)); commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID); - commitData.put(LocalCheckpointService.LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService.getLocalCheckpoint())); + commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService.getLocalCheckpoint())); commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService.getGlobalCheckpoint())); if (syncId != null) { diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index d47c1dbd38c68..5203ad64b8846 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -33,9 +33,6 @@ */ public class LocalCheckpointService extends AbstractIndexShardComponent { - public static String MAX_SEQ_NO = "max_seq_no"; - public static String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; - /** * we keep a bit for each seq No that is still pending. to optimize allocation, we do so in multiple arrays * allocating them on demand and cleaning up while completed. This setting controls the size of the arrays diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a8cf4a802ff41..4805e50846f16 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -82,7 +82,6 @@ import org.elasticsearch.index.mapper.internal.SourceFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.object.RootObjectMapper; -import org.elasticsearch.index.seqno.LocalCheckpointService; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.IndexSearcherWrapper; @@ -552,9 +551,9 @@ public void testCommitStats() { assertThat(stats1.getGeneration(), greaterThan(0L)); assertThat(stats1.getId(), notNullValue()); assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); - assertThat(stats1.getUserData(), hasKey(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)); + assertThat(stats1.getUserData(), hasKey(InternalEngine.LOCAL_CHECKPOINT_KEY)); assertThat( - Long.parseLong(stats1.getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), + Long.parseLong(stats1.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); assertThat(stats1.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); assertThat( @@ -570,7 +569,7 @@ public void testCommitStats() { assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); assertThat(stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); - assertThat(Long.parseLong(stats2.getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), equalTo(0L)); + assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(0L)); assertThat(stats2.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); assertThat( Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), @@ -1672,14 +1671,14 @@ public void testSeqNoAndCheckpoints() throws IOException { initialEngine.flush(true, true); assertThat( - Long.parseLong(initialEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), + Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo)); assertThat( Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), equalTo(replicaLocalCheckpoint)); initialReplicaEngine.flush(true, true); assertThat( - Long.parseLong(initialReplicaEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), + Long.parseLong(initialReplicaEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(replicaLocalCheckpoint)); assertThat( Long.parseLong(initialReplicaEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), @@ -1710,7 +1709,7 @@ public void testSeqNoAndCheckpoints() throws IOException { } assertThat( - Long.parseLong(recoveringEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), + Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo)); assertThat( Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), @@ -1720,7 +1719,7 @@ public void testSeqNoAndCheckpoints() throws IOException { assertThat(recoveringEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primaryMaxSeqNo)); assertThat(recoveringEngine.seqNoService().generateSeqNo(), equalTo(primaryMaxSeqNo + 1)); assertThat( - Long.parseLong(recoveringReplicaEngine.commitStats().getUserData().get(LocalCheckpointService.LOCAL_CHECKPOINT_KEY)), + Long.parseLong(recoveringReplicaEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(replicaLocalCheckpoint)); assertThat( Long.parseLong(recoveringReplicaEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), From a9d5f5e079c21cb541c39a000e9bfe2a85b2f769 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Jun 2016 07:50:02 -0400 Subject: [PATCH 05/12] Remove superfluous global checkpoint constructor This commit removes a constructor from the global checkpoint service that was only used in tests. --- .../elasticsearch/index/seqno/GlobalCheckpointService.java | 4 ---- .../org/elasticsearch/index/seqno/GlobalCheckpointTests.java | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java index 9fbee3ba90e20..4506a37ecbaaa 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java @@ -69,10 +69,6 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent { private long globalCheckpoint; - public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings) { - this(shardId, indexSettings, SequenceNumbersService.UNASSIGNED_SEQ_NO); - } - public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) { super(shardId, indexSettings); activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas()); diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java index 557f0ca199602..62d833c99258b 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTests.java @@ -41,7 +41,7 @@ public class GlobalCheckpointTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); checkpointService = new GlobalCheckpointService(new ShardId("test", "_na_", 0), - IndexSettingsModule.newIndexSettings("test", Settings.EMPTY)); + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), SequenceNumbersService.UNASSIGNED_SEQ_NO); } public void testEmptyShards() { From a5b99c65259b933c4b379e29e160685ee427eda9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Jun 2016 07:53:33 -0400 Subject: [PATCH 06/12] Remove superfluous local checkpoint constructor This commit removes a constructor from the local checkpoint service that was only used in tests. --- .../index/seqno/LocalCheckpointService.java | 4 ---- .../index/seqno/LocalCheckpointServiceTests.java | 13 +++++++------ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index 5203ad64b8846..6b48917b20e99 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -54,10 +54,6 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { /** the next available seqNo - used for seqNo generation */ volatile long nextSeqNo; - public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings) { - this(shardId, indexSettings, SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.NO_OPS_PERFORMED); - } - public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long checkpoint) { super(shardId, indexSettings); bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java index e040d81393495..1456ed6aca264 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java @@ -53,12 +53,13 @@ public void setUp() throws Exception { private LocalCheckpointService getCheckpointService() { return new LocalCheckpointService( - new ShardId("test", "_na_", 0), - IndexSettingsModule.newIndexSettings("test", - Settings.builder() - .put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) - .build() - )); + new ShardId("test", "_na_", 0), + IndexSettingsModule.newIndexSettings("test", + Settings.builder() + .put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) + .build()), + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.NO_OPS_PERFORMED); } public void testSimplePrimary() { From 6e8f11ae3da78c4b82cd267c1daba3951b462eee Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Jun 2016 08:00:44 -0400 Subject: [PATCH 07/12] Reduce visibility in sequence number services This commit reduces the visibility of certain fields and methods in the local and global checkpoint services. --- .../index/seqno/GlobalCheckpointService.java | 6 +++--- .../index/seqno/LocalCheckpointService.java | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java index 4506a37ecbaaa..c6678a2dd5b89 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java @@ -69,7 +69,7 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent { private long globalCheckpoint; - public GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) { + GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) { super(shardId, indexSettings); activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas()); inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas()); @@ -124,7 +124,7 @@ private boolean updateLocalCheckpointInMap(String allocationId, long localCheckp * @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints * of one of the active allocations is not known. */ - synchronized public boolean updateCheckpointOnPrimary() { + synchronized boolean updateCheckpointOnPrimary() { long minCheckpoint = Long.MAX_VALUE; if (activeLocalCheckpoints.isEmpty() && inSyncLocalCheckpoints.isEmpty()) { return false; @@ -164,7 +164,7 @@ synchronized public long getCheckpoint() { /** * updates the global checkpoint on a replica shard (after it has been updated by the primary). */ - synchronized public void updateCheckpointOnReplica(long globalCheckpoint) { + synchronized void updateCheckpointOnReplica(long globalCheckpoint) { if (this.globalCheckpoint <= globalCheckpoint) { this.globalCheckpoint = globalCheckpoint; logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint); diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index 6b48917b20e99..36850f0b32cc1 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -45,16 +45,16 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { * which marks the seqNo the fist bit in the first array corresponds to. */ final LinkedList processedSeqNo; - final int bitArraysSize; + private final int bitArraysSize; long firstProcessedSeqNo; /** the current local checkpoint, i.e., all seqNo lower (<=) than this number have been completed */ volatile long checkpoint; /** the next available seqNo - used for seqNo generation */ - volatile long nextSeqNo; + private volatile long nextSeqNo; - public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long checkpoint) { + LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long checkpoint) { super(shardId, indexSettings); bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); processedSeqNo = new LinkedList<>(); @@ -66,14 +66,14 @@ public LocalCheckpointService(final ShardId shardId, final IndexSettings indexSe /** * issue the next sequence number **/ - public synchronized long generateSeqNo() { + synchronized long generateSeqNo() { return nextSeqNo++; } /** * marks the processing of the given seqNo have been completed **/ - public synchronized void markSeqNoAsCompleted(long seqNo) { + synchronized void markSeqNoAsCompleted(long seqNo) { // make sure we track highest seen seqNo if (seqNo >= nextSeqNo) { nextSeqNo = seqNo + 1; @@ -96,7 +96,7 @@ public long getCheckpoint() { } /** gets the maximum seqno seen so far */ - public long getMaxSeqNo() { + long getMaxSeqNo() { return nextSeqNo - 1; } From 5032fa75d2ac29f0c68e031142fb7830b4b6fda1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Jun 2016 08:07:21 -0400 Subject: [PATCH 08/12] Simplify loading sequence number stats from commit This commit simplifies the loading of sequence number stats from a commit point by combining all the logic for loading into a single method that returns an instance of SeqNoStats rather than three separate methods that return longs. --- .../index/engine/InternalEngine.java | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2b6e6992a4862..7d4fe02eb5543 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -58,6 +58,7 @@ import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; @@ -80,6 +81,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED; + /** * */ @@ -142,17 +145,21 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); try { writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG); - final long localCheckpoint = loadLocalCheckpointFromCommit(writer); - final long globalCheckpoint = loadGlobalCheckpointFromCommit(writer); - final long maxSeqNo = loadMaxSeqNoFromCommit(writer); + final SeqNoStats seqNoStats = loadSeqNoStatsFromCommit(writer); if (logger.isTraceEnabled()) { logger.trace( - "recovering local checkpoint: [{}], global checkpoint [{}], max sequence number [{}]", - localCheckpoint, - globalCheckpoint, - maxSeqNo); + "recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]", + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()); } - seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings(), maxSeqNo, localCheckpoint, globalCheckpoint); + seqNoService = + new SequenceNumbersService( + shardId, + engineConfig.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()); indexWriter = writer; translog = openTranslog(engineConfig, writer); assert translog.getGeneration() != null; @@ -303,33 +310,34 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) return null; } - private long loadLocalCheckpointFromCommit(IndexWriter writer) { + private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException { + final long maxSeqNo; + try (IndexReader reader = DirectoryReader.open(writer)) { + final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader); + if (stats != null) { + maxSeqNo = (long) stats.getMaxValue(); + } else { + maxSeqNo = NO_OPS_PERFORMED; + } + } + final Map commitUserData = writer.getCommitData(); + + final long localCheckpoint; if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) { - return Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY)); + localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY)); } else { - return SequenceNumbersService.NO_OPS_PERFORMED; + localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; } - } - private long loadGlobalCheckpointFromCommit(IndexWriter writer) { - final Map commitUserData = writer.getCommitData(); + final long globalCheckpoint; if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) { - return Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY)); + globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY)); } else { - return SequenceNumbersService.UNASSIGNED_SEQ_NO; + globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; } - } - private long loadMaxSeqNoFromCommit(IndexWriter writer) throws IOException { - try (IndexReader reader = DirectoryReader.open(writer)) { - final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader); - if (stats != null) { - return (long) stats.getMaxValue(); - } else { - return SequenceNumbersService.NO_OPS_PERFORMED; - } - } + return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); } private SearcherManager createSearcherManager() throws EngineException { From a40112f06899d4bb8e3a09a6ef2d94cffed3bb3b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Jun 2016 09:26:24 -0400 Subject: [PATCH 09/12] Fix seq. no. field stats when all docs are deleted This commit fixes an issue with the calculation of sequence number field stats when all docs are deleted. Namely, when all docs are deleted we would return a FieldStats instance when the intention is to return null. --- .../elasticsearch/index/mapper/internal/SeqNoFieldMapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java index f1b11956f40bc..479e5f23527c4 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java @@ -130,10 +130,10 @@ public FieldStats stats(IndexReader reader) throws IOException { final LeafReader leaf = leaves.get(i).reader(); final NumericDocValues values = leaf.getNumericDocValues(name()); if (values == null) continue; - found = true; final Bits bits = leaf.getLiveDocs(); for (int docID = 0; docID < leaf.maxDoc(); docID++) { if (bits == null || bits.get(docID)) { + found = true; final long value = values.get(docID); currentMin = Math.min(currentMin, value); currentMax = Math.max(currentMax, value); From 3eb20efbac0b09c4aa301ce79f459ff73c1b149a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Jun 2016 10:32:04 -0400 Subject: [PATCH 10/12] Clarify local and global checkpoint service init This commit clarifies local and global checkpoint service initialization by adding Javadocs and rewriting some of the field initialization to clarify the initial values. --- .../index/seqno/GlobalCheckpointService.java | 12 +++++++ .../index/seqno/LocalCheckpointService.java | 32 ++++++++++++++++--- .../index/seqno/SequenceNumbersService.java | 27 ++++++++++++++++ 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java index c6678a2dd5b89..65f9afe78f75e 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java @@ -69,6 +69,18 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent { private long globalCheckpoint; + /** + * Initialize the global checkpoint service. The {@code globalCheckpoint} + * should be set to the last known global checkpoint for this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED}. + * + * @param shardId the shard this service is providing tracking + * local checkpoints for + * @param indexSettings the index settings + * @param globalCheckpoint the last known global checkpoint for this shard, + * or + * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} + */ GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) { super(shardId, indexSettings); activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas()); diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java index 36850f0b32cc1..32beb5cb16386 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -54,13 +54,37 @@ public class LocalCheckpointService extends AbstractIndexShardComponent { /** the next available seqNo - used for seqNo generation */ private volatile long nextSeqNo; - LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long checkpoint) { + /** + * Initialize the local checkpoint service. The {@code maxSeqNo} should be + * set to the last sequence number assigned by this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED} and + * {@code localCheckpoint} should be set to the last known local checkpoint + * for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}. + * + * @param shardId the shard this service is providing tracking + * local checkpoints for + * @param indexSettings the index settings + * @param maxSeqNo the last sequence number assigned by this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED} + * @param localCheckpoint the last known local checkpoint for this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED} + */ + LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) { super(shardId, indexSettings); + if (localCheckpoint < 0 && localCheckpoint != SequenceNumbersService.NO_OPS_PERFORMED) { + throw new IllegalArgumentException( + "local checkpoint must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] " + + "but was [" + localCheckpoint + "]"); + } + if (maxSeqNo < 0 && maxSeqNo != SequenceNumbersService.NO_OPS_PERFORMED) { + throw new IllegalArgumentException( + "max seq. no. must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]"); + } bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); processedSeqNo = new LinkedList<>(); - firstProcessedSeqNo = checkpoint + 1; - this.nextSeqNo = maxSeqNo + 1; - this.checkpoint = checkpoint; + firstProcessedSeqNo = localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1; + this.nextSeqNo = maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1; + this.checkpoint = localCheckpoint; } /** diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index c4cdb8326317b..b745194f17c4f 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -30,10 +30,37 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { public final static long UNASSIGNED_SEQ_NO = -2L; + + /** + * Represents no operations have been performed on the shard. + */ public final static long NO_OPS_PERFORMED = -1L; + final LocalCheckpointService localCheckpointService; final GlobalCheckpointService globalCheckpointService; + /** + * Initialize the sequence number service. The {@code maxSeqNo} + * should be set to the last sequence number assigned by this + * shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, + * {@code localCheckpoint} should be set to the last known local + * checkpoint for this shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED}, and + * {@code globalCheckpoint} should be set to the last known global + * checkpoint for this shard, or + * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. + * + * @param shardId the shard this service is providing tracking + * local checkpoints for + * @param indexSettings the index settings + * @param maxSeqNo the last sequence number assigned by this + * shard, or + * {@link SequenceNumbersService#NO_OPS_PERFORMED} + * @param localCheckpoint the last known local checkpoint for this shard, + * or {@link SequenceNumbersService#NO_OPS_PERFORMED} + * @param globalCheckpoint the last known global checkpoint for this shard, + * or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} + */ public SequenceNumbersService( final ShardId shardId, final IndexSettings indexSettings, From 331fceb602764127fa192f04a4a4b923b79cc7cd Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 21 Jun 2016 08:14:20 -0400 Subject: [PATCH 11/12] Simplify internal engine seq. no. checkpoint test This commit simplifies an internal engine test for sequence number checkpoint persistence. For now, we will ignore the issues surrounding delete which will be addressed in follow-up work. We also remove the complexity of the interaction between a primary shard and a replica (although we simulate advancing the local checkpoint on a replica and the corresponding advancement of the global checkpoint). Finally, we add a translog recovery component to the test. --- .../index/engine/InternalEngineTests.java | 182 +++++------------- 1 file changed, 50 insertions(+), 132 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 4805e50846f16..c6ca39a5a629d 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -115,7 +115,6 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -140,6 +139,7 @@ public class InternalEngineTests extends ESTestCase { + protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 1); private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); @@ -313,9 +313,10 @@ public void onFailedEngine(String reason, @Nullable Throwable t) { return config; } - protected static final BytesReference B_1 = new BytesArray(new byte[]{1}); - protected static final BytesReference B_2 = new BytesArray(new byte[]{2}); - protected static final BytesReference B_3 = new BytesArray(new byte[]{3}); + private static final BytesReference B_1 = new BytesArray(new byte[]{1}); + private static final BytesReference B_2 = new BytesArray(new byte[]{2}); + private static final BytesReference B_3 = new BytesArray(new byte[]{3}); + private static final BytesArray SOURCE = new BytesArray("{}".getBytes(Charset.defaultCharset())); public void testSegments() throws Exception { try (Store store = createStore(); @@ -625,7 +626,7 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException { try { initialEngine = engine; for (int i = 0; i < ops; i++) { - final ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + final ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null); if (randomBoolean()) { final Engine.Index operation = new Engine.Index(newUid("test#1"), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()); operations.add(operation); @@ -661,7 +662,7 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException { initialEngine = engine; for (int i = 0; i < docs; i++) { final String id = Integer.toString(i); - final ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + final ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null); initialEngine.index(new Engine.Index(newUid(id), doc)); } } finally { @@ -1575,163 +1576,80 @@ public void testIndexWriterInfoStream() { } public void testSeqNoAndCheckpoints() throws IOException { - int opCount = randomIntBetween(1, 256); + // nocommit: does not test deletes + final int opCount = randomIntBetween(1, 256); long primarySeqNo = SequenceNumbersService.NO_OPS_PERFORMED; - long replicaSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; - // we lose deletes when recovering final String[] ids = new String[]{"1", "2", "3"}; - final Map primaryMaxNoDeleteSeqNo = new HashMap<>(); - final Map replicaMaxNoDeleteSeqNo = new HashMap<>(); + long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; long replicaLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; InternalEngine initialEngine = null; - InternalEngine initialReplicaEngine = null; + try { initialEngine = engine; - initialReplicaEngine = replicaEngine; - boolean broken = false; + initialEngine + .seqNoService() + .updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet()); for (int op = 0; op < opCount; op++) { final String id = randomFrom(ids); - ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocumentWithTextField(), B_1, null); - if (randomBoolean()) { - final Engine.Index index = new Engine.Index(newUid(id), doc, - SequenceNumbersService.UNASSIGNED_SEQ_NO, - rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, - PRIMARY, System.currentTimeMillis()); - try { - initialEngine.index(index); - primarySeqNo++; - } catch (VersionConflictEngineException e) { - // OK - } - if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - if (rarely()) { - // put a hole in the sequence numbers on the replica - if (!broken) { - broken = true; - replicaLocalCheckpoint = replicaSeqNo; - } - } else { - Engine.Index replica = new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), - index.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis()); - initialReplicaEngine.index(replica); - replicaSeqNo = primarySeqNo; - replicaMaxNoDeleteSeqNo.put(id, replicaSeqNo); - } - primaryMaxNoDeleteSeqNo.put(id, primarySeqNo); - } - } else { - final Engine.Delete delete = new Engine.Delete("test", id, newUid(id), - SequenceNumbersService.UNASSIGNED_SEQ_NO, - rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, - PRIMARY, System.currentTimeMillis(), false); - try { - initialEngine.delete(delete); - primarySeqNo++; - } catch (VersionConflictEngineException e) { - // OK - } - if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - if (rarely()) { - // put a hole in the sequence numbers on the replica - if (!broken) { - broken = true; - replicaLocalCheckpoint = replicaSeqNo; - } - } else { - Engine.Delete replica = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis(), false); - initialReplicaEngine.delete(replica); - replicaSeqNo = primarySeqNo; - replicaMaxNoDeleteSeqNo.remove(id); - } - primaryMaxNoDeleteSeqNo.remove(id); - } + ParsedDocument doc = testParsedDocument("test#" + id, id, "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null); + final Engine.Index index = new Engine.Index(newUid("test#" + id), doc, + SequenceNumbersService.UNASSIGNED_SEQ_NO, + rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, + PRIMARY, System.currentTimeMillis()); + try { + initialEngine.index(index); + primarySeqNo++; + } catch (VersionConflictEngineException e) { + } - } - if (!broken) { - replicaLocalCheckpoint = primarySeqNo; + replicaLocalCheckpoint = + rarely() ? replicaLocalCheckpoint : randomIntBetween(Math.toIntExact(replicaLocalCheckpoint), Math.toIntExact(primarySeqNo)); + initialEngine.seqNoService().updateLocalCheckpointForShard("primary", initialEngine.seqNoService().getLocalCheckpoint()); + initialEngine.seqNoService().updateLocalCheckpointForShard("replica", replicaLocalCheckpoint); + + if (rarely()) { + localCheckpoint = primarySeqNo; + globalCheckpoint = replicaLocalCheckpoint; + initialEngine.seqNoService().updateGlobalCheckpointOnPrimary(); + initialEngine.flush(true, true); + } } - initialEngine - .seqNoService() - .updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet()); - initialEngine.seqNoService().updateLocalCheckpointForShard("primary", initialEngine.seqNoService().getLocalCheckpoint()); - initialEngine.seqNoService().updateLocalCheckpointForShard("replica", initialReplicaEngine.seqNoService().getLocalCheckpoint()); initialEngine.seqNoService().updateGlobalCheckpointOnPrimary(); assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo)); assertThat(initialEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo)); assertThat(initialEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint)); - assertThat(initialReplicaEngine.seqNoService().stats().getMaxSeqNo(), equalTo(replicaSeqNo)); - assertThat(initialReplicaEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(replicaLocalCheckpoint)); - assertThat( - initialReplicaEngine.seqNoService().stats().getGlobalCheckpoint(), - equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); - initialEngine.flush(true, true); - assertThat( - Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), - equalTo(primarySeqNo)); assertThat( - Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), - equalTo(replicaLocalCheckpoint)); - initialReplicaEngine.flush(true, true); + Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), + equalTo(localCheckpoint)); assertThat( - Long.parseLong(initialReplicaEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), - equalTo(replicaLocalCheckpoint)); - assertThat( - Long.parseLong(initialReplicaEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), - equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), + equalTo(globalCheckpoint)); + } finally { - IOUtils.close(initialEngine, initialReplicaEngine); + IOUtils.close(initialEngine); } InternalEngine recoveringEngine = null; - InternalEngine recoveringReplicaEngine = null; try { recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); - recoveringReplicaEngine = - new InternalEngine(copy(initialReplicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); - - long primaryMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; - for (String id : ids) { - if (primaryMaxNoDeleteSeqNo.containsKey(id)) { - primaryMaxSeqNo = Math.max(primaryMaxSeqNo, primaryMaxNoDeleteSeqNo.get(id)); - } - } - - long replicaMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; - for (String id : ids) { - if (replicaMaxNoDeleteSeqNo.containsKey(id)) { - replicaMaxSeqNo = Math.max(replicaMaxSeqNo, replicaMaxNoDeleteSeqNo.get(id)); - } - } + recoveringEngine.recoverFromTranslog(); assertThat( - Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), - equalTo(primarySeqNo)); + Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), + equalTo(primarySeqNo)); assertThat( - Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), - equalTo(replicaLocalCheckpoint)); + Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), + equalTo(globalCheckpoint)); assertThat(recoveringEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo)); - assertThat(recoveringEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint)); - assertThat(recoveringEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primaryMaxSeqNo)); - assertThat(recoveringEngine.seqNoService().generateSeqNo(), equalTo(primaryMaxSeqNo + 1)); - assertThat( - Long.parseLong(recoveringReplicaEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), - equalTo(replicaLocalCheckpoint)); - assertThat( - Long.parseLong(recoveringReplicaEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), - equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); - assertThat(recoveringReplicaEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(replicaLocalCheckpoint)); - assertThat( - recoveringReplicaEngine.seqNoService().stats().getGlobalCheckpoint(), - equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); - assertThat(recoveringReplicaEngine.seqNoService().stats().getMaxSeqNo(), equalTo(replicaMaxSeqNo)); - assertThat(recoveringReplicaEngine.seqNoService().generateSeqNo(), equalTo(replicaMaxSeqNo + 1)); + assertThat(recoveringEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo)); + assertThat(recoveringEngine.seqNoService().generateSeqNo(), equalTo(primarySeqNo + 1)); } finally { - IOUtils.close(recoveringEngine, recoveringReplicaEngine); + IOUtils.close(recoveringEngine); } } From f223f19e01c1d9395e9beb7df82b30d502edc70a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 21 Jun 2016 10:26:19 -0400 Subject: [PATCH 12/12] Test global checkpoint persistence in commit stats This commit adds a test for global checkpoint persistence to the commit stats internal engine test. Note that this test now randomizes the values of the local and global checkpoints. --- .../index/engine/InternalEngine.java | 4 +- .../index/engine/InternalEngineTests.java | 80 +++++++++++++------ 2 files changed, 56 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7d4fe02eb5543..4830b373e3bf9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1190,8 +1190,8 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration)); commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID); - commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService.getLocalCheckpoint())); - commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService.getGlobalCheckpoint())); + commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService().getLocalCheckpoint())); + commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService().getGlobalCheckpoint())); if (syncId != null) { commitData.put(Engine.SYNC_COMMIT_ID, syncId); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c6ca39a5a629d..634ddb61f0feb 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -124,6 +124,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; @@ -542,39 +543,66 @@ public void testSegmentsStatsIncludingFileSizes() throws Exception { } } - public void testCommitStats() { - Document document = testDocumentWithTextField(); - document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE)); - ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null); - engine.index(new Engine.Index(newUid("1"), doc)); + public void testCommitStats() throws IOException { + InternalEngine engine = null; + try { + this.engine.close(); - CommitStats stats1 = engine.commitStats(); - assertThat(stats1.getGeneration(), greaterThan(0L)); - assertThat(stats1.getId(), notNullValue()); - assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); - assertThat(stats1.getUserData(), hasKey(InternalEngine.LOCAL_CHECKPOINT_KEY)); - assertThat( + final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); + final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); + + engine = new InternalEngine(copy(this.engine.config(), this.engine.config().getOpenMode())) { + @Override + public SequenceNumbersService seqNoService() { + return new SequenceNumbersService( + this.config().getShardId(), + this.config().getIndexSettings(), + maxSeqNo.get(), + localCheckpoint.get(), + globalCheckpoint.get()); + } + }; + CommitStats stats1 = engine.commitStats(); + assertThat(stats1.getGeneration(), greaterThan(0L)); + assertThat(stats1.getId(), notNullValue()); + assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); + assertThat(stats1.getUserData(), hasKey(InternalEngine.LOCAL_CHECKPOINT_KEY)); + assertThat( Long.parseLong(stats1.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); - assertThat(stats1.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); - assertThat( + assertThat(stats1.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); + assertThat( Long.parseLong(stats1.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); - engine.flush(true, true); - CommitStats stats2 = engine.commitStats(); - assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration())); - assertThat(stats2.getId(), notNullValue()); - assertThat(stats2.getId(), not(equalTo(stats1.getId()))); - assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); - assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); - assertThat(stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); - assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); - assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(0L)); - assertThat(stats2.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); - assertThat( + maxSeqNo.set(rarely() ? SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024)); + localCheckpoint.set( + rarely() || maxSeqNo.get() == SequenceNumbersService.NO_OPS_PERFORMED ? + SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024)); + globalCheckpoint.set(rarely() || localCheckpoint.get() == SequenceNumbersService.NO_OPS_PERFORMED ? + SequenceNumbersService.UNASSIGNED_SEQ_NO : randomIntBetween(0, (int) localCheckpoint.get())); + + engine.flush(true, true); + + CommitStats stats2 = engine.commitStats(); + assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration())); + assertThat(stats2.getId(), notNullValue()); + assertThat(stats2.getId(), not(equalTo(stats1.getId()))); + assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); + assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); + assertThat( + stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), + not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); + assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); + assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get())); + assertThat(stats2.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY)); + assertThat( Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), - equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + equalTo(globalCheckpoint.get())); + } finally { + IOUtils.close(engine); + } } public void testIndexSearcherWrapper() throws Exception {