Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.net.InetAddress;

public abstract class FieldStats<T> implements Writeable, ToXContent {

private final byte type;
private long maxDoc;
private long docCount;
Expand Down Expand Up @@ -628,4 +629,5 @@ private final static class Fields {
final static String MAX_VALUE = new String("max_value");
final static String MAX_VALUE_AS_STRING = new String("max_value_as_string");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
Expand All @@ -54,8 +55,10 @@
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
Expand All @@ -78,10 +81,13 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED;

/**
*
*/
public class InternalEngine extends Engine {

/**
* When we last pruned expired tombstones from versionMap.deletes:
*/
Expand Down Expand Up @@ -111,6 +117,8 @@ public class InternalEngine extends Engine {
private final IndexThrottle throttle;

private final SequenceNumbersService seqNoService;
final static String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
final static String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we are throttling
Expand All @@ -131,12 +139,27 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
seqNoService = new SequenceNumbersService(shardId, engineConfig.getIndexSettings());

mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
try {
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
final SeqNoStats seqNoStats = loadSeqNoStatsFromCommit(writer);
if (logger.isTraceEnabled()) {
logger.trace(
"recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]",
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
}
seqNoService =
new SequenceNumbersService(
shardId,
engineConfig.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
indexWriter = writer;
translog = openTranslog(engineConfig, writer);
assert translog.getGeneration() != null;
Expand Down Expand Up @@ -287,6 +310,36 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
return null;
}

/**
 * Rebuilds the sequence number statistics for this engine from what the given writer exposes.
 * <p>
 * The maximum sequence number comes from the field stats of the sequence number field, read through
 * a reader opened on the writer; the local and global checkpoints come from the writer's commit user
 * data. Each value falls back to a sentinel when it is absent.
 *
 * @param writer the index writer to read the field stats and commit user data from
 * @return the recovered max sequence number, local checkpoint and global checkpoint
 * @throws IOException if opening or closing the reader, or computing the field stats, fails
 */
private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
    final long recoveredMaxSeqNo;
    try (IndexReader indexReader = DirectoryReader.open(writer)) {
        final FieldStats seqNoFieldStats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(indexReader);
        // no stats means there is nothing to take a maximum from
        recoveredMaxSeqNo = seqNoFieldStats == null ? NO_OPS_PERFORMED : (long) seqNoFieldStats.getMaxValue();
    }

    final Map<String, String> userData = writer.getCommitData();

    // checkpoints are stored as decimal strings in the commit user data
    final long recoveredLocalCheckpoint = userData.containsKey(LOCAL_CHECKPOINT_KEY)
        ? Long.parseLong(userData.get(LOCAL_CHECKPOINT_KEY))
        : SequenceNumbersService.NO_OPS_PERFORMED;

    final long recoveredGlobalCheckpoint = userData.containsKey(GLOBAL_CHECKPOINT_KEY)
        ? Long.parseLong(userData.get(GLOBAL_CHECKPOINT_KEY))
        : SequenceNumbersService.UNASSIGNED_SEQ_NO;

    return new SeqNoStats(recoveredMaxSeqNo, recoveredLocalCheckpoint, recoveredGlobalCheckpoint);
}

private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
Expand Down Expand Up @@ -1132,13 +1185,22 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
ensureCanFlush();
try {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
logger.trace("committing writer with translog id [{}] and sync id [{}] ", translogGeneration.translogFileGeneration, syncId);
Map<String, String> commitData = new HashMap<>(2);
final Map<String, String> commitData = new HashMap<>(5);

commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);

commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService().getLocalCheckpoint()));
commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService().getGlobalCheckpoint()));

if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}

if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}

indexWriter.setCommitData(commitData);
writer.commit();
} catch (Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Bits;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -108,6 +114,36 @@ public Query termQuery(Object value, @Nullable QueryShardContext context) {
throw new QueryShardException(context, "SeqNoField field [" + name() + "] is not searchable");
}

@Override
public FieldStats stats(IndexReader reader) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a nocommit on this implementation, so it will be removed?
PS. Although we do this temporarily to achieve other goals, we will end up adding a different stats implementation once seq no is properly indexed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 689618b.

// nocommit remove implementation when late-binding commits
// are possible
final List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
}

long currentMin = Long.MAX_VALUE;
long currentMax = Long.MIN_VALUE;
boolean found = false;
for (int i = 0; i < leaves.size(); i++) {
final LeafReader leaf = leaves.get(i).reader();
final NumericDocValues values = leaf.getNumericDocValues(name());
if (values == null) continue;
final Bits bits = leaf.getLiveDocs();
for (int docID = 0; docID < leaf.maxDoc(); docID++) {
if (bits == null || bits.get(docID)) {
found = true;
final long value = values.get(docID);
currentMin = Math.min(currentMin, value);
currentMax = Math.max(currentMax, value);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of any place where we would want a full top-level scan like this. Can we return -1 or null, or throw an UnsupportedOperationException (UOE)? This behaves as O(n^2) in a near-real-time system. Do we know of any other loops like this in this stats code?

Copy link
Member Author

@jasontedor jasontedor Jun 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation will not be the permanent implementation, it is temporary as we iterate in this feature branch. This method is only used when starting an engine, and is needed for recovery (either locally or from a peer).

}

return found ? new FieldStats.Long(reader.maxDoc(), 0, -1, -1, false, true, currentMin, currentMax) : null;
}

}

public SeqNoFieldMapper(Settings indexSettings) {
Expand All @@ -129,7 +165,7 @@ protected void parseCreateField(ParseContext context, List<Field> fields) throws

/**
 * Parse hook for this mapper. It never reads {@code context} and always returns {@code null}.
 */
@Override
public Mapper parse(ParseContext context) throws IOException {
// nothing to do at this stage: _seq_no is added in pre-parse
return null;
}

Expand Down Expand Up @@ -157,4 +193,5 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
protected void doMerge(Mapper mergeWith, boolean updateAllTypes) {
// intentionally empty: neither mergeWith nor updateAllTypes is read, so there is nothing to do
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,28 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
*/
final private ObjectLongMap<String> trackingLocalCheckpoint;

private long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
private long globalCheckpoint;

public GlobalCheckpointService(ShardId shardId, IndexSettings indexSettings) {
/**
* Initialize the global checkpoint service. The {@code globalCheckpoint}
* should be set to the last known global checkpoint for this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED}.
*
* @param shardId the shard this service is providing tracking
* local checkpoints for
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard,
* or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
*/
GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
super(shardId, indexSettings);
activeLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
inSyncLocalCheckpoints = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
trackingLocalCheckpoint = new ObjectLongHashMap<>(indexSettings.getNumberOfReplicas());
this.globalCheckpoint = globalCheckpoint;
}


/**
* notifies the service of a local checkpoint. if the checkpoint is lower than the currently known one,
* this is a noop. Last, if the allocation id is not yet known, it is ignored. This to prevent late
Expand Down Expand Up @@ -124,7 +136,7 @@ private boolean updateLocalCheckpointInMap(String allocationId, long localCheckp
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
* of one of the active allocations is not known.
*/
synchronized public boolean updateCheckpointOnPrimary() {
synchronized boolean updateCheckpointOnPrimary() {
long minCheckpoint = Long.MAX_VALUE;
if (activeLocalCheckpoints.isEmpty() && inSyncLocalCheckpoints.isEmpty()) {
return false;
Expand Down Expand Up @@ -164,7 +176,7 @@ synchronized public long getCheckpoint() {
/**
* updates the global checkpoint on a replica shard (after it has been updated by the primary).
*/
synchronized public void updateCheckpointOnReplica(long globalCheckpoint) {
synchronized void updateCheckpointOnReplica(long globalCheckpoint) {
if (this.globalCheckpoint <= globalCheckpoint) {
this.globalCheckpoint = globalCheckpoint;
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);
Expand Down Expand Up @@ -241,4 +253,5 @@ synchronized long getLocalCheckpointForAllocation(String allocationId) {
}
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.SnapshotStatus;

import java.util.LinkedList;

Expand All @@ -39,39 +40,64 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
public static Setting<Integer> SETTINGS_BIT_ARRAYS_SIZE = Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024,
4, Setting.Property.IndexScope);


/**
* an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstProcessedSeqNo}
* which marks the seqNo the first bit in the first array corresponds to.
*/
final LinkedList<FixedBitSet> processedSeqNo;
final int bitArraysSize;
long firstProcessedSeqNo = 0;
private final int bitArraysSize;
long firstProcessedSeqNo;

/** the current local checkpoint, i.e., all seqNo lower (&lt;=) than this number have been completed */
volatile long checkpoint = SequenceNumbersService.NO_OPS_PERFORMED;
volatile long checkpoint;

/** the next available seqNo - used for seqNo generation */
volatile long nextSeqNo = 0;

private volatile long nextSeqNo;

public LocalCheckpointService(ShardId shardId, IndexSettings indexSettings) {
/**
* Initialize the local checkpoint service. The {@code maxSeqNo} should be
* set to the last sequence number assigned by this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED} and
* {@code localCheckpoint} should be set to the last known local checkpoint
* for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
*
* @param shardId the shard this service is providing tracking
* local checkpoints for
* @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned by this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint for this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED}
*/
LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
super(shardId, indexSettings);
if (localCheckpoint < 0 && localCheckpoint != SequenceNumbersService.NO_OPS_PERFORMED) {
throw new IllegalArgumentException(
"local checkpoint must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] "
+ "but was [" + localCheckpoint + "]");
}
if (maxSeqNo < 0 && maxSeqNo != SequenceNumbersService.NO_OPS_PERFORMED) {
throw new IllegalArgumentException(
"max seq. no. must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
}
bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings());
processedSeqNo = new LinkedList<>();
firstProcessedSeqNo = localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1;
this.nextSeqNo = maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
this.checkpoint = localCheckpoint;
}

/**
* issue the next sequence number
**/
public synchronized long generateSeqNo() {
synchronized long generateSeqNo() {
    // hand out the current value and advance the counter by one, all under this object's monitor
    final long issued = nextSeqNo;
    nextSeqNo = issued + 1;
    return issued;
}

/**
* marks the processing of the given seqNo have been completed
**/
public synchronized void markSeqNoAsCompleted(long seqNo) {
synchronized void markSeqNoAsCompleted(long seqNo) {
// make sure we track highest seen seqNo
if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1;
Expand All @@ -94,7 +120,7 @@ public long getCheckpoint() {
}

/** gets the maximum seqno seen so far */
public long getMaxSeqNo() {
long getMaxSeqNo() {
    // nextSeqNo is the next number to be issued, so the highest one seen so far is one below it
    final long upcoming = nextSeqNo;
    return upcoming - 1;
}

Expand Down Expand Up @@ -130,19 +156,19 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)
*/
private FixedBitSet getBitSetForSeqNo(long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo;
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the traces of a happy debugging session :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guilty as charged. 😄

int bitSetOffset = ((int) (seqNo - firstProcessedSeqNo)) / bitArraysSize;
while (bitSetOffset >= processedSeqNo.size()) {
processedSeqNo.add(new FixedBitSet(bitArraysSize));
}
return processedSeqNo.get(bitSetOffset);
}


/**
 * Maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo}.
 * <p>
 * The position is the distance of {@code seqNo} from {@code firstProcessedSeqNo}, modulo
 * {@code bitArraysSize}. The caller must hold this object's monitor.
 *
 * @param seqNo the sequence number to map; must not be below {@code firstProcessedSeqNo}
 * @return the offset of {@code seqNo} within its bit set
 */
private int seqNoToBitSetOffset(long seqNo) {
    assert Thread.holdsLock(this);
    assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
    // Take the remainder in long arithmetic and narrow afterwards. Narrowing the difference to int
    // first would truncate once the gap reaches 2^31 and could produce a wrong or negative offset.
    // The remainder itself is always smaller than bitArraysSize, so the final cast is lossless.
    return (int) ((seqNo - firstProcessedSeqNo) % bitArraysSize);
}

}
Loading