diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 3fba14e72bc79..91e1f82658739 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.admin.cluster.stats; -import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStats; @@ -30,6 +29,7 @@ import org.elasticsearch.action.support.nodes.TransportNodesAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; @@ -105,7 +105,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq for (IndexShard indexShard : indexService) { if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) { // only report on fully started shards - shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats())); + shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), + new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats(), indexShard.seqNoStats())); } } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java index fb306337886af..037bf8575eeea 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java @@ -227,7 +227,6 @@ public static enum Flag { RequestCache("request_cache"), Recovery("recovery"); - private final String restName; Flag(String restName) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index 8fea8c795ebd5..81586f0fa7cae 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -28,7 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.engine.CommitStats; -import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.ShardPath; import java.io.IOException; @@ -42,6 +42,8 @@ public class ShardStats implements Streamable, ToXContent { private CommonStats commonStats; @Nullable private CommitStats commitStats; + @Nullable + private SeqNoStats seqNoStats; private String dataPath; private String statePath; private boolean isCustomDataPath; @@ -49,13 +51,14 @@ public class ShardStats implements Streamable, ToXContent { ShardStats() { } - public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats) { + public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats, SeqNoStats seqNoStats) { this.shardRouting = routing; this.dataPath = shardPath.getRootDataPath().toString(); this.statePath = shardPath.getRootStatePath().toString(); this.isCustomDataPath = shardPath.isCustomDataPath(); this.commitStats = commitStats; this.commonStats = commonStats; + this.seqNoStats = seqNoStats; } /** @@ -73,6 +76,11 @@ public CommitStats getCommitStats() { return this.commitStats; } + @Nullable + public SeqNoStats getSeqNoStats() { + return this.seqNoStats; + } + public String getDataPath() { return dataPath; } @@ -99,6 +107,7 @@ public void readFrom(StreamInput in) throws IOException { statePath = in.readString(); dataPath = in.readString(); isCustomDataPath = in.readBoolean(); + seqNoStats = in.readOptionalStreamableReader(SeqNoStats::new); } @Override @@ -109,6 +118,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(statePath); out.writeString(dataPath); out.writeBoolean(isCustomDataPath); + out.writeOptionalWritable(seqNoStats); } @Override @@ -124,6 +134,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (commitStats != null) { commitStats.toXContent(builder, params); } + if (seqNoStats != null) { + seqNoStats.toXContent(builder, params); + } builder.startObject(Fields.SHARD_PATH); builder.field(Fields.STATE_PATH, statePath); builder.field(Fields.DATA_PATH, dataPath); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index d5de67da478bb..e5fd4d4120879 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -162,6 +162,7 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh flags.set(CommonStatsFlags.Flag.Recovery); } - return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats()); + return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), + new CommonStats(indexShard, flags), indexShard.commitStats(), indexShard.seqNoStats()); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java index 00904af89155b..c1e159b2185d7 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.metadata; import com.carrotsearch.hppc.cursors.ObjectCursor; - import org.apache.lucene.analysis.Analyzer; import org.elasticsearch.Version; import org.elasticsearch.cluster.routing.UnassignedInfo; @@ -30,6 +29,7 @@ import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.seqno.LocalCheckpointService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.indices.mapper.MapperRegistry; @@ -93,8 +93,8 @@ private boolean isUpgraded(IndexMetaData indexMetaData) { private void checkSupportedVersion(IndexMetaData indexMetaData) { if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData) == false) { throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created before v2.0.0.beta1 and wasn't upgraded." - + " This index should be open using a version before " + Version.CURRENT.minimumCompatibilityVersion() - + " and upgraded using the upgrade API."); + + " This index should be open using a version before " + Version.CURRENT.minimumCompatibilityVersion() + + " and upgraded using the upgrade API."); } } @@ -107,7 +107,7 @@ private static boolean isSupportedVersion(IndexMetaData indexMetaData) { return true; } if (indexMetaData.getMinimumCompatibleVersion() != null && - indexMetaData.getMinimumCompatibleVersion().onOrAfter(org.apache.lucene.util.Version.LUCENE_5_0_0)) { + indexMetaData.getMinimumCompatibleVersion().onOrAfter(org.apache.lucene.util.Version.LUCENE_5_0_0)) { //The index was upgraded we can work with it return true; } @@ -116,42 +116,43 @@ private static boolean isSupportedVersion(IndexMetaData indexMetaData) { /** All known byte-sized settings for an index. */ public static final Set INDEX_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet( - "index.merge.policy.floor_segment", - "index.merge.policy.max_merged_segment", - "index.merge.policy.max_merge_size", - "index.merge.policy.min_merge_size", - "index.shard.recovery.file_chunk_size", - "index.shard.recovery.translog_size", - "index.store.throttle.max_bytes_per_sec", - "index.translog.flush_threshold_size", - "index.translog.fs.buffer_size", - "index.version_map_size")); + "index.merge.policy.floor_segment", + "index.merge.policy.max_merged_segment", + "index.merge.policy.max_merge_size", + "index.merge.policy.min_merge_size", + "index.shard.recovery.file_chunk_size", + "index.shard.recovery.translog_size", + "index.store.throttle.max_bytes_per_sec", + "index.translog.flush_threshold_size", + "index.translog.fs.buffer_size", + "index.version_map_size")); /** All known time settings for an index. */ public static final Set INDEX_TIME_SETTINGS = unmodifiableSet(newHashSet( - "index.gateway.wait_for_mapping_update_post_recovery", - "index.shard.wait_for_mapping_update_post_recovery", - "index.gc_deletes", - "index.indexing.slowlog.threshold.index.debug", - "index.indexing.slowlog.threshold.index.info", - "index.indexing.slowlog.threshold.index.trace", - "index.indexing.slowlog.threshold.index.warn", - "index.refresh_interval", - "index.search.slowlog.threshold.fetch.debug", - "index.search.slowlog.threshold.fetch.info", - "index.search.slowlog.threshold.fetch.trace", - "index.search.slowlog.threshold.fetch.warn", - "index.search.slowlog.threshold.query.debug", - "index.search.slowlog.threshold.query.info", - "index.search.slowlog.threshold.query.trace", - "index.search.slowlog.threshold.query.warn", - "index.shadow.wait_for_initial_commit", - "index.store.stats_refresh_interval", - "index.translog.flush_threshold_period", - "index.translog.interval", - "index.translog.sync_interval", - "index.shard.inactive_time", - UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING)); + "index.gateway.wait_for_mapping_update_post_recovery", + "index.shard.wait_for_mapping_update_post_recovery", + "index.gc_deletes", + "index.indexing.slowlog.threshold.index.debug", + "index.indexing.slowlog.threshold.index.info", + "index.indexing.slowlog.threshold.index.trace", + "index.indexing.slowlog.threshold.index.warn", + "index.refresh_interval", + "index.search.slowlog.threshold.fetch.debug", + "index.search.slowlog.threshold.fetch.info", + "index.search.slowlog.threshold.fetch.trace", + "index.search.slowlog.threshold.fetch.warn", + "index.search.slowlog.threshold.query.debug", + "index.search.slowlog.threshold.query.info", + "index.search.slowlog.threshold.query.trace", + "index.search.slowlog.threshold.query.warn", + "index.shadow.wait_for_initial_commit", + "index.store.stats_refresh_interval", + "index.translog.flush_threshold_period", + "index.translog.interval", + "index.translog.sync_interval", + "index.shard.inactive_time", + LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE, + UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING)); /** * Elasticsearch 2.0 requires units on byte/memory and time settings; this method adds the default unit to any such settings that are @@ -163,7 +164,7 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) { // Created lazily if we find any settings that are missing units: Settings settings = indexMetaData.getSettings(); Settings.Builder newSettings = null; - for(String byteSizeSetting : INDEX_BYTES_SIZE_SETTINGS) { + for (String byteSizeSetting : INDEX_BYTES_SIZE_SETTINGS) { String value = settings.get(byteSizeSetting); if (value != null) { try { @@ -180,7 +181,7 @@ private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) { newSettings.put(byteSizeSetting, value + "b"); } } - for(String timeSetting : INDEX_TIME_SETTINGS) { + for (String timeSetting : INDEX_TIME_SETTINGS) { String value = settings.get(timeSetting); if (value != null) { try { diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 20859e2716a8d..1fe06600cdefa 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -143,7 +143,7 @@ public short readShort() throws IOException { */ public int readInt() throws IOException { return ((readByte() & 0xFF) << 24) | ((readByte() & 0xFF) << 16) - | ((readByte() & 0xFF) << 8) | (readByte() & 0xFF); + | ((readByte() & 0xFF) << 8) | (readByte() & 0xFF); } /** @@ -543,6 +543,17 @@ public T readOptionalStreamable(Supplier supplier) thr } } + /** + * Serializes a potential null value. + */ + public T readOptionalStreamableReader(StreamableReader streamableReader) throws IOException { + if (readBoolean()) { + return streamableReader.readFrom(this); + } else { + return null; + } + } + public T readThrowable() throws IOException { if (readBoolean()) { int key = readVInt(); diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 5f1e7623d2822..d5e96d9b0dbdb 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -503,6 +503,18 @@ public void writeOptionalStreamable(@Nullable Streamable streamable) throws IOEx } } + /** + * Serializes a potential null value. + */ + public void writeOptionalWritable(@Nullable Writeable writeable) throws IOException { + if (writeable != null) { + writeBoolean(true); + writeable.writeTo(this); + } else { + writeBoolean(false); + } + } + public void writeThrowable(Throwable throwable) throws IOException { if (throwable == null) { writeBoolean(false); diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java index d75b3ffa8c264..8033750d1d24e 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java @@ -31,13 +31,17 @@ public class EsRejectedExecutionException extends ElasticsearchException { private final boolean isExecutorShutdown; - public EsRejectedExecutionException(String message, boolean isExecutorShutdown) { - super(message); + public EsRejectedExecutionException(String message, boolean isExecutorShutdown, Object... args) { + super(message, args); this.isExecutorShutdown = isExecutorShutdown; } - public EsRejectedExecutionException(String message) { - this(message, false); + public EsRejectedExecutionException(String message, Object... args) { + this(message, false, args); + } + + public EsRejectedExecutionException(String message, boolean isExecutorShutdown) { + this(message, isExecutorShutdown, new Object[0]); } public EsRejectedExecutionException() { diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index bccae2e46642a..2bd714c2d26b3 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -45,6 +45,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -325,6 +326,9 @@ public CommitStats commitStats() { return new CommitStats(getLastCommittedSegmentInfos()); } + /** get sequence number related stats */ + public abstract SeqNoStats seqNoStats(); + /** * Read the last segments info from the commit pointed to by the searcher manager */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index dbb62a735a064..488222487630f 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.MergeSchedulerConfig; @@ -348,10 +349,6 @@ public boolean index(Index index) { } catch (OutOfMemoryError | IllegalStateException | IOException t) { maybeFailEngine("index", t); throw new IndexFailedEngineException(shardId, index.type(), index.id(), t); - } finally { - if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoService.markSeqNoAsCompleted(index.seqNo()); - } } checkVersionMapRefresh(); return created; @@ -359,66 +356,71 @@ public boolean index(Index index) { private boolean innerIndex(Index index) throws IOException { synchronized (dirtyLock(index.uid())) { - lastWriteNanos = index.startTime(); - final long currentVersion; - final boolean deleted; - VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); - if (versionValue == null) { - currentVersion = loadCurrentVersionFromIndex(index.uid()); - deleted = currentVersion == Versions.NOT_FOUND; - } else { - deleted = versionValue.delete(); - if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) { - currentVersion = Versions.NOT_FOUND; // deleted, and GC + try { + lastWriteNanos = index.startTime(); + final long currentVersion; + final boolean deleted; + VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); + if (versionValue == null) { + currentVersion = loadCurrentVersionFromIndex(index.uid()); + deleted = currentVersion == Versions.NOT_FOUND; } else { - currentVersion = versionValue.version(); + deleted = versionValue.delete(); + if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) { + currentVersion = Versions.NOT_FOUND; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } } - } - long expectedVersion = index.version(); - if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { - if (index.origin() == Operation.Origin.RECOVERY) { - return false; - } else { - throw new VersionConflictEngineException(shardId, index.type(), index.id(), - index.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); + long expectedVersion = index.version(); + if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { + if (index.origin() == Operation.Origin.RECOVERY) { + return false; + } else { + throw new VersionConflictEngineException(shardId, index.type(), index.id(), + index.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); + } } - } - long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); - - final boolean created; - index.updateVersion(updatedVersion); - if (index.origin() == Operation.Origin.PRIMARY) { - index.updateSeqNo(seqNoService.generateSeqNo()); - } + long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); - if (currentVersion == Versions.NOT_FOUND) { - // document does not exists, we can optimize for create - created = true; - if (index.docs().size() > 1) { - indexWriter.addDocuments(index.docs()); - } else { - indexWriter.addDocument(index.docs().get(0)); + final boolean created; + index.updateVersion(updatedVersion); + if (index.origin() == Operation.Origin.PRIMARY) { + index.updateSeqNo(seqNoService.generateSeqNo()); } - } else { - if (versionValue != null) { - created = versionValue.delete(); // we have a delete which is not GC'ed... - } else { - created = false; - } - if (index.docs().size() > 1) { - indexWriter.updateDocuments(index.uid(), index.docs()); + if (currentVersion == Versions.NOT_FOUND) { + // document does not exists, we can optimize for create + created = true; + if (index.docs().size() > 1) { + indexWriter.addDocuments(index.docs()); + } else { + indexWriter.addDocument(index.docs().get(0)); + } } else { - indexWriter.updateDocument(index.uid(), index.docs().get(0)); + if (versionValue != null) { + created = versionValue.delete(); // we have a delete which is not GC'ed... + } else { + created = false; + } + if (index.docs().size() > 1) { + indexWriter.updateDocuments(index.uid(), index.docs()); + } else { + indexWriter.updateDocument(index.uid(), index.docs().get(0)); + } } - } - Translog.Location translogLocation = translog.add(new Translog.Index(index)); + Translog.Location translogLocation = translog.add(new Translog.Index(index)); - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation)); - index.setTranslogLocation(translogLocation); + versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation)); + index.setTranslogLocation(translogLocation); - indexingService.postIndexUnderLock(index); - return created; + indexingService.postIndexUnderLock(index); + return created; + } finally { + if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoService.markSeqNoAsCompleted(index.seqNo()); + } + } } } @@ -458,10 +460,6 @@ public void delete(Delete delete) throws EngineException { } catch (OutOfMemoryError | IllegalStateException | IOException t) { maybeFailEngine("delete", t); throw new DeleteFailedEngineException(shardId, delete, t); - } finally { - if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - seqNoService.markSeqNoAsCompleted(delete.seqNo()); - } } maybePruneDeletedTombstones(); @@ -478,56 +476,62 @@ private void maybePruneDeletedTombstones() { private void innerDelete(Delete delete) throws IOException { synchronized (dirtyLock(delete.uid())) { - lastWriteNanos = delete.startTime(); - final long currentVersion; - final boolean deleted; - VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes()); - if (versionValue == null) { - currentVersion = loadCurrentVersionFromIndex(delete.uid()); - deleted = currentVersion == Versions.NOT_FOUND; - } else { - deleted = versionValue.delete(); - if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) { - currentVersion = Versions.NOT_FOUND; // deleted, and GC + try { + lastWriteNanos = delete.startTime(); + final long currentVersion; + final boolean deleted; + VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes()); + if (versionValue == null) { + currentVersion = loadCurrentVersionFromIndex(delete.uid()); + deleted = currentVersion == Versions.NOT_FOUND; } else { - currentVersion = versionValue.version(); + deleted = versionValue.delete(); + if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) { + currentVersion = Versions.NOT_FOUND; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } } - } - long updatedVersion; - long expectedVersion = delete.version(); - if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { - if (delete.origin() == Operation.Origin.RECOVERY) { - return; - } else { - throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), - delete.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); + long updatedVersion; + long expectedVersion = delete.version(); + if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { + if (delete.origin() == Operation.Origin.RECOVERY) { + return; + } else { + throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), + delete.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); + } } - } - updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); + updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); - if (delete.origin() == Operation.Origin.PRIMARY) { - delete.updateSeqNo(seqNoService.generateSeqNo()); - } + if (delete.origin() == Operation.Origin.PRIMARY) { + delete.updateSeqNo(seqNoService.generateSeqNo()); + } - final boolean found; - if (currentVersion == Versions.NOT_FOUND) { - // doc does not exist and no prior deletes - found = false; - } else if (versionValue != null && versionValue.delete()) { - // a "delete on delete", in this case, we still increment the version, log it, and return that version - found = false; - } else { - // we deleted a currently existing document - indexWriter.deleteDocuments(delete.uid()); - found = true; - } + final boolean found; + if (currentVersion == Versions.NOT_FOUND) { + // doc does not exist and no prior deletes + found = false; + } else if (versionValue != null && versionValue.delete()) { + // a "delete on delete", in this case, we still increment the version, log it, and return that version + found = false; + } else { + // we deleted a currently existing document + indexWriter.deleteDocuments(delete.uid()); + found = true; + } - delete.updateVersion(updatedVersion, found); - Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); - versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), translogLocation)); - delete.setTranslogLocation(translogLocation); - indexingService.postDeleteUnderLock(delete); + delete.updateVersion(updatedVersion, found); + Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); + versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), translogLocation)); + delete.setTranslogLocation(translogLocation); + indexingService.postDeleteUnderLock(delete); + } finally { + if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoService.markSeqNoAsCompleted(delete.seqNo()); + } + } } } @@ -988,7 +992,7 @@ final static class SearchFactory extends EngineSearcherFactory { @Override public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException { IndexSearcher searcher = super.newSearcher(reader, previousReader); - if (reader instanceof LeafReader && isMergedSegment((LeafReader)reader)) { + if (reader instanceof LeafReader && isMergedSegment((LeafReader) reader)) { // we call newSearcher from the IndexReaderWarmer which warms segments during merging // in that case the reader is a LeafReader and all we need to do is to build a new Searcher // and return it since it does it's own warming for that particular reader. @@ -1178,4 +1182,9 @@ public void onSettingsChanged() { public MergeStats getMergeStats() { return mergeScheduler.stats(); } + + @Override + public SeqNoStats seqNoStats() { + return seqNoService.stats(); + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index af3e0ae82a8ec..dd9ff4375e854 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.translog.Translog; import java.io.IOException; @@ -231,4 +232,9 @@ public long indexWriterRAMBytesUsed() { // No IndexWriter throw new UnsupportedOperationException("ShadowEngine has no IndexWriter"); } + + @Override + public SeqNoStats seqNoStats() { + throw new UnsupportedOperationException("ShadowEngine doesn't track sequence numbers"); + } } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java new file mode 100644 index 0000000000000..8b5cb4a4616a5 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -0,0 +1,152 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; + +import java.util.LinkedList; + +/** + * This class generates sequences numbers and keeps track of the so called local checkpoint - the highest number for which + * all previous seqNo have been processed (including) + */ +public class LocalCheckpointService extends AbstractIndexShardComponent { + + /** + * we keep a bit for each seq No that is still pending. to optimize allocation, we do so in multiple arrays + * allocating them on demand and cleaning up while completed. This setting controls the size of the arrays + */ + public static String SETTINGS_BIT_ARRAYS_SIZE = "index.seq_no.checkpoint.bit_arrays_size"; + + /** default value for {@link #SETTINGS_BIT_ARRAYS_SIZE} */ + final static int DEFAULT_BIT_ARRAYS_SIZE = 1024; + + + /** + * an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstProcessedSeqNo} + * which marks the seqNo the fist bit in the first array corresponds to. + */ + final LinkedList processedSeqNo; + final int bitArraysSize; + long firstProcessedSeqNo = 0; + + /** the current local checkpoint, i.e., all seqNo lower (<=) than this number have been completed */ + volatile long checkpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + + /** the next available seqNo - used for seqNo generation */ + volatile long nextSeqNo = 0; + + + public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) { + super(shardId, indexSettings); + bitArraysSize = indexSettings.getSettings().getAsInt(SETTINGS_BIT_ARRAYS_SIZE, DEFAULT_BIT_ARRAYS_SIZE); + if (bitArraysSize <= 0) { + throw new IllegalArgumentException("[" + SETTINGS_BIT_ARRAYS_SIZE + "] must be positive. got [" + bitArraysSize + "]"); + } + processedSeqNo = new LinkedList<>(); + } + + /** + * issue the next sequence number + **/ + public synchronized long generateSeqNo() { + return nextSeqNo++; + } + + /** + * marks the processing of the given seqNo have been completed + **/ + public synchronized void markSeqNoAsCompleted(long seqNo) { + // make sure we track highest seen seqNo + if (seqNo >= nextSeqNo) { + nextSeqNo = seqNo + 1; + } + if (seqNo <= checkpoint) { + // this is possible during recovery where we might replay an op that was also replicated + return; + } + FixedBitSet bitSet = getBitSetForSeqNo(seqNo); + int offset = seqNoToBitSetOffset(seqNo); + bitSet.set(offset); + if (seqNo == checkpoint + 1) { + updateCheckpoint(); + } + } + + /** gets the current check point */ + public long getCheckpoint() { + return checkpoint; + } + + /** gets the maximum seqno seen so far */ + public long getMaxSeqNo() { + return nextSeqNo - 1; + } + + /** + * moves the checkpoint to the last consecutively processed seqNo + * Note: this method assumes that the seqNo following the current checkpoint is processed. + */ + private void updateCheckpoint() { + assert Thread.holdsLock(this); + assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 : + "checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)"; + assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : + "checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)"; + assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) : + "updateCheckpoint is called but the bit following the checkpoint is not set"; + // keep it simple for now, get the checkpoint one by one. in the future we can optimize and read words + FixedBitSet current = processedSeqNo.getFirst(); + do { + checkpoint++; + // the checkpoint always falls in the first bit set or just before. If it falls + // on the last bit of the current bit set, we can clean it. + if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) { + processedSeqNo.removeFirst(); + firstProcessedSeqNo += bitArraysSize; + assert checkpoint - firstProcessedSeqNo < bitArraysSize; + current = processedSeqNo.peekFirst(); + } + } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1))); + } + + /** + * gets the bit array for the given seqNo, allocating new ones if needed. + */ + private FixedBitSet getBitSetForSeqNo(long seqNo) { + assert Thread.holdsLock(this); + assert seqNo >= firstProcessedSeqNo; + int bitSetOffset = ((int) (seqNo - firstProcessedSeqNo)) / bitArraysSize; + while (bitSetOffset >= processedSeqNo.size()) { + processedSeqNo.add(new FixedBitSet(bitArraysSize)); + } + return processedSeqNo.get(bitSetOffset); + } + + + /** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */ + private int seqNoToBitSetOffset(long seqNo) { + assert Thread.holdsLock(this); + assert seqNo >= firstProcessedSeqNo; + return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize; + } +} diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java new file mode 100644 index 0000000000000..99ffb6ad54794 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +public class SeqNoStats implements ToXContent, Writeable { + + final long maxSeqNo; + final long localCheckpoint; + + public SeqNoStats(long maxSeqNo, long localCheckpoint) { + this.maxSeqNo = maxSeqNo; + this.localCheckpoint = localCheckpoint; + } + + public SeqNoStats(StreamInput in) throws IOException { + this(in.readZLong(), in.readZLong()); + } + + /** the maximum sequence number seen so far */ + public long getMaxSeqNo() { + return maxSeqNo; + } + + /** the maximum sequence number for which all previous operations (including) have been completed */ + public long getLocalCheckpoint() { + return localCheckpoint; + } + + @Override + public SeqNoStats readFrom(StreamInput in) throws IOException { + return new SeqNoStats(in.readLong(), in.readLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeZLong(maxSeqNo); + out.writeZLong(localCheckpoint); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.SEQ_NO); + builder.field(Fields.MAX_SEQ_NO, maxSeqNo); + builder.field(Fields.LOCAL_CHECKPOINT, localCheckpoint); + builder.endObject(); + return builder; + } + + + static final class Fields { + static final XContentBuilderString SEQ_NO = new XContentBuilderString("seq_no"); + static final XContentBuilderString MAX_SEQ_NO = new XContentBuilderString("max"); + static final XContentBuilderString LOCAL_CHECKPOINT = new XContentBuilderString("local_checkpoint"); + } +} diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 46b033622432b..3ef8607c4c230 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -22,39 +22,38 @@ import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; -import java.util.concurrent.atomic.AtomicLong; - /** a very light weight implementation. will be replaced with proper machinery later */ public class SequenceNumbersService extends AbstractIndexShardComponent { public final static long UNASSIGNED_SEQ_NO = -1L; - - AtomicLong seqNoGenerator = new AtomicLong(); + final LocalCheckpointService localCheckpointService; public SequenceNumbersService(ShardId shardId, IndexSettings indexSettings) { super(shardId, indexSettings); + localCheckpointService = new LocalCheckpointService(shardId, indexSettings); } /** * generates a new sequence number. * Note: you must call {@link #markSeqNoAsCompleted(long)} after the operation for which this seq# was generated - * was completed (whether successfully or with a failure + * was completed (whether successfully or with a failure) */ public long generateSeqNo() { - return seqNoGenerator.getAndIncrement(); + return localCheckpointService.generateSeqNo(); } + /** + * marks the given seqNo as completed. See {@link LocalCheckpointService#markSeqNoAsCompleted(long)} + * more details + */ public void markSeqNoAsCompleted(long seqNo) { - // this is temporary to make things semi sane on primary promotion and recovery. will be replaced with better machinery - boolean success; - do { - long maxSeqNo = seqNoGenerator.get(); - if (seqNo > maxSeqNo) { - success = seqNoGenerator.compareAndSet(maxSeqNo, seqNo); - } else { - success = true; - } - } while (success == false); + localCheckpointService.markSeqNoAsCompleted(seqNo); } + /** + * Gets sequence number related stats + */ + public SeqNoStats stats() { + return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint()); + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 443696986fd11..5bc38e5616e64 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -79,6 +79,7 @@ import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.snapshots.IndexShardRepository; @@ -591,6 +592,15 @@ public CommitStats commitStats() { return engine == null ? null : engine.commitStats(); } + /** + * @return {@link SeqNoStats} if engine is open, otherwise null + */ + @Nullable + public SeqNoStats seqNoStats() { + Engine engine = getEngineOrNull(); + return engine == null ? null : engine.seqNoStats(); + } + public IndexingStats indexingStats(String... types) { return indexingService.stats(types); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 50a16fa1cee70..d99e2ccd0e803 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.index.shard; -import java.io.IOException; - import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.index.IndexSettings; @@ -31,10 +29,13 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogStats; +import java.io.IOException; + /** * ShadowIndexShard extends {@link IndexShard} to add file synchronization * from the primary when a flush happens. It also ensures that a replica being @@ -67,6 +68,11 @@ public MergeStats mergeStats() { return new MergeStats(); } + @Override + public SeqNoStats seqNoStats() { + return null; + } + @Override public boolean canIndex() { return false; diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index dead72aee8b4f..352ca12740516 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -198,7 +198,9 @@ public NodeIndicesStats stats(boolean includePrevious, CommonStatsFlags flags) { if (indexShard.routingEntry() == null) { continue; } - IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), new ShardStats[] { new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats()) }); + IndexShardStats indexShardStats = new IndexShardStats(indexShard.shardId(), + new ShardStats[]{new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), + new CommonStats(indexShard, flags), indexShard.commitStats(), indexShard.seqNoStats())}); if (!statsByShard.containsKey(indexService.index())) { statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats)); } else { diff --git a/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index 1be999d8fedce..9ff9ee9ee437c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -104,8 +104,8 @@ public void testFillShardLevelInfo() { CommonStats commonStats1 = new CommonStats(); commonStats1.store = new StoreStats(1000, 1); ShardStats[] stats = new ShardStats[]{ - new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, "0xdeadbeef", test_0.shardId()), commonStats0, null), - new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, "0xdeadbeef", test_1.shardId()), commonStats1, null) + new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, "0xdeadbeef", test_0.shardId()), commonStats0, null, null), + new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, "0xdeadbeef", test_1.shardId()), commonStats1, null, null) }; ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder routingToPath = ImmutableOpenMap.builder(); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f38092d131e34..d1bf85e1f2125 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1398,6 +1398,53 @@ public void testIndexWriterInfoStream() { } } + public void testSeqNoAndLocalCheckpoint() { + int opCount = randomIntBetween(1, 10); + long seqNoCount = -1; + for (int op = 0; op < opCount; op++) { + final String id = randomFrom("1", "2", "3"); + ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocumentWithTextField(), B_1, null); + if (randomBoolean()) { + final Engine.Index index = new Engine.Index(newUid(id), doc, + SequenceNumbersService.UNASSIGNED_SEQ_NO, + rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, + PRIMARY, System.currentTimeMillis()); + + try { + engine.index(index); + } catch (VersionConflictEngineException e) { + // OK + } + if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoCount++; + Engine.Index replica = new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), + index.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis()); + replicaEngine.index(replica); + } + } else { + final Engine.Delete delete = new Engine.Delete("test", id, newUid(id), + SequenceNumbersService.UNASSIGNED_SEQ_NO, + rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, + PRIMARY, System.currentTimeMillis(), false); + try { + engine.delete(delete); + } catch (VersionConflictEngineException e) { + // OK + } + if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seqNoCount++; + Engine.Delete replica = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), + delete.version(), VersionType.EXTERNAL, REPLICA, System.currentTimeMillis(), false); + replicaEngine.delete(replica); + } + } + } + assertThat(engine.seqNoStats().getMaxSeqNo(), equalTo(seqNoCount)); + assertThat(engine.seqNoStats().getLocalCheckpoint(), equalTo(seqNoCount)); + assertThat(replicaEngine.seqNoStats().getMaxSeqNo(), equalTo(seqNoCount)); + assertThat(replicaEngine.seqNoStats().getLocalCheckpoint(), equalTo(seqNoCount)); + } + // #8603: make sure we can separately log IFD's messages public void testIndexWriterIFDInfoStream() { assumeFalse("who tests the tester?", VERBOSE); diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java new file mode 100644 index 0000000000000..d01938baa9748 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointServiceTests.java @@ -0,0 +1,197 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CyclicBarrier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isOneOf; + +public class LocalCheckpointServiceTests extends ESTestCase { + + LocalCheckpointService checkpointService; + + final int SMALL_CHUNK_SIZE = 4; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + checkpointService = getCheckpointService(); + } + + protected LocalCheckpointService getCheckpointService() { + return new LocalCheckpointService( + new ShardId("test", 0), + IndexSettingsModule.newIndexSettings("test", + Settings.builder() + .put(LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE, SMALL_CHUNK_SIZE) + .build() + )); + } + + public void testSimplePrimary() { + long seqNo1, seqNo2; + assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + seqNo1 = checkpointService.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + checkpointService.markSeqNoAsCompleted(seqNo1); + assertThat(checkpointService.getCheckpoint(), equalTo(0L)); + seqNo1 = checkpointService.generateSeqNo(); + seqNo2 = checkpointService.generateSeqNo(); + assertThat(seqNo1, equalTo(1L)); + assertThat(seqNo2, equalTo(2L)); + checkpointService.markSeqNoAsCompleted(seqNo2); + assertThat(checkpointService.getCheckpoint(), equalTo(0L)); + checkpointService.markSeqNoAsCompleted(seqNo1); + assertThat(checkpointService.getCheckpoint(), equalTo(2L)); + } + + public void testSimpleReplica() { + assertThat(checkpointService.getCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + checkpointService.markSeqNoAsCompleted(0L); + assertThat(checkpointService.getCheckpoint(), equalTo(0L)); + checkpointService.markSeqNoAsCompleted(2L); + assertThat(checkpointService.getCheckpoint(), equalTo(0L)); + checkpointService.markSeqNoAsCompleted(1L); + assertThat(checkpointService.getCheckpoint(), equalTo(2L)); + } + + public void testSimpleOverFlow() { + List seqNoList = new ArrayList<>(); + final boolean aligned = randomBoolean(); + final int maxOps = SMALL_CHUNK_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, SMALL_CHUNK_SIZE - 1)); + + for (int i = 0; i < maxOps; i++) { + seqNoList.add(i); + } + Collections.shuffle(seqNoList, random()); + for (Integer seqNo : seqNoList) { + checkpointService.markSeqNoAsCompleted(seqNo); + } + assertThat(checkpointService.checkpoint, equalTo(maxOps - 1L)); + assertThat(checkpointService.processedSeqNo.size(), equalTo(aligned ? 0 : 1)); + assertThat(checkpointService.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + } + + public void testConcurrentPrimary() throws InterruptedException { + Thread[] threads = new Thread[randomIntBetween(2, 5)]; + final int opsPerThread = randomIntBetween(10, 20); + final int maxOps = opsPerThread * threads.length; + final long unFinishedSeq = randomIntBetween(0, maxOps - 2); // make sure we always index the last seqNo to simplify maxSeq checks + logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinishedSeq); + final CyclicBarrier barrier = new CyclicBarrier(threads.length); + for (int t = 0; t < threads.length; t++) { + final int threadId = t; + threads[t] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + throw new ElasticsearchException("failure in background thread", t); + } + + @Override + protected void doRun() throws Exception { + barrier.await(); + for (int i = 0; i < opsPerThread; i++) { + long seqNo = checkpointService.generateSeqNo(); + logger.info("[t{}] started [{}]", threadId, seqNo); + if (seqNo != unFinishedSeq) { + checkpointService.markSeqNoAsCompleted(seqNo); + logger.info("[t{}] completed [{}]", threadId, seqNo); + } + } + } + }, "testConcurrentPrimary_" + threadId); + threads[t].start(); + } + for (Thread thread : threads) { + thread.join(); + } + assertThat(checkpointService.getMaxSeqNo(), equalTo(maxOps - 1L)); + assertThat(checkpointService.getCheckpoint(), equalTo(unFinishedSeq - 1L)); + checkpointService.markSeqNoAsCompleted(unFinishedSeq); + assertThat(checkpointService.getCheckpoint(), equalTo(maxOps - 1L)); + assertThat(checkpointService.processedSeqNo.size(), isOneOf(0, 1)); + assertThat(checkpointService.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + } + + public void testConcurrentReplica() throws InterruptedException { + Thread[] threads = new Thread[randomIntBetween(2, 5)]; + final int opsPerThread = randomIntBetween(10, 20); + final int maxOps = opsPerThread * threads.length; + final long unFinishedSeq = randomIntBetween(0, maxOps - 2); // make sure we always index the last seqNo to simplify maxSeq checks + Set seqNos = IntStream.range(0, maxOps).boxed().collect(Collectors.toSet()); + + final Integer[][] seqNoPerThread = new Integer[threads.length][]; + for (int t = 0; t < threads.length - 1; t++) { + int size = Math.min(seqNos.size(), randomIntBetween(opsPerThread - 4, opsPerThread + 4)); + seqNoPerThread[t] = randomSubsetOf(size, seqNos).toArray(new Integer[size]); + seqNos.removeAll(Arrays.asList(seqNoPerThread[t])); + } + seqNoPerThread[threads.length - 1] = seqNos.toArray(new Integer[seqNos.size()]); + logger.info("--> will run [{}] threads, maxOps [{}], unfinished seq no [{}]", threads.length, maxOps, unFinishedSeq); + final CyclicBarrier barrier = new CyclicBarrier(threads.length); + for (int t = 0; t < threads.length; t++) { + final int threadId = t; + threads[t] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + throw new ElasticsearchException("failure in background thread", t); + } + + @Override + protected void doRun() throws Exception { + barrier.await(); + Integer[] ops = seqNoPerThread[threadId]; + for (int seqNo : ops) { + if (seqNo != unFinishedSeq) { + checkpointService.markSeqNoAsCompleted(seqNo); + logger.info("[t{}] completed [{}]", threadId, seqNo); + } + } + } + }, "testConcurrentReplica_" + threadId); + threads[t].start(); + } + for (Thread thread : threads) { + thread.join(); + } + assertThat(checkpointService.getMaxSeqNo(), equalTo(maxOps - 1L)); + assertThat(checkpointService.getCheckpoint(), equalTo(unFinishedSeq - 1L)); + checkpointService.markSeqNoAsCompleted(unFinishedSeq); + assertThat(checkpointService.getCheckpoint(), equalTo(maxOps - 1L)); + assertThat(checkpointService.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + } + +} diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 1923c1e2cffa4..0fd9939e613a3 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -591,7 +591,8 @@ public void testShardStats() throws IOException { IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService("test"); IndexShard shard = test.getShardOrNull(0); - ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), new CommonStats(shard, new CommonStatsFlags()), shard.commitStats()); + ShardStats stats = new ShardStats(shard.routingEntry(), shard.shardPath(), new CommonStats(shard, new CommonStatsFlags()), + shard.commitStats(), shard.seqNoStats()); assertEquals(shard.shardPath().getRootDataPath().toString(), stats.getDataPath()); assertEquals(shard.shardPath().getRootStatePath().toString(), stats.getStatePath()); assertEquals(shard.shardPath().isCustomDataPath(), stats.isCustomDataPath()); diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java index c59c3ba4d4e6f..01bc1813ab74c 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -29,14 +29,12 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; import com.carrotsearch.randomizedtesting.rules.TestRuleAdapter; - import org.apache.lucene.uninverting.UninvertingReader; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; import org.apache.lucene.util.TestRuleMarkFailure; import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TimeUnits; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.bootstrap.BootstrapForTesting; import org.elasticsearch.cache.recycler.MockPageCacheRecycler; @@ -50,7 +48,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -69,14 +66,18 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; -import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList; import static org.hamcrest.Matchers.equalTo; /** @@ -562,14 +563,22 @@ private static String groupName(ThreadGroup threadGroup) { * Returns size random values */ public static List randomSubsetOf(int size, T... values) { - if (size > values.length) { - throw new IllegalArgumentException("Can\'t pick " + size + " random objects from a list of " + values.length + " objects"); + return randomSubsetOf(size, Arrays.asList(values)); + } + + /** + * Returns size random values + */ + public static List randomSubsetOf(int size, Collection values) { + if (size > values.size()) { + throw new IllegalArgumentException("Can\'t pick " + size + " random objects from a list of " + values.size() + " objects"); } - List list = arrayAsArrayList(values); - Collections.shuffle(list); + List list = new ArrayList<>(values); + Collections.shuffle(list, random()); return list.subList(0, size); } + /** * Returns true iff assertions for elasticsearch packages are enabled */ @@ -615,7 +624,7 @@ public void assertPathHasBeenCleared(Path path) throws Exception { sb.append("]"); assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0)); } - + /** Returns the suite failure marker: internal use only! */ public static TestRuleMarkFailure getSuiteFailureMarker() { return suiteFailureMarker;