From cabd99445b3d5d43ad04c454dcc4476282d6628b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 14 Mar 2017 10:55:55 -0700 Subject: [PATCH 01/23] Introduce translog generation folding This commit introduces a maximum size for a translog generation and automatically folds the translog when a generation exceeds the threshold into a new generation. This threshold is configurable per index and defaults to sixty-four megabytes. We introduce this constraint as sequence numbers will require keeping around more than the current generation (to ensure that we can rollback to the global checkpoint). Without keeping the size of generations under control, having to keep old generations around could consume excessive disk space. A follow-up will enable commits to trim previous generations based on the global checkpoint. --- .../common/settings/IndexScopedSettings.java | 1 + .../elasticsearch/index/IndexSettings.java | 24 ++++- .../index/translog/Translog.java | 86 ++++++++++++++---- .../index/IndexSettingsTests.java | 21 +++++ .../index/translog/TranslogTests.java | 91 +++++++++++++++++-- .../indices/settings/UpdateSettingsIT.java | 8 +- 6 files changed, 202 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index badd80d5aea76..a072b68b2770d 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -125,6 +125,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, + IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, 
FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 47c7ffb71bc85..8c616ebe2ba63 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -22,7 +22,6 @@ import org.apache.lucene.index.MergePolicy; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; @@ -112,6 +111,16 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); + /** + * The maximum size of a translog generation. This is independent of the maximum size of + * translog operations that have not been flushed. 
+ */ + public static final Setting INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING = + Setting.byteSizeSetting( + "index.translog.generation_threshold_size", + new ByteSizeValue(64, ByteSizeUnit.MB), + new Property[]{Property.Dynamic, Property.IndexScope}); + public static final Setting INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL = Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS), new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope); @@ -156,6 +165,7 @@ public final class IndexSettings { private volatile TimeValue refreshInterval; private final TimeValue globalCheckpointInterval; private volatile ByteSizeValue flushThresholdSize; + private volatile ByteSizeValue generationThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; private final IndexScopedSettings scopedSettings; @@ -250,6 +260,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); + generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING); @@ -281,6 +292,9 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer); scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize); + 
scopedSettings.addSettingsUpdateConsumer( + INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, + this::setGenerationThresholdSize); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); @@ -290,6 +304,10 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { this.flushThresholdSize = byteSizeValue; } + private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) { + this.generationThresholdSize = generationThresholdSize; + } + private void setGCDeletes(TimeValue timeValue) { this.gcDeletesInMillis = timeValue.getMillis(); } @@ -461,6 +479,10 @@ public TimeValue getGlobalCheckpointInterval() { */ public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } + public ByteSizeValue getGenerationThresholdSize() { + return generationThresholdSize; + } + /** * Returns the {@link MergeSchedulerConfig} */ diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index ee4d0a4391a23..bff0685afefa0 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -329,7 +329,7 @@ public Path location() { * Returns the generation of the current transaction log. */ public long currentFileGeneration() { - try (ReleasableLock lock = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { return current.getGeneration(); } } @@ -399,6 +399,8 @@ TranslogWriter createWriter(long fileGeneration) throws IOException { return newFile; } + final AtomicBoolean foldingGeneration = new AtomicBoolean(); + /** * Adds an operation to the transaction log. 
* @@ -409,20 +411,31 @@ TranslogWriter createWriter(long fileGeneration) throws IOException { public Location add(final Operation operation) throws IOException { final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); try { - final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out); final long start = out.position(); out.skip(Integer.BYTES); - writeOperationNoSize(checksumStreamOutput, operation); + writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation); final long end = out.position(); final int operationSize = (int) (end - Integer.BYTES - start); out.seek(start); out.writeInt(operationSize); out.seek(end); final ReleasablePagedBytesReference bytes = out.bytes(); + final Location location; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - return current.add(bytes, operation.seqNo()); + location = current.add(bytes, operation.seqNo()); } + try (ReleasableLock ignored = writeLock.acquire()) { + if (shouldFoldGeneration(this) && foldingGeneration.compareAndSet(false, true)) { + // we have to check the condition again lest we could fold twice in a race + if (shouldFoldGeneration(this)) { + this.foldGeneration(current.getGeneration()); + } + final boolean wasFoldingGeneration = foldingGeneration.getAndSet(false); + assert wasFoldingGeneration; + } + } + return location; } catch (final AlreadyClosedException | IOException ex) { try { closeOnTragicEvent(ex); @@ -442,6 +455,20 @@ public Location add(final Operation operation) throws IOException { } } + /** + * Tests whether or not the current generation of the translog should be folded into a new + * generation. This test is based on the size of the current generation compared to the + * configured generation threshold size. 
+ * + * @param translog the translog + * @return {@code true} if the current generation should be folded into a new generation + */ + private static boolean shouldFoldGeneration(final Translog translog) { + final long size = translog.current.sizeInBytes(); + final long threshold = translog.indexSettings.getGenerationThresholdSize().getBytes(); + return size > threshold; + } + /** * The a {@linkplain Location} that will sort after the {@linkplain Location} returned by the last write but before any locations which * can be returned by the next write. @@ -1322,27 +1349,46 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl out.writeInt((int) checksum); } + /** + * Fold the current translog generation into a new generation. This does not commit the + * translog. The translog write lock must be held by the current thread. + * + * @param generation the current translog generation + * @throws IOException if an I/O exception occurred during any file operations + */ + void foldGeneration(final long generation) throws IOException { + assert writeLock.isHeldByCurrentThread(); + try { + final TranslogReader reader = current.closeIntoReader(); + readers.add(reader); + final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); + assert Checkpoint.read(checkpoint).generation == generation; + final Path generationCheckpoint = + location.resolve(getCommitCheckpointFileName(generation)); + Files.copy(checkpoint, generationCheckpoint); + IOUtils.fsync(generationCheckpoint, false); + IOUtils.fsync(generationCheckpoint.getParent(), true); + // create a new translog file; this will sync it and update the checkpoint data; + current = createWriter(generation + 1); + logger.trace("current translog set to [{}]", current.getGeneration()); + } catch (final Exception e) { + IOUtils.closeWhileHandlingException(this); // tragic event + throw e; + } + } + @Override public long prepareCommit() throws IOException { - try (ReleasableLock lock = 
writeLock.acquire()) { + try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); if (currentCommittingGeneration != NOT_SET_GENERATION) { - throw new IllegalStateException("already committing a translog with generation: " + currentCommittingGeneration); + throw new IllegalStateException("already committing a translog with generation: " + + currentCommittingGeneration); } - currentCommittingGeneration = current.getGeneration(); - TranslogReader currentCommittingTranslog = current.closeIntoReader(); - readers.add(currentCommittingTranslog); - Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); - assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration(); - Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration())); - Files.copy(checkpoint, commitCheckpoint); - IOUtils.fsync(commitCheckpoint, false); - IOUtils.fsync(commitCheckpoint.getParent(), true); - // create a new translog file - this will sync it and update the checkpoint data; - current = createWriter(current.getGeneration() + 1); - logger.trace("current translog set to [{}]", current.getGeneration()); - - } catch (Exception e) { + final long generation = current.getGeneration(); + currentCommittingGeneration = generation; + foldGeneration(generation); + } catch (final Exception e) { IOUtils.closeWhileHandlingException(this); // tragic event throw e; } diff --git a/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java b/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java index a32d076272ba6..bc3ee4b5f06f1 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java @@ -370,6 +370,27 @@ public void testTranslogFlushSizeThreshold() { assertEquals(actualNewTranslogFlushThresholdSize, settings.getFlushThresholdSize()); } + public void testTranslogGenerationSizeThreshold() { + final ByteSizeValue 
size = new ByteSizeValue(Math.abs(randomInt())); + final String key = IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(); + final ByteSizeValue actualValue = + ByteSizeValue.parseBytesSizeValue(size.toString(), key); + final IndexMetaData metaData = + newIndexMeta( + "index", + Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(key, size.toString()) + .build()); + final IndexSettings settings = new IndexSettings(metaData, Settings.EMPTY); + assertEquals(actualValue, settings.getGenerationThresholdSize()); + final ByteSizeValue newSize = new ByteSizeValue(Math.abs(randomInt())); + final ByteSizeValue actual = ByteSizeValue.parseBytesSizeValue(newSize.toString(), key); + settings.updateIndexMetaData( + newIndexMeta("index", Settings.builder().put(key, newSize.toString()).build())); + assertEquals(actual, settings.getGenerationThresholdSize()); + } + public void testArchiveBrokenIndexSettings() { Settings settings = IndexScopedSettings.DEFAULT_SCOPED_SETTINGS.archiveUnknownOrInvalidSettings( diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index e47a5652b2431..ffb6c8803b163 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.translog; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; @@ -44,13 +45,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; import 
org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine.Operation.Origin; @@ -100,6 +102,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -156,12 +159,25 @@ private Translog create(Path path) throws IOException { return new Translog(getTranslogConfig(path), null, () -> globalCheckpoint.get()); } - private TranslogConfig getTranslogConfig(Path path) { - Settings build = Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .build(); - ByteSizeValue bufferSize = randomBoolean() ? 
TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES); - return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.getIndex(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize); + private TranslogConfig getTranslogConfig(final Path path) { + final Settings settings = Settings + .builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) + .build(); + return getTranslogConfig(path, settings); + } + + private TranslogConfig getTranslogConfig(final Path path, final Settings settings) { + final ByteSizeValue bufferSize; + if (randomBoolean()) { + bufferSize = TranslogConfig.DEFAULT_BUFFER_SIZE; + } else { + bufferSize = new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES); + } + + final IndexSettings indexSettings = + IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings); + return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); } protected void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) throws IOException { @@ -2073,4 +2089,65 @@ public void testTranslogOpSerialization() throws Exception { Translog.Delete serializedDelete = new Translog.Delete(in); assertEquals(delete, serializedDelete); } + + public void testFoldGeneration() throws IOException { + final long generation = translog.currentFileGeneration(); + final int folds = randomIntBetween(1, 16); + int totalOperations = 0; + int seqNo = 0; + for (int i = 0; i < folds; i++) { + final int operations = randomIntBetween(1, 128); + for (int j = 0; j < operations; j++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + totalOperations++; + } + try (ReleasableLock ignored = translog.writeLock.acquire()) { + translog.foldGeneration(generation + i); + } + assertThat(translog.currentFileGeneration(), equalTo(generation + i + 1)); + assertThat(translog.totalOperations(), equalTo(totalOperations)); + } + for 
(int i = 0; i <= folds; i++) { + assertFileIsPresent(translog, generation + i); + } + translog.commit(); + assertThat(translog.currentFileGeneration(), equalTo(generation + folds + 1)); + assertThat(translog.totalOperations(), equalTo(0)); + for (int i = 0; i <= folds; i++) { + assertFileDeleted(translog, generation + i); + } + assertFileIsPresent(translog, generation + folds + 1); + } + + public void testGenerationThreshold() throws IOException { + translog.close(); + final int generationThreshold = randomIntBetween(1, 512); + final Settings settings = Settings + .builder() + .put("index.translog.generation_threshold_size", generationThreshold + "b") + .build(); + long seqNo = 0; + long folds = 0; + final TranslogConfig config = getTranslogConfig(translogDir, settings); + try (Translog translog = + new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + final long generation = translog.currentFileGeneration(); + for (int i = 0; i < randomIntBetween(32, 128); i++) { + assertThat(translog.currentFileGeneration(), equalTo(generation + folds)); + final Location location = translog.add(new Translog.NoOp(seqNo++, 0, "test")); + if (location.translogLocation + location.size > generationThreshold) { + folds++; + assertThat(translog.currentFileGeneration(), equalTo(generation + folds)); + for (int j = 0; j < folds; j++) { + assertFileIsPresent(translog, generation + j); + } + } + } + + for (int j = 0; j < folds; j++) { + assertFileIsPresent(translog, generation + j); + } + } + } + } diff --git a/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java b/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java index ae6b4588271b4..762d409b6b75e 100644 --- a/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java +++ b/core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java @@ -93,7 +93,11 @@ public void testResetDefault() { .admin() .indices() .prepareUpdateSettings("test") 
- .setSettings(Settings.builder().put("index.refresh_interval", -1).put("index.translog.flush_threshold_size", "1024b")) + .setSettings( + Settings.builder() + .put("index.refresh_interval", -1) + .put("index.translog.flush_threshold_size", "1024b") + .put("index.translog.generation_threshold_size", "4096b")) .execute() .actionGet(); IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test"); @@ -103,6 +107,7 @@ public void testResetDefault() { if (indexService != null) { assertEquals(indexService.getIndexSettings().getRefreshInterval().millis(), -1); assertEquals(indexService.getIndexSettings().getFlushThresholdSize().getBytes(), 1024); + assertEquals(indexService.getIndexSettings().getGenerationThresholdSize().getBytes(), 4096); } } client() @@ -119,6 +124,7 @@ public void testResetDefault() { if (indexService != null) { assertEquals(indexService.getIndexSettings().getRefreshInterval().millis(), 1000); assertEquals(indexService.getIndexSettings().getFlushThresholdSize().getBytes(), 1024); + assertEquals(indexService.getIndexSettings().getGenerationThresholdSize().getBytes(), 4096); } } } From 152cae7e500a4df56f6dea9f3d835313ce972d67 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Mar 2017 19:29:04 -0400 Subject: [PATCH 02/23] Fix ordering of checking/locks --- .../java/org/elasticsearch/index/translog/Translog.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index bff0685afefa0..6b2cd480dc845 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -425,10 +425,10 @@ public Location add(final Operation operation) throws IOException { ensureOpen(); location = current.add(bytes, operation.seqNo()); } - try (ReleasableLock ignored 
= writeLock.acquire()) { - if (shouldFoldGeneration(this) && foldingGeneration.compareAndSet(false, true)) { - // we have to check the condition again lest we could fold twice in a race - if (shouldFoldGeneration(this)) { + if (shouldFoldGeneration(this) && foldingGeneration.compareAndSet(false, true)) { + // we have to check the condition again lest we could fold twice in a race + if (shouldFoldGeneration(this)) { + try (ReleasableLock ignored = writeLock.acquire()) { this.foldGeneration(current.getGeneration()); } final boolean wasFoldingGeneration = foldingGeneration.getAndSet(false); From 91d5f85943e425375fde6385a5720973a5bcdf12 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Mar 2017 19:33:22 -0400 Subject: [PATCH 03/23] Remove unnecessary import --- .../java/org/elasticsearch/index/translog/TranslogTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index ffb6c8803b163..62ab2c2f45ab7 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.translog; -import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; From c88e788442aebad6c45875eb0c33b51542500549 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Mar 2017 19:38:24 -0400 Subject: [PATCH 04/23] Change method to be instance method --- .../org/elasticsearch/index/translog/Translog.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 
6b2cd480dc845..d21a095014885 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -425,9 +425,9 @@ public Location add(final Operation operation) throws IOException { ensureOpen(); location = current.add(bytes, operation.seqNo()); } - if (shouldFoldGeneration(this) && foldingGeneration.compareAndSet(false, true)) { + if (shouldFoldGeneration() && foldingGeneration.compareAndSet(false, true)) { // we have to check the condition again lest we could fold twice in a race - if (shouldFoldGeneration(this)) { + if (shouldFoldGeneration()) { try (ReleasableLock ignored = writeLock.acquire()) { this.foldGeneration(current.getGeneration()); } @@ -460,12 +460,11 @@ public Location add(final Operation operation) throws IOException { * generation. This test is based on the size of the current generation compared to the * configured generation threshold size. * - * @param translog the translog * @return {@code true} if the current generation should be folded into a new generation */ - private static boolean shouldFoldGeneration(final Translog translog) { - final long size = translog.current.sizeInBytes(); - final long threshold = translog.indexSettings.getGenerationThresholdSize().getBytes(); + private boolean shouldFoldGeneration() { + final long size = this.current.sizeInBytes(); + final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes(); return size > threshold; } From 7e69ae817ccb176a068754a2ec1a6527c7b17f69 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Mar 2017 19:41:10 -0400 Subject: [PATCH 05/23] Remove generation parameter from fold --- .../elasticsearch/index/translog/Translog.java | 16 +++++++--------- .../index/translog/TranslogTests.java | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java 
b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index d21a095014885..b1b0d8f85c094 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -429,7 +429,7 @@ public Location add(final Operation operation) throws IOException { // we have to check the condition again lest we could fold twice in a race if (shouldFoldGeneration()) { try (ReleasableLock ignored = writeLock.acquire()) { - this.foldGeneration(current.getGeneration()); + this.foldGeneration(); } final boolean wasFoldingGeneration = foldingGeneration.getAndSet(false); assert wasFoldingGeneration; @@ -1352,23 +1352,22 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl * Fold the current translog generation into a new generation. This does not commit the * translog. The translog write lock must be held by the current thread. * - * @param generation the current translog generation * @throws IOException if an I/O exception occurred during any file operations */ - void foldGeneration(final long generation) throws IOException { + void foldGeneration() throws IOException { assert writeLock.isHeldByCurrentThread(); try { final TranslogReader reader = current.closeIntoReader(); readers.add(reader); final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); - assert Checkpoint.read(checkpoint).generation == generation; + assert Checkpoint.read(checkpoint).generation == current.getGeneration(); final Path generationCheckpoint = - location.resolve(getCommitCheckpointFileName(generation)); + location.resolve(getCommitCheckpointFileName(current.getGeneration())); Files.copy(checkpoint, generationCheckpoint); IOUtils.fsync(generationCheckpoint, false); IOUtils.fsync(generationCheckpoint.getParent(), true); // create a new translog file; this will sync it and update the checkpoint data; - current = createWriter(generation + 1); + current = 
createWriter(current.getGeneration() + 1); logger.trace("current translog set to [{}]", current.getGeneration()); } catch (final Exception e) { IOUtils.closeWhileHandlingException(this); // tragic event @@ -1384,9 +1383,8 @@ public long prepareCommit() throws IOException { throw new IllegalStateException("already committing a translog with generation: " + currentCommittingGeneration); } - final long generation = current.getGeneration(); - currentCommittingGeneration = generation; - foldGeneration(generation); + currentCommittingGeneration = current.getGeneration(); + foldGeneration(); } catch (final Exception e) { IOUtils.closeWhileHandlingException(this); // tragic event throw e; diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 62ab2c2f45ab7..f56a9879c4d3b 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2101,7 +2101,7 @@ public void testFoldGeneration() throws IOException { totalOperations++; } try (ReleasableLock ignored = translog.writeLock.acquire()) { - translog.foldGeneration(generation + i); + translog.foldGeneration(); } assertThat(translog.currentFileGeneration(), equalTo(generation + i + 1)); assertThat(translog.totalOperations(), equalTo(totalOperations)); From fb00dd346b57e79a27917dd541bc270a9f89cb88 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Mar 2017 19:42:13 -0400 Subject: [PATCH 06/23] Remove obsolete catch block --- .../main/java/org/elasticsearch/index/translog/Translog.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index b1b0d8f85c094..1462e5e5ed544 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ 
b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1385,9 +1385,6 @@ public long prepareCommit() throws IOException { } currentCommittingGeneration = current.getGeneration(); foldGeneration(); - } catch (final Exception e) { - IOUtils.closeWhileHandlingException(this); // tragic event - throw e; } return 0L; } From 0d5e6e2d3959b9aeade4f9221dd64bbb0f14eb70 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 16 Mar 2017 19:43:03 -0400 Subject: [PATCH 07/23] Move field --- .../main/java/org/elasticsearch/index/translog/Translog.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 1462e5e5ed544..e0e2354e9e4f8 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -127,6 +127,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final TranslogConfig config; private final LongSupplier globalCheckpointSupplier; private final String translogUUID; + private final AtomicBoolean foldingGeneration = new AtomicBoolean(); /** * Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is @@ -399,8 +400,6 @@ TranslogWriter createWriter(long fileGeneration) throws IOException { return newFile; } - final AtomicBoolean foldingGeneration = new AtomicBoolean(); - /** * Adds an operation to the transaction log. 
* From bbab1268dc56410045eea0290bd57de318ae4377 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Mar 2017 17:15:14 -0400 Subject: [PATCH 08/23] Rename folding generation to rolling generation --- .../index/translog/Translog.java | 26 ++++++++--------- .../index/translog/TranslogTests.java | 28 +++++++++---------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index e0e2354e9e4f8..8dbfd54a0fb4d 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -127,7 +127,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final TranslogConfig config; private final LongSupplier globalCheckpointSupplier; private final String translogUUID; - private final AtomicBoolean foldingGeneration = new AtomicBoolean(); + private final AtomicBoolean rollingGeneration = new AtomicBoolean(); /** * Creates a new Translog instance. 
This method will create a new transaction log unless the given {@link TranslogGeneration} is @@ -424,14 +424,14 @@ public Location add(final Operation operation) throws IOException { ensureOpen(); location = current.add(bytes, operation.seqNo()); } - if (shouldFoldGeneration() && foldingGeneration.compareAndSet(false, true)) { - // we have to check the condition again lest we could fold twice in a race - if (shouldFoldGeneration()) { + if (shouldRollGeneration() && rollingGeneration.compareAndSet(false, true)) { + // we have to check the condition again lest we could roll twice in a race + if (shouldRollGeneration()) { try (ReleasableLock ignored = writeLock.acquire()) { - this.foldGeneration(); + this.rollGeneration(); } - final boolean wasFoldingGeneration = foldingGeneration.getAndSet(false); - assert wasFoldingGeneration; + final boolean wasRolling = rollingGeneration.getAndSet(false); + assert wasRolling; } } return location; @@ -455,13 +455,13 @@ public Location add(final Operation operation) throws IOException { } /** - * Tests whether or not the current generation of the translog should be folded into a new + * Tests whether or not the current generation of the translog should be rolled into a new * generation. This test is based on the size of the current generation compared to the * configured generation threshold size. * - * @return {@code true} if the current generation should be folded into a new generation + * @return {@code true} if the current generation should be rolled into a new generation */ - private boolean shouldFoldGeneration() { + private boolean shouldRollGeneration() { final long size = this.current.sizeInBytes(); final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes(); return size > threshold; @@ -1348,12 +1348,12 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl } /** - * Fold the current translog generation into a new generation. 
This does not commit the + * Roll the current translog generation into a new generation. This does not commit the * translog. The translog write lock must be held by the current thread. * * @throws IOException if an I/O exception occurred during any file operations */ - void foldGeneration() throws IOException { + void rollGeneration() throws IOException { assert writeLock.isHeldByCurrentThread(); try { final TranslogReader reader = current.closeIntoReader(); @@ -1383,7 +1383,7 @@ public long prepareCommit() throws IOException { currentCommittingGeneration); } currentCommittingGeneration = current.getGeneration(); - foldGeneration(); + rollGeneration(); } return 0L; } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index f56a9879c4d3b..b345c0be29496 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2089,33 +2089,33 @@ public void testTranslogOpSerialization() throws Exception { assertEquals(delete, serializedDelete); } - public void testFoldGeneration() throws IOException { + public void testRollGeneration() throws IOException { final long generation = translog.currentFileGeneration(); - final int folds = randomIntBetween(1, 16); + final int rolls = randomIntBetween(1, 16); int totalOperations = 0; int seqNo = 0; - for (int i = 0; i < folds; i++) { + for (int i = 0; i < rolls; i++) { final int operations = randomIntBetween(1, 128); for (int j = 0; j < operations; j++) { translog.add(new Translog.NoOp(seqNo++, 0, "test")); totalOperations++; } try (ReleasableLock ignored = translog.writeLock.acquire()) { - translog.foldGeneration(); + translog.rollGeneration(); } assertThat(translog.currentFileGeneration(), equalTo(generation + i + 1)); assertThat(translog.totalOperations(), equalTo(totalOperations)); } - for (int i = 0; i <= folds; i++) { + for 
(int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); } translog.commit(); - assertThat(translog.currentFileGeneration(), equalTo(generation + folds + 1)); + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1)); assertThat(translog.totalOperations(), equalTo(0)); - for (int i = 0; i <= folds; i++) { + for (int i = 0; i <= rolls; i++) { assertFileDeleted(translog, generation + i); } - assertFileIsPresent(translog, generation + folds + 1); + assertFileIsPresent(translog, generation + rolls + 1); } public void testGenerationThreshold() throws IOException { @@ -2126,24 +2126,24 @@ public void testGenerationThreshold() throws IOException { .put("index.translog.generation_threshold_size", generationThreshold + "b") .build(); long seqNo = 0; - long folds = 0; + long rolls = 0; final TranslogConfig config = getTranslogConfig(translogDir, settings); try (Translog translog = new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { final long generation = translog.currentFileGeneration(); for (int i = 0; i < randomIntBetween(32, 128); i++) { - assertThat(translog.currentFileGeneration(), equalTo(generation + folds)); + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); final Location location = translog.add(new Translog.NoOp(seqNo++, 0, "test")); if (location.translogLocation + location.size > generationThreshold) { - folds++; - assertThat(translog.currentFileGeneration(), equalTo(generation + folds)); - for (int j = 0; j < folds; j++) { + rolls++; + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); + for (int j = 0; j < rolls; j++) { assertFileIsPresent(translog, generation + j); } } } - for (int j = 0; j < folds; j++) { + for (int j = 0; j < rolls; j++) { assertFileIsPresent(translog, generation + j); } } From 8f6b609ef4773bc591425e3dc1c91396389b6557 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Mar 2017 18:06:36 -0400 Subject: [PATCH 09/23] 
Stricter roll checking --- .../elasticsearch/index/translog/Translog.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 8dbfd54a0fb4d..7fda44071b44e 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -420,16 +420,25 @@ public Location add(final Operation operation) throws IOException { out.seek(end); final ReleasablePagedBytesReference bytes = out.bytes(); final Location location; + final boolean shouldRollGeneration; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); location = current.add(bytes, operation.seqNo()); + // check if we should roll under the read lock + shouldRollGeneration = + shouldRollGeneration() && rollingGeneration.compareAndSet(false, true); } - if (shouldRollGeneration() && rollingGeneration.compareAndSet(false, true)) { - // we have to check the condition again lest we could roll twice in a race - if (shouldRollGeneration()) { - try (ReleasableLock ignored = writeLock.acquire()) { + if (shouldRollGeneration) { + try (ReleasableLock ignored = writeLock.acquire()) { + /* + * We have to check the condition again lest we could roll twice if another + * thread committed the translog (which rolls the generation )between us + * releasing the read lock and acquiring the write lock. 
+ */ + if (shouldRollGeneration()) { this.rollGeneration(); } + } finally { final boolean wasRolling = rollingGeneration.getAndSet(false); assert wasRolling; } From d9fbc4229ca405cdf62f9e61b503922b4f98ab53 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Mar 2017 18:24:23 -0400 Subject: [PATCH 10/23] Add test for rolling and committing --- .../index/translog/Translog.java | 17 +++++---- .../index/translog/TranslogTests.java | 36 +++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 7fda44071b44e..fbc4dd37e6dd3 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -55,6 +55,7 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -1388,26 +1389,30 @@ public long prepareCommit() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); if (currentCommittingGeneration != NOT_SET_GENERATION) { - throw new IllegalStateException("already committing a translog with generation: " + + final String message = String.format( + Locale.ROOT, + "already committing a translog with generation [%d]", currentCommittingGeneration); + throw new IllegalStateException(message); } currentCommittingGeneration = current.getGeneration(); rollGeneration(); } - return 0L; + return 0; } @Override public long commit() throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { + try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); if (currentCommittingGeneration == NOT_SET_GENERATION) { prepareCommit(); } assert currentCommittingGeneration != NOT_SET_GENERATION; - assert readers.stream().filter(r -> 
r.getGeneration() == currentCommittingGeneration).findFirst().isPresent() - : "reader list doesn't contain committing generation [" + currentCommittingGeneration + "]"; - lastCommittedTranslogFileGeneration = current.getGeneration(); // this is important - otherwise old files will not be cleaned up + assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration) + : "readers missing committing generation [" + currentCommittingGeneration + "]"; + // set the last committed generation otherwise old files will not be cleaned up + lastCommittedTranslogFileGeneration = current.getGeneration(); currentCommittingGeneration = NOT_SET_GENERATION; trimUnreferencedReaders(); } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index b345c0be29496..c666c3a8f4d5b 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2149,4 +2149,40 @@ public void testGenerationThreshold() throws IOException { } } + public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException { + final long generation = translog.currentFileGeneration(); + int seqNo = 0; + + final int operationsBefore = randomIntBetween(1, 256); + for (int i = 0; i < operationsBefore; i++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + } + + translog.prepareCommit(); + assertThat(translog.currentFileGeneration(), equalTo(generation + 1)); + for (long g = generation; g <= generation + 1; g++) { + assertFileIsPresent(translog, g); + } + + final int operationsBetween = randomIntBetween(1, 256); + for (int i = 0; i < operationsBetween; i++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + } + + try (ReleasableLock ignored = translog.writeLock.acquire()) { + translog.rollGeneration(); + } + assertThat(translog.currentFileGeneration(), equalTo(generation + 
2)); + for (long g = generation; g <= generation + 2; g++) { + assertFileIsPresent(translog, g); + } + + translog.commit(); + + for (long g = generation; g < generation + 2; g++) { + assertFileDeleted(translog, g); + } + assertFileIsPresent(translog, generation + 2); + } + } From 9076d7998b0b9cee65874bc887c409de0d8cee85 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Mar 2017 18:29:28 -0400 Subject: [PATCH 11/23] Simplify tests --- .../index/translog/TranslogTests.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index c666c3a8f4d5b..500a9567d8779 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2160,9 +2160,8 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException translog.prepareCommit(); assertThat(translog.currentFileGeneration(), equalTo(generation + 1)); - for (long g = generation; g <= generation + 1; g++) { - assertFileIsPresent(translog, g); - } + assertFileIsPresent(translog, generation); + assertFileIsPresent(translog, generation + 1); final int operationsBetween = randomIntBetween(1, 256); for (int i = 0; i < operationsBetween; i++) { @@ -2173,15 +2172,14 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException translog.rollGeneration(); } assertThat(translog.currentFileGeneration(), equalTo(generation + 2)); - for (long g = generation; g <= generation + 2; g++) { - assertFileIsPresent(translog, g); - } + assertFileIsPresent(translog, generation); + assertFileIsPresent(translog, generation + 1); + assertFileIsPresent(translog, generation + 2); translog.commit(); - for (long g = generation; g < generation + 2; g++) { - assertFileDeleted(translog, g); - } + assertFileDeleted(translog, generation); + 
assertFileDeleted(translog, generation + 1); assertFileIsPresent(translog, generation + 2); } From b79053efe1f65d16814021e64cf0ad0675744e54 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Mar 2017 21:08:13 -0400 Subject: [PATCH 12/23] Fix typo --- .../main/java/org/elasticsearch/index/translog/Translog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index fbc4dd37e6dd3..8153a057ec810 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -433,7 +433,7 @@ public Location add(final Operation operation) throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { /* * We have to check the condition again lest we could roll twice if another - * thread committed the translog (which rolls the generation )between us + * thread committed the translog (which rolls the generation) between us * releasing the read lock and acquiring the write lock. 
*/ if (shouldRollGeneration()) { From 166922475068ad63479dd77b10fa51e86fb2a7fe Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Mar 2017 08:06:27 -0400 Subject: [PATCH 13/23] Stronger test for precommit/roll/commit sequences --- .../index/translog/Translog.java | 2 +- .../index/translog/TranslogTests.java | 63 +++++++++++++------ 2 files changed, 46 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 8153a057ec810..67829ac4736a2 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1412,7 +1412,7 @@ public long commit() throws IOException { assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration) : "readers missing committing generation [" + currentCommittingGeneration + "]"; // set the last committed generation otherwise old files will not be cleaned up - lastCommittedTranslogFileGeneration = current.getGeneration(); + lastCommittedTranslogFileGeneration = currentCommittingGeneration; currentCommittingGeneration = NOT_SET_GENERATION; trimUnreferencedReaders(); } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 500a9567d8779..87f8b6bc4c9cb 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ 
-102,6 +103,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.elasticsearch.common.util.BigArrays.PAGE_SIZE_IN_BYTES; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -2153,34 +2155,59 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException final long generation = translog.currentFileGeneration(); int seqNo = 0; - final int operationsBefore = randomIntBetween(1, 256); - for (int i = 0; i < operationsBefore; i++) { - translog.add(new Translog.NoOp(seqNo++, 0, "test")); + final int rollsBefore = randomIntBetween(0, 16); + for (int r = 1; r <= rollsBefore; r++) { + final int operationsBefore = randomIntBetween(1, 256); + for (int i = 0; i < operationsBefore; i++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + } + + try (Releasable ignored = translog.writeLock.acquire()) { + translog.rollGeneration(); + } + + assertThat(translog.currentFileGeneration(), equalTo(generation + r)); + for (int i = 0; i <= r; i++) { + assertFileIsPresent(translog, generation + r); + } } + assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore)); translog.prepareCommit(); - assertThat(translog.currentFileGeneration(), equalTo(generation + 1)); - assertFileIsPresent(translog, generation); - assertFileIsPresent(translog, generation + 1); + assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore + 1)); - final int operationsBetween = randomIntBetween(1, 256); - for (int i = 0; i < operationsBetween; i++) { - translog.add(new Translog.NoOp(seqNo++, 0, "test")); + for (int i = 0; i <= rollsBefore + 1; i++) { + assertFileIsPresent(translog, generation + i); } - try (ReleasableLock ignored = translog.writeLock.acquire()) { - translog.rollGeneration(); + final int rollsBetween = randomIntBetween(0, 16); + for (int r = 1; r <= 
rollsBetween; r++) { + final int operationsBetween = randomIntBetween(1, 256); + for (int i = 0; i < operationsBetween; i++) { + translog.add(new Translog.NoOp(seqNo++, 0, "test")); + } + + try (Releasable ignored = translog.writeLock.acquire()) { + translog.rollGeneration(); + } + + assertThat( + translog.currentFileGeneration(), + equalTo(generation + rollsBefore + 1 + r)); + for (int i = 0; i <= rollsBefore + 1 + r; i++) { + assertFileIsPresent(translog, generation + i); + } } - assertThat(translog.currentFileGeneration(), equalTo(generation + 2)); - assertFileIsPresent(translog, generation); - assertFileIsPresent(translog, generation + 1); - assertFileIsPresent(translog, generation + 2); translog.commit(); - assertFileDeleted(translog, generation); - assertFileDeleted(translog, generation + 1); - assertFileIsPresent(translog, generation + 2); + for (int i = 0; i < rollsBefore; i++) { + assertFileDeleted(translog, generation + i); + } + for (int i = rollsBefore; i <= rollsBefore + 1 + rollsBetween; i++) { + assertFileIsPresent(translog, generation + i); + } + } } From 35d0119195405fb217e1df44a8ab15bf45667528 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Mar 2017 10:58:04 -0400 Subject: [PATCH 14/23] Add Javadocs to IndexSettings#getGenerationThresholdSize --- .../main/java/org/elasticsearch/index/IndexSettings.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 8c616ebe2ba63..599fea1823800 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -479,6 +479,15 @@ public TimeValue getGlobalCheckpointInterval() { */ public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } + /** + * Returns the generation threshold size. 
As sequence numbers can cause multiple generations to + * be preserved for rollback purposes, we want to keep the size of individual generations from + * growing too large to avoid excessive disk space consumption. Therefore, the translog is + * automatically rolled to a new generation when the current generation exceeds this generation + * threshold size. + * + * @return the generation threshold size + */ public ByteSizeValue getGenerationThresholdSize() { return generationThresholdSize; } From 32afd9eeab1c6e664485a88ff94927b06f1b9472 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Mar 2017 12:17:46 -0400 Subject: [PATCH 15/23] Add assert message --- .../main/java/org/elasticsearch/index/translog/Translog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 67829ac4736a2..065ddc4740a9c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1364,7 +1364,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl * @throws IOException if an I/O exception occurred during any file operations */ void rollGeneration() throws IOException { - assert writeLock.isHeldByCurrentThread(); + assert writeLock.isHeldByCurrentThread() : "translog write lock not held by current thread"; try { final TranslogReader reader = current.closeIntoReader(); readers.add(reader); From dd14ba85cec849219bd8662dadedc166d8d456b6 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 22 Mar 2017 15:05:41 -0400 Subject: [PATCH 16/23] Migrate translog generation rolling to index shard --- .../replication/TransportWriteAction.java | 21 ++-- .../elasticsearch/index/shard/IndexShard.java | 110 +++++++++++++----- .../index/translog/Translog.java | 86 +++++++------- .../index/shard/IndexShardIT.java | 106 
+++++++++++++---- .../index/translog/TranslogTests.java | 36 +----- 5 files changed, 214 insertions(+), 145 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 10f8741ecccb6..6cd3e5a7b3a03 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -22,16 +22,12 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; -import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -46,7 +42,6 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; -import org.apache.logging.log4j.core.pattern.ConverterKeys; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -302,15 +297,21 @@ private void maybeFinish() { } void run() { - // we either respond immediately ie. 
if we we don't fsync per request or wait for refresh - // OR we got an pass async operations on and wait for them to return to respond. - indexShard.maybeFlush(); - maybeFinish(); // decrement the pendingOpts by one, if there is nothing else to do we just respond with success. + /* + * We either respond immediately (i.e., if we do not fsync per request or wait for + * refresh), or there are pending async operations and we wait for them to return before + * responding. + */ + indexShard.maybeFlushOrRollTranslogGeneration(); + // decrement pending by one, if there is nothing else to do we just respond with success + maybeFinish(); if (waitUntilRefresh) { assert pendingOps.get() > 0; indexShard.addRefreshListener(location, forcedRefresh -> { if (forcedRefresh) { - logger.warn("block_until_refresh request ran out of slots and forced a refresh: [{}]", request); + logger.warn( + "block until refresh ran out of slots and forced a refresh: [{}]", + request); } refreshed.set(forcedRefresh); maybeFinish(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0e6054deccd0f..f6b7b3a73ac91 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -794,6 +794,11 @@ public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException } + private void rollTranslogGeneration() throws IOException { + final Engine engine = getEngine(); + engine.getTranslog().rollGeneration(); + } + public void forceMerge(ForceMergeRequest forceMerge) throws IOException { verifyActive(); if (logger.isTraceEnabled()) { @@ -1256,17 +1261,39 @@ public boolean restoreFromRepository(Repository repository) { } /** - * Returns true iff this shard needs to be flushed due to too many translog operation or a too large transaction log. - * Otherwise false. + * Tests whether or not the translog should be flushed.
This test is based on the current size + * of the translog compared to the configured flush threshold size. + * + * @return {@code true} if the translog should be flushed */ boolean shouldFlush() { - Engine engine = getEngineOrNull(); + final Engine engine = getEngineOrNull(); if (engine != null) { try { - Translog translog = engine.getTranslog(); - return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes(); - } catch (AlreadyClosedException ex) { - // that's fine we are already close - no need to flush + final Translog translog = engine.getTranslog(); + return translog.shouldFlush(); + } catch (final AlreadyClosedException e) { + // we are already closed, no need to flush or roll + } + } + return false; + } + + /** + * Tests whether or not the translog generation should be rolled to a new generation. This test + * is based on the size of the current generation compared to the configured generation + * threshold size. + * + * @return {@code true} if the current generation should be rolled to a new generation + */ + boolean shouldRollTranslogGeneration() { + final Engine engine = getEngineOrNull(); + if (engine != null) { + try { + final Translog translog = engine.getTranslog(); + return translog.shouldRollGeneration(); + } catch (final AlreadyClosedException e) { + // we are already closed, no need to flush or roll } } return false; @@ -1810,28 +1837,31 @@ public Translog.Durability getTranslogDurability() { return indexSettings.getTranslogDurability(); } - private final AtomicBoolean asyncFlushRunning = new AtomicBoolean(); + // we can not protect with a lock since we "release" on a different thread + private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(); /** - * Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the - * Flush thread-pool asynchronously. - * - * @return true if a new flush is scheduled otherwise false.
+ * Schedules a flush or translog generation roll if needed but will not schedule more than one + * concurrently. The operation will be executed asynchronously on the flush thread pool. */ - public boolean maybeFlush() { - if (shouldFlush()) { - if (asyncFlushRunning.compareAndSet(false, true)) { // we can't use a lock here since we "release" in a different thread - if (shouldFlush() == false) { - // we have to check again since otherwise there is a race when a thread passes - // the first shouldFlush() check next to another thread which flushes fast enough - // to finish before the current thread could flip the asyncFlushRunning flag. - // in that situation we have an extra unexpected flush. - asyncFlushRunning.compareAndSet(true, false); - } else { + public void maybeFlushOrRollTranslogGeneration() { + if (shouldFlush() || shouldRollTranslogGeneration()) { + if (flushOrRollRunning.compareAndSet(false, true)) { + /* + * We have to check again since otherwise there is a race when a thread passes the + * first check next to another thread which performs the operation quickly enough to + * finish before the current thread could flip the flag. In that situation, we have + * an extra operation. + * + * Additionally, a flush implicitly executes a translog generation roll so if we + * execute a flush then we do not need to check if we should roll the translog + * generation. 
+ */ + if (shouldFlush()) { logger.debug("submitting async flush request"); - final AbstractRunnable abstractRunnable = new AbstractRunnable() { + final AbstractRunnable flush = new AbstractRunnable() { @Override - public void onFailure(Exception e) { + public void onFailure(final Exception e) { if (state != IndexShardState.CLOSED) { logger.warn("failed to flush index", e); } @@ -1844,16 +1874,38 @@ protected void doRun() throws Exception { @Override public void onAfter() { - asyncFlushRunning.compareAndSet(true, false); - maybeFlush(); // fire a flush up again if we have filled up the limits such that shouldFlush() returns true + flushOrRollRunning.compareAndSet(true, false); + maybeFlushOrRollTranslogGeneration(); + } + }; + threadPool.executor(ThreadPool.Names.FLUSH).execute(flush); + } else if (shouldRollTranslogGeneration()) { + logger.debug("submitting async roll translog generation request"); + final AbstractRunnable roll = new AbstractRunnable() { + @Override + public void onFailure(final Exception e) { + if (state != IndexShardState.CLOSED) { + logger.warn("failed to roll translog generation", e); + } + } + + @Override + protected void doRun() throws Exception { + rollTranslogGeneration(); + } + + @Override + public void onAfter() { + flushOrRollRunning.compareAndSet(true, false); + maybeFlushOrRollTranslogGeneration(); } }; - threadPool.executor(ThreadPool.Names.FLUSH).execute(abstractRunnable); - return true; + threadPool.executor(ThreadPool.Names.FETCH_SHARD_STARTED).execute(roll); + } else { + flushOrRollRunning.compareAndSet(true, false); } } } - return false; } /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 065ddc4740a9c..1a2e44d07eeab 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -34,6 +34,7 @@ import 
org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.util.BigArrays; @@ -420,31 +421,10 @@ public Location add(final Operation operation) throws IOException { out.writeInt(operationSize); out.seek(end); final ReleasablePagedBytesReference bytes = out.bytes(); - final Location location; - final boolean shouldRollGeneration; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - location = current.add(bytes, operation.seqNo()); - // check if we should roll under the read lock - shouldRollGeneration = - shouldRollGeneration() && rollingGeneration.compareAndSet(false, true); + return current.add(bytes, operation.seqNo()); } - if (shouldRollGeneration) { - try (ReleasableLock ignored = writeLock.acquire()) { - /* - * We have to check the condition again lest we could roll twice if another - * thread committed the translog (which rolls the generation) between us - * releasing the read lock and acquiring the write lock. - */ - if (shouldRollGeneration()) { - this.rollGeneration(); - } - } finally { - final boolean wasRolling = rollingGeneration.getAndSet(false); - assert wasRolling; - } - } - return location; } catch (final AlreadyClosedException | IOException ex) { try { closeOnTragicEvent(ex); @@ -465,13 +445,24 @@ public Location add(final Operation operation) throws IOException { } /** - * Tests whether or not the current generation of the translog should be rolled into a new - * generation. This test is based on the size of the current generation compared to the - * configured generation threshold size. + * Tests whether or not the translog should be flushed. This test is based on the current size + * of the translog compared to the configured flush threshold size.
+ * + * @return {@code true} if the translog should be flushed + */ + public boolean shouldFlush() { + final long size = this.sizeInBytes(); + return size > this.indexSettings.getFlushThresholdSize().getBytes(); + } + + /** + * Tests whether or not the translog generation should be rolled to a new generation. This test + * is based on the size of the current generation compared to the configured generation + * threshold size. * - * @return {@code true} if the current generation should be rolled into a new generation + * @return {@code true} if the current generation should be rolled to a new generation */ - private boolean shouldRollGeneration() { + public boolean shouldRollGeneration() { final long size = this.current.sizeInBytes(); final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes(); return size > threshold; @@ -1359,28 +1350,29 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl /** * Roll the current translog generation into a new generation. This does not commit the - * translog. The translog write lock must be held by the current thread. + * translog. 
* * @throws IOException if an I/O exception occurred during any file operations */ - void rollGeneration() throws IOException { - assert writeLock.isHeldByCurrentThread() : "translog write lock not held by current thread"; - try { - final TranslogReader reader = current.closeIntoReader(); - readers.add(reader); - final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); - assert Checkpoint.read(checkpoint).generation == current.getGeneration(); - final Path generationCheckpoint = - location.resolve(getCommitCheckpointFileName(current.getGeneration())); - Files.copy(checkpoint, generationCheckpoint); - IOUtils.fsync(generationCheckpoint, false); - IOUtils.fsync(generationCheckpoint.getParent(), true); - // create a new translog file; this will sync it and update the checkpoint data; - current = createWriter(current.getGeneration() + 1); - logger.trace("current translog set to [{}]", current.getGeneration()); - } catch (final Exception e) { - IOUtils.closeWhileHandlingException(this); // tragic event - throw e; + public void rollGeneration() throws IOException { + try (Releasable ignored = writeLock.acquire()) { + try { + final TranslogReader reader = current.closeIntoReader(); + readers.add(reader); + final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); + assert Checkpoint.read(checkpoint).generation == current.getGeneration(); + final Path generationCheckpoint = + location.resolve(getCommitCheckpointFileName(current.getGeneration())); + Files.copy(checkpoint, generationCheckpoint); + IOUtils.fsync(generationCheckpoint, false); + IOUtils.fsync(generationCheckpoint.getParent(), true); + // create a new translog file; this will sync it and update the checkpoint data; + current = createWriter(current.getGeneration() + 1); + logger.trace("current translog set to [{}]", current.getGeneration()); + } catch (final Exception e) { + IOUtils.closeWhileHandlingException(this); // tragic event + throw e; + } } } @@ -1428,7 +1420,7 @@ void 
trimUnreferencedReaders() { long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE); minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen); final long finalMinReferencedGen = minReferencedGen; - List unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList()); + List unreferenced = readers.stream().filter(r -> r.getGeneration() <= finalMinReferencedGen).collect(Collectors.toList()); for (final TranslogReader unreferencedReader : unreferenced) { Path translogPath = unreferencedReader.path(); logger.trace("delete translog file - not referenced and not current anymore {}", translogPath); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 97c96c8af12f7..093a6d124bd44 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -68,6 +68,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; import java.nio.file.Files; @@ -363,49 +364,104 @@ public void testMaybeFlush() throws Exception { assertEquals(0, shard.getEngine().getTranslog().totalOperations()); } - public void testStressMaybeFlush() throws Exception { + public void testMaybeRollTranslogGeneration() throws Exception { + final int generationThreshold = randomIntBetween(1, 512); + final Settings settings = + Settings + .builder() + .put("index.number_of_shards", 1) + .put("index.translog.generation_threshold_size", generationThreshold + "b") + .put() + .build(); + createIndex("test", settings); + ensureGreen("test"); + final IndicesService indicesService = 
getInstanceFromNode(IndicesService.class); + final IndexService test = indicesService.indexService(resolveIndex("test")); + final IndexShard shard = test.getShardOrNull(0); + int rolls = 0; + final Translog translog = shard.getEngine().getTranslog(); + final long generation = translog.currentFileGeneration(); + for (int i = 0; i < randomIntBetween(32, 128); i++) { + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); + final ParsedDocument doc = testParsedDocument( + "1", + "test", + null, + SequenceNumbersService.UNASSIGNED_SEQ_NO, + new ParseContext.Document(), + new BytesArray(new byte[]{1}), XContentType.JSON, null); + final Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc); + final Engine.IndexResult result = shard.index(index); + final Translog.Location location = result.getTranslogLocation(); + shard.maybeFlushOrRollTranslogGeneration(); + if (location.translogLocation + location.size > generationThreshold) { + // wait until the roll completes + assertBusy(() -> assertFalse(shard.shouldRollTranslogGeneration())); + rolls++; + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); + } + } + } + + public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { createIndex("test"); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("test")); final IndexShard shard = test.getShardOrNull(0); assertFalse(shard.shouldFlush()); - client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( - IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), - new ByteSizeValue(117/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get(); - client().prepareIndex("test", "test", "0").setSource("{}", XContentType.JSON) - .setRefreshPolicy(randomBoolean() ? 
IMMEDIATE : NONE).get(); + final String key; + final boolean flush = randomBoolean(); + if (flush) { + key = "index.translog.flush_threshold_size"; + } else { + key = "index.translog.generation_threshold_size"; + } + // size of the operation plus header and footer + final Settings settings = Settings.builder().put(key, "117b").build(); + client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get(); + client().prepareIndex("test", "test", "0") + .setSource("{}", XContentType.JSON) + .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE) + .get(); assertFalse(shard.shouldFlush()); final AtomicBoolean running = new AtomicBoolean(true); final int numThreads = randomIntBetween(2, 4); - Thread[] threads = new Thread[numThreads]; - CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + final Thread[] threads = new Thread[numThreads]; + final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread() { - @Override - public void run() { - try { - barrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - throw new RuntimeException(e); - } - while (running.get()) { - shard.maybeFlush(); - } + threads[i] = new Thread(() -> { + try { + barrier.await(); + } catch (final InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + while (running.get()) { + shard.maybeFlushOrRollTranslogGeneration(); } - }; + }); threads[i].start(); } barrier.await(); - FlushStats flushStats = shard.flushStats(); - long total = flushStats.getTotal(); - client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); - assertBusy(() -> assertEquals(total + 1, shard.flushStats().getTotal())); + final Runnable check; + if (flush) { + final FlushStats flushStats = shard.flushStats(); + final long total = flushStats.getTotal(); + client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); + check = () -> 
assertEquals(total + 1, shard.flushStats().getTotal()); + } else { + final long generation = shard.getEngine().getTranslog().currentFileGeneration(); + client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); + check = () -> assertEquals( + generation + 1, + shard.getEngine().getTranslog().currentFileGeneration()); + } + assertBusy(check); running.set(false); for (int i = 0; i < threads.length; i++) { threads[i].join(); } - assertEquals(total + 1, shard.flushStats().getTotal()); + check.run(); } public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 87f8b6bc4c9cb..36401deed4bc2 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -103,7 +103,6 @@ import java.util.stream.Collectors; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; -import static org.elasticsearch.common.util.BigArrays.PAGE_SIZE_IN_BYTES; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -2120,37 +2119,6 @@ public void testRollGeneration() throws IOException { assertFileIsPresent(translog, generation + rolls + 1); } - public void testGenerationThreshold() throws IOException { - translog.close(); - final int generationThreshold = randomIntBetween(1, 512); - final Settings settings = Settings - .builder() - .put("index.translog.generation_threshold_size", generationThreshold + "b") - .build(); - long seqNo = 0; - long rolls = 0; - final TranslogConfig config = getTranslogConfig(translogDir, settings); - try (Translog translog = - new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { - final long generation = 
translog.currentFileGeneration(); - for (int i = 0; i < randomIntBetween(32, 128); i++) { - assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - final Location location = translog.add(new Translog.NoOp(seqNo++, 0, "test")); - if (location.translogLocation + location.size > generationThreshold) { - rolls++; - assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - for (int j = 0; j < rolls; j++) { - assertFileIsPresent(translog, generation + j); - } - } - } - - for (int j = 0; j < rolls; j++) { - assertFileIsPresent(translog, generation + j); - } - } - } - public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException { final long generation = translog.currentFileGeneration(); int seqNo = 0; @@ -2201,10 +2169,10 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException translog.commit(); - for (int i = 0; i < rollsBefore; i++) { + for (int i = 0; i <= rollsBefore; i++) { assertFileDeleted(translog, generation + i); } - for (int i = rollsBefore; i <= rollsBefore + 1 + rollsBetween; i++) { + for (int i = rollsBefore + 1; i <= rollsBefore + 1 + rollsBetween; i++) { assertFileIsPresent(translog, generation + i); } From f59233c1712146aa6cfafe1706ba07d5a417da0b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 22 Mar 2017 16:15:05 -0400 Subject: [PATCH 17/23] Add Javadocs and tidy up --- .../elasticsearch/index/shard/IndexShard.java | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f6b7b3a73ac91..edabdb3490a94 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -771,29 +771,41 @@ public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expecte return engine.syncFlush(syncId, expectedCommitId); 
} - public Engine.CommitId flush(FlushRequest request) throws ElasticsearchException { - boolean waitIfOngoing = request.waitIfOngoing(); - boolean force = request.force(); - if (logger.isTraceEnabled()) { - logger.trace("flush with {}", request); - } - // we allows flush while recovering, since we allow for operations to happen - // while recovering, and we want to keep the translog at bay (up to deletes, which - // we don't gc). Yet, we don't use flush internally to clear deletes and flush the indexwriter since - // we use #writeIndexingBuffer for this now. + /** + * Executes the given flush request against the engine. + * + * @param request the flush request + * @return the commit ID + */ + public Engine.CommitId flush(FlushRequest request) { + final boolean waitIfOngoing = request.waitIfOngoing(); + final boolean force = request.force(); + logger.trace("flush with {}", request); + /* + * We allow flushes while recovering since we allow operations to happen while recovering and + * we want to keep the translog under control (up to deletes, which we do not GC). Yet, we + * do not use flush internally to clear deletes and flush the index writer since we use + * Engine#writeIndexingBuffer for this now. + */ verifyNotClosed(); - Engine engine = getEngine(); + final Engine engine = getEngine(); if (engine.isRecovering()) { - throw new IllegalIndexShardStateException(shardId(), state, "flush is only allowed if the engine is not recovery" + - " from translog"); + throw new IllegalIndexShardStateException( + shardId(), + state, + "flush is only allowed if the engine is not recovery from translog"); } - long time = System.nanoTime(); - Engine.CommitId commitId = engine.flush(force, waitIfOngoing); + final long time = System.nanoTime(); + final Engine.CommitId commitId = engine.flush(force, waitIfOngoing); flushMetric.inc(System.nanoTime() - time); return commitId; - } + /** + * Rolls the translog generation. 
+ * + * @throws IOException if any file operations on the translog throw an I/O exception + */ private void rollTranslogGeneration() throws IOException { final Engine engine = getEngine(); engine.getTranslog().rollGeneration(); From 192a7cee23d622f7096cee8109f19643b4234f51 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 22 Mar 2017 20:35:40 -0400 Subject: [PATCH 18/23] Funny business --- .../org/elasticsearch/index/translog/Translog.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 1a2e44d07eeab..42cd315d19fb8 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -358,7 +358,7 @@ private int totalOperations(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return Stream.concat(readers.stream(), Stream.of(current)) - .filter(r -> r.getGeneration() >= minGeneration) + .filter(r -> r.getGeneration() > minGeneration) .mapToInt(BaseTranslogReader::totalOperations) .sum(); } @@ -371,7 +371,7 @@ private long sizeInBytes(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return Stream.concat(readers.stream(), Stream.of(current)) - .filter(r -> r.getGeneration() >= minGeneration) + .filter(r -> r.getGeneration() > minGeneration) .mapToLong(BaseTranslogReader::sizeInBytes) .sum(); } @@ -519,7 +519,7 @@ private Snapshot createSnapshot(long minGeneration) { public Translog.View newView() { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - View view = new View(lastCommittedTranslogFileGeneration); + View view = new View(lastCommittedTranslogFileGeneration + 1); outstandingViews.add(view); return view; } @@ -1418,9 +1418,9 @@ void trimUnreferencedReaders() { return; } long minReferencedGen = 
outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE); - minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen); + minReferencedGen = Math.min(lastCommittedTranslogFileGeneration + 1, minReferencedGen); final long finalMinReferencedGen = minReferencedGen; - List unreferenced = readers.stream().filter(r -> r.getGeneration() <= finalMinReferencedGen).collect(Collectors.toList()); + List unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList()); for (final TranslogReader unreferencedReader : unreferenced) { Path translogPath = unreferencedReader.path(); logger.trace("delete translog file - not referenced and not current anymore {}", translogPath); From 6c2adb6c62848a0f0e8aebe96e23b4555203829b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 22 Mar 2017 21:00:21 -0400 Subject: [PATCH 19/23] Revert "Funny business" This reverts commit 192a7cee23d622f7096cee8109f19643b4234f51. 
--- .../org/elasticsearch/index/translog/Translog.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 42cd315d19fb8..1a2e44d07eeab 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -358,7 +358,7 @@ private int totalOperations(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return Stream.concat(readers.stream(), Stream.of(current)) - .filter(r -> r.getGeneration() > minGeneration) + .filter(r -> r.getGeneration() >= minGeneration) .mapToInt(BaseTranslogReader::totalOperations) .sum(); } @@ -371,7 +371,7 @@ private long sizeInBytes(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return Stream.concat(readers.stream(), Stream.of(current)) - .filter(r -> r.getGeneration() > minGeneration) + .filter(r -> r.getGeneration() >= minGeneration) .mapToLong(BaseTranslogReader::sizeInBytes) .sum(); } @@ -519,7 +519,7 @@ private Snapshot createSnapshot(long minGeneration) { public Translog.View newView() { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - View view = new View(lastCommittedTranslogFileGeneration + 1); + View view = new View(lastCommittedTranslogFileGeneration); outstandingViews.add(view); return view; } @@ -1418,9 +1418,9 @@ void trimUnreferencedReaders() { return; } long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE); - minReferencedGen = Math.min(lastCommittedTranslogFileGeneration + 1, minReferencedGen); + minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen); final long finalMinReferencedGen = minReferencedGen; - List unreferenced = readers.stream().filter(r -> r.getGeneration() < 
finalMinReferencedGen).collect(Collectors.toList()); + List unreferenced = readers.stream().filter(r -> r.getGeneration() <= finalMinReferencedGen).collect(Collectors.toList()); for (final TranslogReader unreferencedReader : unreferenced) { Path translogPath = unreferencedReader.path(); logger.trace("delete translog file - not referenced and not current anymore {}", translogPath); From 1a31061fd69b2583ea8ec4934fe46de533a4ba04 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 22 Mar 2017 21:05:23 -0400 Subject: [PATCH 20/23] Committed generation --- .../main/java/org/elasticsearch/index/translog/Translog.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 1a2e44d07eeab..56cd4e2943628 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1404,7 +1404,7 @@ public long commit() throws IOException { assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration) : "readers missing committing generation [" + currentCommittingGeneration + "]"; // set the last committed generation otherwise old files will not be cleaned up - lastCommittedTranslogFileGeneration = currentCommittingGeneration; + lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1; currentCommittingGeneration = NOT_SET_GENERATION; trimUnreferencedReaders(); } @@ -1420,7 +1420,7 @@ void trimUnreferencedReaders() { long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE); minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen); final long finalMinReferencedGen = minReferencedGen; - List unreferenced = readers.stream().filter(r -> r.getGeneration() <= finalMinReferencedGen).collect(Collectors.toList()); + List unreferenced 
= readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList()); for (final TranslogReader unreferencedReader : unreferenced) { Path translogPath = unreferencedReader.path(); logger.trace("delete translog file - not referenced and not current anymore {}", translogPath); From 07fc092eaae4ce4e1443de3233912ef7ad53532f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 22 Mar 2017 21:07:18 -0400 Subject: [PATCH 21/23] Remove unneeded field --- .../src/main/java/org/elasticsearch/index/translog/Translog.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 56cd4e2943628..d9a8cc408f822 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -129,7 +129,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final TranslogConfig config; private final LongSupplier globalCheckpointSupplier; private final String translogUUID; - private final AtomicBoolean rollingGeneration = new AtomicBoolean(); /** * Creates a new Translog instance. 
This method will create a new transaction log unless the given {@link TranslogGeneration} is From 2e7f7220aa3463df99d410c2ca55070705d053a2 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 27 Mar 2017 15:43:00 -0400 Subject: [PATCH 22/23] Fix thread pool name --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index edabdb3490a94..7c28b04034646 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1912,7 +1912,7 @@ public void onAfter() { maybeFlushOrRollTranslogGeneration(); } }; - threadPool.executor(ThreadPool.Names.FETCH_SHARD_STARTED).execute(roll); + threadPool.executor(ThreadPool.Names.FLUSH).execute(roll); } else { flushOrRollRunning.compareAndSet(true, false); } From a14937da3b370237a99fe94410871a23f88b2a7c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 27 Mar 2017 15:46:00 -0400 Subject: [PATCH 23/23] Change method name --- .../action/support/replication/TransportWriteAction.java | 2 +- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 6 +++--- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 5 ++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 6cd3e5a7b3a03..ae4ae78c03386 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -302,7 +302,7 @@ void run() { * refresh), or we there are past async operations and we wait for them to return to * respond. 
*/ - indexShard.maybeFlushOrRollTranslogGeneration(); + indexShard.afterWriteOperation(); // decrement pending by one, if there is nothing else to do we just respond with success maybeFinish(); if (waitUntilRefresh) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7c28b04034646..32d3d4d4bf8ee 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1856,7 +1856,7 @@ public Translog.Durability getTranslogDurability() { * Schedules a flush or translog generation roll if needed but will not schedule more than one * concurrently. The operation will be executed asynchronously on the flush thread pool. */ - public void maybeFlushOrRollTranslogGeneration() { + public void afterWriteOperation() { if (shouldFlush() || shouldRollTranslogGeneration()) { if (flushOrRollRunning.compareAndSet(false, true)) { /* @@ -1887,7 +1887,7 @@ protected void doRun() throws Exception { @Override public void onAfter() { flushOrRollRunning.compareAndSet(true, false); - maybeFlushOrRollTranslogGeneration(); + afterWriteOperation(); } }; threadPool.executor(ThreadPool.Names.FLUSH).execute(flush); @@ -1909,7 +1909,7 @@ protected void doRun() throws Exception { @Override public void onAfter() { flushOrRollRunning.compareAndSet(true, false); - maybeFlushOrRollTranslogGeneration(); + afterWriteOperation(); } }; threadPool.executor(ThreadPool.Names.FLUSH).execute(roll); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 093a6d124bd44..ff5556089d28d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -68,7 +68,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import 
org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; -import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; import java.nio.file.Files; @@ -393,7 +392,7 @@ public void testMaybeRollTranslogGeneration() throws Exception { final Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc); final Engine.IndexResult result = shard.index(index); final Translog.Location location = result.getTranslogLocation(); - shard.maybeFlushOrRollTranslogGeneration(); + shard.afterWriteOperation(); if (location.translogLocation + location.size > generationThreshold) { // wait until the roll completes assertBusy(() -> assertFalse(shard.shouldRollTranslogGeneration())); @@ -437,7 +436,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { throw new RuntimeException(e); } while (running.get()) { - shard.maybeFlushOrRollTranslogGeneration(); + shard.afterWriteOperation(); } }); threads[i].start();