Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
cabd994
Introduce translog generation folding
jasontedor Mar 14, 2017
152cae7
Fix ordering of checking/locks
jasontedor Mar 16, 2017
91d5f85
Remove unnecessary import
jasontedor Mar 16, 2017
c88e788
Change method to be instance method
jasontedor Mar 16, 2017
7e69ae8
Remove generation parameter from fold
jasontedor Mar 16, 2017
fb00dd3
Remove obsolete catch block
jasontedor Mar 16, 2017
0d5e6e2
Move field
jasontedor Mar 16, 2017
bbab126
Rename folding generation to rolling generation
jasontedor Mar 17, 2017
421b336
Merge branch 'master' into translog-generation
jasontedor Mar 17, 2017
8f6b609
Stricter roll checking
jasontedor Mar 17, 2017
70ade35
Merge branch 'master' into translog-generation
jasontedor Mar 17, 2017
d9fbc42
Add test for rolling and committing
jasontedor Mar 17, 2017
9076d79
Simplify tests
jasontedor Mar 17, 2017
b79053e
Fix typo
jasontedor Mar 18, 2017
b4ed67d
Merge branch 'master' into translog-generation
jasontedor Mar 20, 2017
1669224
Stronger test for precommit/roll/commit sequences
jasontedor Mar 20, 2017
35d0119
Add Javadocs to IndexSettings#getGenerationThresholdSize
jasontedor Mar 20, 2017
32afd9e
Add assert message
jasontedor Mar 20, 2017
518f12e
Merge branch 'master' into translog-generation
jasontedor Mar 22, 2017
7480faf
Merge branch 'master' into translog-generation
jasontedor Mar 22, 2017
dd14ba8
Migrate translog generation rolling to index shard
jasontedor Mar 22, 2017
f59233c
Add Javadocs and tidy up
jasontedor Mar 22, 2017
3c78802
Merge branch 'master' into translog-generation
jasontedor Mar 22, 2017
192a7ce
Funny business
jasontedor Mar 23, 2017
6c2adb6
Revert "Funny business"
jasontedor Mar 23, 2017
1a31061
Committed generation
jasontedor Mar 23, 2017
07fc092
Remove unneeded field
jasontedor Mar 23, 2017
2e7f722
Fix thread pool name
jasontedor Mar 27, 2017
a14937d
Change method name
jasontedor Mar 27, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,12 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
Expand All @@ -46,7 +42,6 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.apache.logging.log4j.core.pattern.ConverterKeys;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -302,15 +297,21 @@ private void maybeFinish() {
}

void run() {
// we either respond immediately ie. if we we don't fsync per request or wait for refresh
// OR we got an pass async operations on and wait for them to return to respond.
indexShard.maybeFlush();
maybeFinish(); // decrement the pendingOpts by one, if there is nothing else to do we just respond with success.
/*
* We either respond immediately (i.e., if we do not fsync per request or wait for
* refresh), or we there are past async operations and we wait for them to return to
* respond.
*/
indexShard.afterWriteOperation();
// decrement pending by one, if there is nothing else to do we just respond with success
maybeFinish();
if (waitUntilRefresh) {
assert pendingOps.get() > 0;
indexShard.addRefreshListener(location, forcedRefresh -> {
if (forcedRefresh) {
logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request);
logger.warn(
"block until refresh ran out of slots and forced a refresh: [{}]",
request);
}
refreshed.set(forcedRefresh);
maybeFinish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
Expand Down
33 changes: 32 additions & 1 deletion core/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -112,6 +111,16 @@ public final class IndexSettings {
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
*/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING =
Setting.byteSizeSetting(
"index.translog.generation_threshold_size",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wondering if this should just be index.translog.generation_size

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I picked index.translog.generation_threshold_size to be consistent with index.translog.flush_threshold_size. Do you still wonder if it should be changed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can keep as is then. I don't mind much.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you maybe document what happens if that size is exceeded or add a link to the explain?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanna understand why this is 64MB why can't we use INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING or it's default instead rather than an arbitrary value? I would love to get some insight that we also can document? Then there is also the question why we can't simply flush this from the outside via IndexShard#maybeFlush, that would simplify the translog potentially, one property that I liked about the Translog#add() operation was that it would never acquire a write lock. We are now entering potentially dangerous territory here, locking can be a beast.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanna understand why this is 64MB

Jason can say why he chose 64MB - I think any value we choose now will be arbitrary so I was good with it for now and tweak later if needed.

why can't we use INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING or it's default instead rather than an arbitrary value?

We want a number that's much smaller than the flush size. We're heading towards keeping the generations that are needed to recover all seq# ops after a certain point. That means that we won't always clean previous generation when flushing. If we use the flush size for generations we can end up in a poisonous where we repeatedly try to flush but the translog is not trimmed.

why we can't simply flush this from the outside via IndexShard#maybeFlush

Do you mean doing both flushing and potentially opening a new generation from the same method that is called after indexing? i.e., call it maybeFlushAndRoll ( :) ) ? I would be good with that (though prefer the current approach).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean doing both flushing and potentially opening a new generation from the same method that is called after indexing? i.e., call it maybeFlushAndRoll ( :) ) ? I would be good with that (though prefer the current approach).

I was looking into having a Translog#rollover() method we can from the maybeFlush we can rename maybeFlush to onAfterOpteration to not necessarily yield impl details. I am a bit concerned about the write lock, which might not be a problem today but maybe tomorrow.

We want a number that's much smaller than the flush size. We're heading towards keeping the generations that are needed to recover all seq# ops after a certain point.

makes sense. lets document this. it's not clear from reviewing the PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed dd14ba8 to move the control of rolling to index shard. Would @nik9000, @bleskes, and @s1monw please take another look?

new ByteSizeValue(64, ByteSizeUnit.MB),
new Property[]{Property.Dynamic, Property.IndexScope});

public static final Setting<TimeValue> INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL =
Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS),
new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope);
Expand Down Expand Up @@ -156,6 +165,7 @@ public final class IndexSettings {
private volatile TimeValue refreshInterval;
private final TimeValue globalCheckpointInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile ByteSizeValue generationThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
private final IndexScopedSettings scopedSettings;
Expand Down Expand Up @@ -250,6 +260,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
Expand Down Expand Up @@ -281,6 +292,9 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize);
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
Expand All @@ -290,6 +304,10 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}

private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
this.generationThresholdSize = generationThresholdSize;
}

private void setGCDeletes(TimeValue timeValue) {
this.gcDeletesInMillis = timeValue.getMillis();
}
Expand Down Expand Up @@ -461,6 +479,19 @@ public TimeValue getGlobalCheckpointInterval() {
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }

/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from
* growing too large to avoid excessive disk space consumption. Therefore, the translog is
* automatically rolled to a new generation when the current generation exceeds this generation
* threshold size.
*
* @return the generation threshold size
*/
public ByteSizeValue getGenerationThresholdSize() {
return generationThresholdSize;
}

/**
* Returns the {@link MergeSchedulerConfig}
*/
Expand Down
152 changes: 108 additions & 44 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -771,27 +771,44 @@ public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expecte
return engine.syncFlush(syncId, expectedCommitId);
}

public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException {
boolean waitIfOngoing = request.waitIfOngoing();
boolean force = request.force();
if (logger.isTraceEnabled()) {
logger.trace("flush with {}", request);
}
// we allows flush while recovering, since we allow for operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
// we don't gc). Yet, we don't use flush internally to clear deletes and flush the indexwriter since
// we use #writeIndexingBuffer for this now.
/**
* Executes the given flush request against the engine.
*
* @param request the flush request
* @return the commit ID
*/
public Engine.CommitId flush(FlushRequest request) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
/*
* We allow flushes while recovery since we allow operations to happen while recovering and
* we want to keep the translog under control (up to deletes, which we do not GC). Yet, we
* do not use flush internally to clear deletes and flush the index writer since we use
* Engine#writeIndexingBuffer for this now.
*/
verifyNotClosed();
Engine engine = getEngine();
final Engine engine = getEngine();
if (engine.isRecovering()) {
throw new IllegalIndexShardStateException(shardId(), state, "flush is only allowed if the engine is not recovery" +
" from translog");
throw new IllegalIndexShardStateException(
shardId(),
state,
"flush is only allowed if the engine is not recovery from translog");
}
long time = System.nanoTime();
Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
final long time = System.nanoTime();
final Engine.CommitId commitId = engine.flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;
}

/**
* Rolls the tranlog generation.
*
* @throws IOException if any file operations on the translog throw an I/O exception
*/
private void rollTranslogGeneration() throws IOException {
final Engine engine = getEngine();
engine.getTranslog().rollGeneration();
}

public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
Expand Down Expand Up @@ -1256,17 +1273,39 @@ public boolean restoreFromRepository(Repository repository) {
}

/**
* Returns <code>true</code> iff this shard needs to be flushed due to too many translog operation or a too large transaction log.
* Otherwise <code>false</code>.
* Tests whether or not the translog should be flushed. This test is based on the current size
* of the translog comparted to the configured flush threshold size.
*
* @return {@code true} if the translog should be flushed
*/
boolean shouldFlush() {
Engine engine = getEngineOrNull();
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
Translog translog = engine.getTranslog();
return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes();
} catch (AlreadyClosedException ex) {
// that's fine we are already close - no need to flush
final Translog translog = engine.getTranslog();
return translog.shouldFlush();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
}
return false;
}

/**
* Tests whether or not the translog generation should be rolled to a new generation. This test
* is based on the size of the current generation compared to the configured generation
* threshold size.
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
boolean shouldRollTranslogGeneration() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
final Translog translog = engine.getTranslog();
return translog.shouldRollGeneration();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
}
return false;
Expand Down Expand Up @@ -1810,28 +1849,31 @@ public Translog.Durability getTranslogDurability() {
return indexSettings.getTranslogDurability();
}

private final AtomicBoolean asyncFlushRunning = new AtomicBoolean();
// we can not protect with a lock since we "release" on a different thread
private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();

/**
* Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the
* Flush thread-pool asynchronously.
*
* @return <code>true</code> if a new flush is scheduled otherwise <code>false</code>.
* Schedules a flush or translog generation roll if needed but will not schedule more than one
* concurrently. The operation will be executed asynchronously on the flush thread pool.
*/
public boolean maybeFlush() {
if (shouldFlush()) {
if (asyncFlushRunning.compareAndSet(false, true)) { // we can't use a lock here since we "release" in a different thread
if (shouldFlush() == false) {
// we have to check again since otherwise there is a race when a thread passes
// the first shouldFlush() check next to another thread which flushes fast enough
// to finish before the current thread could flip the asyncFlushRunning flag.
// in that situation we have an extra unexpected flush.
asyncFlushRunning.compareAndSet(true, false);
} else {
public void afterWriteOperation() {
if (shouldFlush() || shouldRollTranslogGeneration()) {
if (flushOrRollRunning.compareAndSet(false, true)) {
/*
* We have to check again since otherwise there is a race when a thread passes the
* first check next to another thread which performs the operation quickly enough to
* finish before the current thread could flip the flag. In that situation, we have
* an extra operation.
*
* Additionally, a flush implicitly executes a translog generation roll so if we
* execute a flush then we do not need to check if we should roll the translog
* generation.
*/
if (shouldFlush()) {
logger.debug("submitting async flush request");
final AbstractRunnable abstractRunnable = new AbstractRunnable() {
final AbstractRunnable flush = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush index", e);
}
Expand All @@ -1844,16 +1886,38 @@ protected void doRun() throws Exception {

@Override
public void onAfter() {
asyncFlushRunning.compareAndSet(true, false);
maybeFlush(); // fire a flush up again if we have filled up the limits such that shouldFlush() returns true
flushOrRollRunning.compareAndSet(true, false);
afterWriteOperation();
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable);
return true;
threadPool.executor(ThreadPool.Names.FLUSH).execute(flush);
} else if (shouldRollTranslogGeneration()) {
logger.debug("submitting async roll translog generation request");
final AbstractRunnable roll = new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to roll translog generation", e);
}
}

@Override
protected void doRun() throws Exception {
rollTranslogGeneration();
}

@Override
public void onAfter() {
flushOrRollRunning.compareAndSet(true, false);
afterWriteOperation();
}
};
threadPool.executor(ThreadPool.Names.FLUSH).execute(roll);
} else {
flushOrRollRunning.compareAndSet(true, false);
}
}
}
return false;
}

/**
Expand Down
Loading