Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6d1a93b
initial change + translog tests pass
bleskes Jun 18, 2017
2428bb1
add stats to distinguish between committed and uncommitted
bleskes Jun 18, 2017
cb17e83
lint
bleskes Jun 18, 2017
e1a544b
back to int
bleskes Jun 18, 2017
00f8ae3
fix tests
bleskes Jun 18, 2017
889492a
fix testReuseInFileBasedPeerRecovery
bleskes Jun 18, 2017
d2f70d4
fix rest test
bleskes Jun 18, 2017
7962d6b
Merge remote-tracking branch 'upstream/master' into translog_retentio…
bleskes Jun 18, 2017
68322d5
another fix to 20_translog.yml
bleskes Jun 18, 2017
8170c2a
fix testRecoveryAfterPrimaryPromotion
bleskes Jun 18, 2017
be59059
control retention in testStats
bleskes Jun 19, 2017
6527bf8
Merge remote-tracking branch 'upstream/master' into translog_retentio…
bleskes Jun 19, 2017
286d4a3
add time based trimming
bleskes Jun 19, 2017
3411e25
Merge remote-tracking branch 'upstream/master' into translog_retentio…
bleskes Jun 19, 2017
7602e53
Merge remote-tracking branch 'upstream/master' into translog_retentio…
bleskes Jun 20, 2017
be0192e
some docs and a migration note
bleskes Jun 20, 2017
925b29d
Merge remote-tracking branch 'upstream/master' into translog_retentio…
bleskes Jun 20, 2017
202e6f8
Merge remote-tracking branch 'upstream/master' into translog_retentio…
bleskes Jun 21, 2017
30004a1
Merge remote-tracking branch 'upstream/master' into translog_retentio…
bleskes Jun 21, 2017
36e0a4e
Revert "add time based trimming"
bleskes Jun 21, 2017
df0fb63
inline rolling and trimming into engine
bleskes Jun 21, 2017
1042768
feedback
bleskes Jun 21, 2017
f77ac5b
javadocs
bleskes Jun 21, 2017
9c56729
roll back computeIfAbsent
bleskes Jun 21, 2017
375376b
a different approach
bleskes Jun 21, 2017
18b7fee
jason's feedback
bleskes Jun 21, 2017
976df1c
Merge remote-tracking branch 'upstream/master' into translog_retentio…
bleskes Jun 22, 2017
d9ca199
fix compilation
bleskes Jun 22, 2017
a924d30
fix testSyncerSendsOffCorrectDocuments
bleskes Jun 22, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public final class IndexSettings {
* the chance of ops based recoveries.
**/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic,
Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueHours(12), TimeValue.timeValueMillis(-1), Property.Dynamic,
Property.IndexScope);

/**
Expand All @@ -127,7 +127,7 @@ public final class IndexSettings {
* the chance of ops based recoveries.
**/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic,
Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,12 @@ public final boolean refreshNeeded() {
*/
public abstract CommitId flush() throws EngineException;

/**
 * Rolls the translog generation and cleans up the parts of the translog that are no longer needed.
 *
 * @throws EngineException if the translog generation could not be rolled
 */
public abstract void rollTranslogGeneration() throws EngineException;


/**
* Force merges to 1 segment
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,7 @@ final boolean tryRenewSyncCommit() {
ensureOpen();
ensureCanFlush();
String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
if (syncId != null && translog.totalOperations() == 0 && indexWriter.hasUncommittedChanges()) {
if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) {
logger.trace("start renewing sync commit [{}]", syncId);
commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId);
Expand Down Expand Up @@ -1317,6 +1317,25 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
return new CommitId(newCommitId);
}

/**
 * Rolls the translog generation and then trims the translog readers that are no longer referenced.
 * Both steps run while holding the read lock and only after verifying the engine is still open.
 *
 * @throws EngineException if rolling or trimming fails with anything other than an
 *                         {@link AlreadyClosedException}; {@code failEngine} is invoked before it is thrown
 */
@Override
public void rollTranslogGeneration() throws EngineException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
translog.rollGeneration();
translog.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
// NOTE(review): failOnTragicEvent presumably fails the engine when the close was caused by a tragic
// event - confirm against its implementation. The original exception is propagated unchanged.
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
// a failure while failing the engine must not mask the original cause; attach it instead
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to roll translog", e);
}
}

private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

Expand Down
10 changes: 4 additions & 6 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -921,13 +921,11 @@ public Engine.CommitId flush(FlushRequest request) {
}

/**
* Rolls the tranlog generation.
*
* @throws IOException if any file operations on the translog throw an I/O exception
* Rolls the translog generation and cleans up what is no longer needed.
*/
private void rollTranslogGeneration() throws IOException {
private void rollTranslogGeneration() {
final Engine engine = getEngine();
engine.getTranslog().rollGeneration();
engine.rollTranslogGeneration();
}

public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
Expand Down Expand Up @@ -2142,7 +2140,7 @@ public void onFailure(final Exception e) {
}

@Override
protected void doRun() throws IOException {
protected void doRun() throws Exception {
rollTranslogGeneration();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests

public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) throws IOException {
try (Translog.View view = indexShard.acquireTranslogView()) {
Translog.Snapshot snapshot = view.snapshot();
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
Translog.Snapshot snapshot = view.snapshot(startingSeqNo);
ShardId shardId = indexShard.shardId();

// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
Expand All @@ -104,7 +105,7 @@ public synchronized Translog.Operation next() throws IOException {
};

resync(shardId, indexShard.routingEntry().allocationId().getId(), wrappedSnapshot,
indexShard.getGlobalCheckpoint() + 1, listener);
startingSeqNo, listener);
}
}

Expand Down
117 changes: 90 additions & 27 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -367,17 +367,31 @@ long getMinFileGeneration() {


/**
* Returns the number of operations in the transaction files that aren't committed to lucene..
* Returns the number of operations in the translog files that aren't committed to lucene.
*/
public int totalOperations() {
public int uncommittedOperations() {
return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery());
}

/**
 * Returns the combined size, in bytes, of the translog files that have not been committed to lucene yet,
 * i.e. the files starting at the minimum translog generation required for recovery.
 */
public long uncommittedSizeInBytes() {
    final long minGenForRecovery = deletionPolicy.getMinTranslogGenerationForRecovery();
    return sizeInBytesByMinGen(minGenForRecovery);
}

/**
 * Returns the number of operations in the translog files.
 * <p>
 * Delegates to the generation based overload with a minimum generation of {@code -1}.
 * NOTE(review): presumably no generation is below -1, so no file is filtered out - confirm against
 * the overload taking a minimum generation.
 */
public int totalOperations() {
return totalOperations(-1);
}

/**
* Returns the size in bytes of the translog files
*/
public long sizeInBytes() {
return sizeInBytes(deletionPolicy.getMinTranslogGenerationForRecovery());
return sizeInBytesByMinGen(-1);
}

/**
Expand All @@ -394,9 +408,19 @@ private int totalOperations(long minGeneration) {
}

/**
* Returns the size in bytes of the translog files that aren't committed to lucene.
* Returns the number of operations in the translog files that may contain operations with a sequence number at or above the given one.
*/
private int totalOperationsInGensAboveSeqNo(long minSeqNo) {
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        // add up the operation counts of every reader selected for the given sequence number
        final Stream<? extends BaseTranslogReader> candidates = readersAboveMinSeqNo(minSeqNo);
        return candidates.mapToInt(reader -> reader.totalOperations()).sum();
    }
}

/**
* Returns the size in bytes of the translog files above the given generation
*/
private long sizeInBytes(long minGeneration) {
private long sizeInBytesByMinGen(long minGeneration) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return Stream.concat(readers.stream(), Stream.of(current))
Expand All @@ -406,6 +430,16 @@ private long sizeInBytes(long minGeneration) {
}
}

/**
 * Returns the combined size, in bytes, of the translog files selected by
 * {@link #readersAboveMinSeqNo(long)} for the given sequence number.
 */
private long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        final Stream<? extends BaseTranslogReader> candidates = readersAboveMinSeqNo(minSeqNo);
        return candidates.mapToLong(reader -> reader.sizeInBytes()).sum();
    }
}

/**
* Creates a new translog for the specified generation.
*
Expand Down Expand Up @@ -493,7 +527,7 @@ public Location add(final Operation operation) throws IOException {
* @return {@code true} if the translog should be flushed
*/
public boolean shouldFlush() {
final long size = this.sizeInBytes();
final long size = this.uncommittedSizeInBytes();
return size > this.indexSettings.getFlushThresholdSize().getBytes();
}

Expand Down Expand Up @@ -560,14 +594,34 @@ public Snapshot newSnapshot(long minGeneration) {
}
}

/**
 * Returns a stream over {@code readers} followed by {@code current}, restricted to those whose
 * checkpoint reports a maximum sequence number at or above {@code minSeqNo}. Readers whose maximum
 * sequence number is {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} are always included.
 * <p>
 * Callers must hold either the read lock or the write lock.
 *
 * @param minSeqNo the lowest sequence number the caller is interested in
 * @return the readers that pass the filter
 */
private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo) {
    // report the state of each lock under its own label (the write lock state used to be read from readLock)
    assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
        "callers of readersAboveMinSeqNo must hold a lock: readLock ["
            + readLock.isHeldByCurrentThread() + "], writeLock [" + writeLock.isHeldByCurrentThread() + "]";
    return Stream.concat(readers.stream(), Stream.of(current))
        .filter(reader -> {
            final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
            // an unassigned max sequence number gives no basis to exclude the reader, so keep it
            return maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
        });
}

/**
 * Creates a snapshot spanning every reader selected by {@link #readersAboveMinSeqNo(long)} for the
 * given sequence number. One snapshot is opened per reader and the results are combined into a
 * single {@link MultiSnapshot}.
 */
private Snapshot createSnapshotFromMinSeqNo(long minSeqNo) {
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        final Snapshot[] perReader = readersAboveMinSeqNo(minSeqNo)
            .map(reader -> reader.newSnapshot())
            .toArray(size -> new Snapshot[size]);
        return new MultiSnapshot(perReader);
    }
}

/**
* Returns a view into the current translog that is guaranteed to retain all current operations
* while receiving future ones as well
*/
public Translog.View newView() {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
final long viewGen = deletionPolicy.acquireTranslogGenForView();
final long viewGen = getMinFileGeneration();
deletionPolicy.acquireTranslogGenForView(viewGen);
try {
return new View(viewGen);
} catch (Exception e) {
Expand Down Expand Up @@ -674,7 +728,7 @@ private void closeOnTragicEvent(Exception ex) {
public TranslogStats stats() {
// acquire lock to make the two numbers roughly consistent (no file change half way)
try (ReleasableLock lock = readLock.acquire()) {
return new TranslogStats(totalOperations(), sizeInBytes());
return new TranslogStats(totalOperations(), sizeInBytes(), uncommittedOperations(), uncommittedSizeInBytes());
}
}

Expand All @@ -698,35 +752,36 @@ public TranslogDeletionPolicy getDeletionPolicy() {
public class View implements Closeable {

AtomicBoolean closed = new AtomicBoolean();
final long minGeneration;

View(long minGeneration) {
this.minGeneration = minGeneration;
}
final long viewGenToRelease;

/** this smallest translog generation in this view */
public long minTranslogGeneration() {
return minGeneration;
View(long viewGenToRelease) {
this.viewGenToRelease = viewGenToRelease;
}

/**
* The total number of operations in the view.
* The total number of operations in the view files which contain an operation with a sequence number
* above the given min sequence numbers. This will be the number of operations in snapshot taken
* by calling {@link #snapshot(long)} with the same parameter.
*/
public int totalOperations() {
return Translog.this.totalOperations(minGeneration);
public int estimateTotalOperations(long minSequenceNumber) {
return Translog.this.totalOperationsInGensAboveSeqNo(minSequenceNumber);
}

/**
* Returns the size in bytes of the files behind the view.
* The total size of the view files which contain an operation with a sequence number
* above the given min sequence numbers. These are the files that would need to be read by snapshot
* acquired {@link #snapshot(long)} with the same parameter.
*/
public long sizeInBytes() {
return Translog.this.sizeInBytes(minGeneration);
public long estimateSizeInBytes(long minSequenceNumber) {
return Translog.this.sizeOfGensAboveSeqNoInBytes(minSequenceNumber);
}

/** create a snapshot from this view */
public Snapshot snapshot() {
/**
* create a snapshot from this view, containing all
* operations from the given sequence number and up (with potentially some more) */
public Snapshot snapshot(long minSequenceNumber) {
ensureOpen();
return Translog.this.newSnapshot(minGeneration);
return Translog.this.createSnapshotFromMinSeqNo(minSequenceNumber);
}

void ensureOpen() {
Expand All @@ -738,8 +793,8 @@ void ensureOpen() {
@Override
public void close() throws IOException {
if (closed.getAndSet(true) == false) {
logger.trace("closing view starting at translog [{}]", minGeneration);
deletionPolicy.releaseTranslogGenView(minGeneration);
logger.trace("closing view starting at translog [{}]", viewGenToRelease);
deletionPolicy.releaseTranslogGenView(viewGenToRelease);
trimUnreferencedReaders();
closeFilesIfNoPendingViews();
}
Expand Down Expand Up @@ -1663,4 +1718,12 @@ public String getTranslogUUID() {
return translogUUID;
}


/** Returns the writer held in {@code current}; package private access. */
TranslogWriter getCurrent() {
return current;
}

/**
 * Returns the {@code readers} list itself, not a copy; package private access.
 */
List<TranslogReader> getReaders() {
return readers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ public synchronized void setRetentionAgeInMillis(long ageInMillis) {
* acquires the basis generation for a new view. Any translog generation above, and including, the returned generation
* will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called.
*/
synchronized long acquireTranslogGenForView() {
translogRefCounts.computeIfAbsent(minTranslogGenerationForRecovery, l -> Counter.newCounter(false)).addAndGet(1);
return minTranslogGenerationForRecovery;
synchronized void acquireTranslogGenForView(final long genForView) {
translogRefCounts.computeIfAbsent(genForView, l -> Counter.newCounter(false)).addAndGet(1);
}

/** returns the number of generations that were acquired for views */
Expand All @@ -80,7 +79,7 @@ synchronized int pendingViewsCount() {
}

/**
* releases a generation that was acquired by {@link #acquireTranslogGenForView()}
* releases a generation that was acquired by {@link #acquireTranslogGenForView(long)}
*/
synchronized void releaseTranslogGenView(long translogGen) {
Counter current = translogRefCounts.get(translogGen);
Expand Down Expand Up @@ -154,4 +153,9 @@ private long getMinTranslogGenRequiredByViews() {
public synchronized long getMinTranslogGenerationForRecovery() {
return minTranslogGenerationForRecovery;
}

/**
 * Returns the reference count currently registered for the given view generation, or {@code 0}
 * if no counter exists for it.
 */
synchronized long getViewCount(long viewGen) {
    final Counter refCount = translogRefCounts.get(viewGen);
    if (refCount == null) {
        return 0;
    }
    return refCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public String toString() {
return "TranslogSnapshot{" +
"readOperations=" + readOperations +
", position=" + position +
", totalOperations=" + totalOperations +
", estimateTotalOperations=" + totalOperations +
", length=" + length +
", reusableBuffer=" + reusableBuffer +
'}';
Expand Down
Loading