Skip to content

Commit cabd994

Browse files
committed
Introduce translog generation folding
This commit introduces a maximum size for a translog generation and automatically folds the translog into a new generation when the current generation exceeds the threshold. This threshold is configurable per index and defaults to sixty-four megabytes. We introduce this constraint because sequence numbers will require keeping around more than the current generation (to ensure that we can roll back to the global checkpoint). Without keeping the size of generations under control, having to keep old generations around could consume excessive disk space. A follow-up will enable commits to trim previous generations based on the global checkpoint.
1 parent 8e04561 commit cabd994

File tree

6 files changed

+202
-29
lines changed

6 files changed

+202
-29
lines changed

core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
125125
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
126126
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
127127
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
128+
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
128129
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
129130
FieldMapper.IGNORE_MALFORMED_SETTING,
130131
FieldMapper.COERCE_SETTING,

core/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.lucene.index.MergePolicy;
2323
import org.elasticsearch.Version;
2424
import org.elasticsearch.cluster.metadata.IndexMetaData;
25-
import org.elasticsearch.common.logging.DeprecationLogger;
2625
import org.elasticsearch.common.logging.Loggers;
2726
import org.elasticsearch.common.settings.IndexScopedSettings;
2827
import org.elasticsearch.common.settings.Setting;
@@ -112,6 +111,16 @@ public final class IndexSettings {
112111
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
113112
Property.IndexScope);
114113

114+
/**
115+
* The maximum size of a translog generation. This is independent of the maximum size of
116+
* translog operations that have not been flushed.
117+
*/
118+
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING =
119+
Setting.byteSizeSetting(
120+
"index.translog.generation_threshold_size",
121+
new ByteSizeValue(64, ByteSizeUnit.MB),
122+
new Property[]{Property.Dynamic, Property.IndexScope});
123+
115124
public static final Setting<TimeValue> INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL =
116125
Setting.timeSetting("index.seq_no.checkpoint_sync_interval", new TimeValue(30, TimeUnit.SECONDS),
117126
new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope);
@@ -156,6 +165,7 @@ public final class IndexSettings {
156165
private volatile TimeValue refreshInterval;
157166
private final TimeValue globalCheckpointInterval;
158167
private volatile ByteSizeValue flushThresholdSize;
168+
private volatile ByteSizeValue generationThresholdSize;
159169
private final MergeSchedulerConfig mergeSchedulerConfig;
160170
private final MergePolicyConfig mergePolicyConfig;
161171
private final IndexScopedSettings scopedSettings;
@@ -250,6 +260,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
250260
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
251261
globalCheckpointInterval = scopedSettings.get(INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL);
252262
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
263+
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
253264
mergeSchedulerConfig = new MergeSchedulerConfig(this);
254265
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
255266
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
@@ -281,6 +292,9 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
281292
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
282293
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
283294
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, this::setTranslogFlushThresholdSize);
295+
scopedSettings.addSettingsUpdateConsumer(
296+
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
297+
this::setGenerationThresholdSize);
284298
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
285299
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
286300
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
@@ -290,6 +304,10 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
290304
this.flushThresholdSize = byteSizeValue;
291305
}
292306

307+
private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
308+
this.generationThresholdSize = generationThresholdSize;
309+
}
310+
293311
private void setGCDeletes(TimeValue timeValue) {
294312
this.gcDeletesInMillis = timeValue.getMillis();
295313
}
@@ -461,6 +479,10 @@ public TimeValue getGlobalCheckpointInterval() {
461479
*/
462480
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }
463481

482+
public ByteSizeValue getGenerationThresholdSize() {
483+
return generationThresholdSize;
484+
}
485+
464486
/**
465487
* Returns the {@link MergeSchedulerConfig}
466488
*/

core/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 66 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ public Path location() {
329329
* Returns the generation of the current transaction log.
330330
*/
331331
public long currentFileGeneration() {
332-
try (ReleasableLock lock = readLock.acquire()) {
332+
try (ReleasableLock ignored = readLock.acquire()) {
333333
return current.getGeneration();
334334
}
335335
}
@@ -399,6 +399,8 @@ TranslogWriter createWriter(long fileGeneration) throws IOException {
399399
return newFile;
400400
}
401401

402+
// Flag flipped to true (via compareAndSet) by add(Operation) just before it folds the
// current generation, and reset to false once that fold attempt has finished.
final AtomicBoolean foldingGeneration = new AtomicBoolean();
403+
402404
/**
403405
* Adds an operation to the transaction log.
404406
*
@@ -409,20 +411,31 @@ TranslogWriter createWriter(long fileGeneration) throws IOException {
409411
public Location add(final Operation operation) throws IOException {
410412
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
411413
try {
412-
final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
413414
final long start = out.position();
414415
out.skip(Integer.BYTES);
415-
writeOperationNoSize(checksumStreamOutput, operation);
416+
writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation);
416417
final long end = out.position();
417418
final int operationSize = (int) (end - Integer.BYTES - start);
418419
out.seek(start);
419420
out.writeInt(operationSize);
420421
out.seek(end);
421422
final ReleasablePagedBytesReference bytes = out.bytes();
423+
final Location location;
422424
try (ReleasableLock ignored = readLock.acquire()) {
423425
ensureOpen();
424-
return current.add(bytes, operation.seqNo());
426+
location = current.add(bytes, operation.seqNo());
425427
}
428+
try (ReleasableLock ignored = writeLock.acquire()) {
429+
if (shouldFoldGeneration(this) && foldingGeneration.compareAndSet(false, true)) {
430+
// we have to check the condition again lest we could fold twice in a race
431+
if (shouldFoldGeneration(this)) {
432+
this.foldGeneration(current.getGeneration());
433+
}
434+
final boolean wasFoldingGeneration = foldingGeneration.getAndSet(false);
435+
assert wasFoldingGeneration;
436+
}
437+
}
438+
return location;
426439
} catch (final AlreadyClosedException | IOException ex) {
427440
try {
428441
closeOnTragicEvent(ex);
@@ -442,6 +455,20 @@ public Location add(final Operation operation) throws IOException {
442455
}
443456
}
444457

458+
/**
459+
* Tests whether or not the current generation of the translog should be folded into a new
460+
* generation. This test is based on the size of the current generation compared to the
461+
* configured generation threshold size.
462+
*
463+
* @param translog the translog
464+
* @return {@code true} if the current generation should be folded into a new generation
465+
*/
466+
private static boolean shouldFoldGeneration(final Translog translog) {
467+
final long size = translog.current.sizeInBytes();
468+
final long threshold = translog.indexSettings.getGenerationThresholdSize().getBytes();
469+
return size > threshold;
470+
}
471+
445472
/**
446473
* A {@linkplain Location} that will sort after the {@linkplain Location} returned by the last write but before any locations which
447474
* can be returned by the next write.
@@ -1322,27 +1349,46 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
13221349
out.writeInt((int) checksum);
13231350
}
13241351

1352+
/**
1353+
* Fold the current translog generation into a new generation. This does not commit the
1354+
* translog. The translog write lock must be held by the current thread.
1355+
*
1356+
* @param generation the current translog generation
1357+
* @throws IOException if an I/O exception occurred during any file operations
1358+
*/
1359+
void foldGeneration(final long generation) throws IOException {
1360+
assert writeLock.isHeldByCurrentThread();
1361+
try {
1362+
final TranslogReader reader = current.closeIntoReader();
1363+
readers.add(reader);
1364+
final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
1365+
assert Checkpoint.read(checkpoint).generation == generation;
1366+
final Path generationCheckpoint =
1367+
location.resolve(getCommitCheckpointFileName(generation));
1368+
Files.copy(checkpoint, generationCheckpoint);
1369+
IOUtils.fsync(generationCheckpoint, false);
1370+
IOUtils.fsync(generationCheckpoint.getParent(), true);
1371+
// create a new translog file; this will sync it and update the checkpoint data;
1372+
current = createWriter(generation + 1);
1373+
logger.trace("current translog set to [{}]", current.getGeneration());
1374+
} catch (final Exception e) {
1375+
IOUtils.closeWhileHandlingException(this); // tragic event
1376+
throw e;
1377+
}
1378+
}
1379+
13251380
@Override
13261381
public long prepareCommit() throws IOException {
1327-
try (ReleasableLock lock = writeLock.acquire()) {
1382+
try (ReleasableLock ignored = writeLock.acquire()) {
13281383
ensureOpen();
13291384
if (currentCommittingGeneration != NOT_SET_GENERATION) {
1330-
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingGeneration);
1385+
throw new IllegalStateException("already committing a translog with generation: " +
1386+
currentCommittingGeneration);
13311387
}
1332-
currentCommittingGeneration = current.getGeneration();
1333-
TranslogReader currentCommittingTranslog = current.closeIntoReader();
1334-
readers.add(currentCommittingTranslog);
1335-
Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
1336-
assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration();
1337-
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration()));
1338-
Files.copy(checkpoint, commitCheckpoint);
1339-
IOUtils.fsync(commitCheckpoint, false);
1340-
IOUtils.fsync(commitCheckpoint.getParent(), true);
1341-
// create a new translog file - this will sync it and update the checkpoint data;
1342-
current = createWriter(current.getGeneration() + 1);
1343-
logger.trace("current translog set to [{}]", current.getGeneration());
1344-
1345-
} catch (Exception e) {
1388+
final long generation = current.getGeneration();
1389+
currentCommittingGeneration = generation;
1390+
foldGeneration(generation);
1391+
} catch (final Exception e) {
13461392
IOUtils.closeWhileHandlingException(this); // tragic event
13471393
throw e;
13481394
}

core/src/test/java/org/elasticsearch/index/IndexSettingsTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,27 @@ public void testTranslogFlushSizeThreshold() {
370370
assertEquals(actualNewTranslogFlushThresholdSize, settings.getFlushThresholdSize());
371371
}
372372

373+
public void testTranslogGenerationSizeThreshold() {
374+
final ByteSizeValue size = new ByteSizeValue(Math.abs(randomInt()));
375+
final String key = IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey();
376+
final ByteSizeValue actualValue =
377+
ByteSizeValue.parseBytesSizeValue(size.toString(), key);
378+
final IndexMetaData metaData =
379+
newIndexMeta(
380+
"index",
381+
Settings.builder()
382+
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
383+
.put(key, size.toString())
384+
.build());
385+
final IndexSettings settings = new IndexSettings(metaData, Settings.EMPTY);
386+
assertEquals(actualValue, settings.getGenerationThresholdSize());
387+
final ByteSizeValue newSize = new ByteSizeValue(Math.abs(randomInt()));
388+
final ByteSizeValue actual = ByteSizeValue.parseBytesSizeValue(newSize.toString(), key);
389+
settings.updateIndexMetaData(
390+
newIndexMeta("index", Settings.builder().put(key, newSize.toString()).build()));
391+
assertEquals(actual, settings.getGenerationThresholdSize());
392+
}
393+
373394
public void testArchiveBrokenIndexSettings() {
374395
Settings settings =
375396
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS.archiveUnknownOrInvalidSettings(

core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.index.translog;
2121

22+
import com.carrotsearch.randomizedtesting.annotations.Repeat;
2223
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
2324
import org.apache.logging.log4j.message.ParameterizedMessage;
2425
import org.apache.logging.log4j.util.Supplier;
@@ -44,13 +45,14 @@
4445
import org.elasticsearch.common.settings.Settings;
4546
import org.elasticsearch.common.unit.ByteSizeUnit;
4647
import org.elasticsearch.common.unit.ByteSizeValue;
47-
import org.elasticsearch.common.util.BigArrays;
4848
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4949
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
50+
import org.elasticsearch.common.util.concurrent.ReleasableLock;
5051
import org.elasticsearch.common.xcontent.ToXContent;
5152
import org.elasticsearch.common.xcontent.XContentBuilder;
5253
import org.elasticsearch.common.xcontent.XContentFactory;
5354
import org.elasticsearch.common.xcontent.XContentType;
55+
import org.elasticsearch.index.IndexSettings;
5456
import org.elasticsearch.index.VersionType;
5557
import org.elasticsearch.index.engine.Engine;
5658
import org.elasticsearch.index.engine.Engine.Operation.Origin;
@@ -100,6 +102,7 @@
100102
import java.util.concurrent.atomic.AtomicReference;
101103
import java.util.stream.Collectors;
102104

105+
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
103106
import static org.hamcrest.Matchers.containsString;
104107
import static org.hamcrest.Matchers.equalTo;
105108
import static org.hamcrest.Matchers.greaterThan;
@@ -156,12 +159,25 @@ private Translog create(Path path) throws IOException {
156159
return new Translog(getTranslogConfig(path), null, () -> globalCheckpoint.get());
157160
}
158161

159-
private TranslogConfig getTranslogConfig(Path path) {
160-
Settings build = Settings.builder()
161-
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
162-
.build();
163-
ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
164-
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.getIndex(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize);
162+
private TranslogConfig getTranslogConfig(final Path path) {
163+
final Settings settings = Settings
164+
.builder()
165+
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
166+
.build();
167+
return getTranslogConfig(path, settings);
168+
}
169+
170+
private TranslogConfig getTranslogConfig(final Path path, final Settings settings) {
171+
final ByteSizeValue bufferSize;
172+
if (randomBoolean()) {
173+
bufferSize = TranslogConfig.DEFAULT_BUFFER_SIZE;
174+
} else {
175+
bufferSize = new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
176+
}
177+
178+
final IndexSettings indexSettings =
179+
IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings);
180+
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize);
165181
}
166182

167183
protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException {
@@ -2073,4 +2089,65 @@ public void testTranslogOpSerialization() throws Exception {
20732089
Translog.Delete serializedDelete = new Translog.Delete(in);
20742090
assertEquals(delete, serializedDelete);
20752091
}
2092+
2093+
public void testFoldGeneration() throws IOException {
2094+
final long generation = translog.currentFileGeneration();
2095+
final int folds = randomIntBetween(1, 16);
2096+
int totalOperations = 0;
2097+
int seqNo = 0;
2098+
for (int i = 0; i < folds; i++) {
2099+
final int operations = randomIntBetween(1, 128);
2100+
for (int j = 0; j < operations; j++) {
2101+
translog.add(new Translog.NoOp(seqNo++, 0, "test"));
2102+
totalOperations++;
2103+
}
2104+
try (ReleasableLock ignored = translog.writeLock.acquire()) {
2105+
translog.foldGeneration(generation + i);
2106+
}
2107+
assertThat(translog.currentFileGeneration(), equalTo(generation + i + 1));
2108+
assertThat(translog.totalOperations(), equalTo(totalOperations));
2109+
}
2110+
for (int i = 0; i <= folds; i++) {
2111+
assertFileIsPresent(translog, generation + i);
2112+
}
2113+
translog.commit();
2114+
assertThat(translog.currentFileGeneration(), equalTo(generation + folds + 1));
2115+
assertThat(translog.totalOperations(), equalTo(0));
2116+
for (int i = 0; i <= folds; i++) {
2117+
assertFileDeleted(translog, generation + i);
2118+
}
2119+
assertFileIsPresent(translog, generation + folds + 1);
2120+
}
2121+
2122+
public void testGenerationThreshold() throws IOException {
2123+
translog.close();
2124+
final int generationThreshold = randomIntBetween(1, 512);
2125+
final Settings settings = Settings
2126+
.builder()
2127+
.put("index.translog.generation_threshold_size", generationThreshold + "b")
2128+
.build();
2129+
long seqNo = 0;
2130+
long folds = 0;
2131+
final TranslogConfig config = getTranslogConfig(translogDir, settings);
2132+
try (Translog translog =
2133+
new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
2134+
final long generation = translog.currentFileGeneration();
2135+
for (int i = 0; i < randomIntBetween(32, 128); i++) {
2136+
assertThat(translog.currentFileGeneration(), equalTo(generation + folds));
2137+
final Location location = translog.add(new Translog.NoOp(seqNo++, 0, "test"));
2138+
if (location.translogLocation + location.size > generationThreshold) {
2139+
folds++;
2140+
assertThat(translog.currentFileGeneration(), equalTo(generation + folds));
2141+
for (int j = 0; j < folds; j++) {
2142+
assertFileIsPresent(translog, generation + j);
2143+
}
2144+
}
2145+
}
2146+
2147+
for (int j = 0; j < folds; j++) {
2148+
assertFileIsPresent(translog, generation + j);
2149+
}
2150+
}
2151+
}
2152+
20762153
}

0 commit comments

Comments
 (0)