From 21299b8508f4cfd3099c42dce589af48d21ed6c6 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 30 Oct 2017 16:45:51 -0400 Subject: [PATCH 1/7] Lazy initialize checkpoint tracker bit sets This local checkpoint tracker uses collections of bit sets to track which sequence numbers are complete, eventually removing these bit sets when the local checkpoint advances. However, these bit sets were eagerly allocated so that if a sequence number far ahead of the checkpoint was marked as completed, all bit sets between the "last" bit set and the bit set needed to track the marked sequence number were allocated. If this sequence number was too far ahead, the memory requirements could be excessive. This commit opts for a different strategy for holding on to these bit sets and enables them to be lazily allocated. --- .../index/seqno/LocalCheckpointTracker.java | 81 +++++++++---------- .../seqno/LocalCheckpointTrackerTests.java | 38 +++++++-- 2 files changed, 71 insertions(+), 48 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 6a7844057fd00..6caa1effdd616 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -19,13 +19,12 @@ package org.elasticsearch.index.seqno; +import com.carrotsearch.hppc.LongObjectHashMap; import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.index.IndexSettings; -import java.util.LinkedList; - /** * This class generates sequences numbers and keeps track of the so-called "local checkpoint" which is the highest number for which all * previous sequence numbers have been processed (inclusive). @@ -40,21 +39,16 @@ public class LocalCheckpointTracker { Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, 4, Setting.Property.IndexScope); /** - * An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which - * marks the sequence number the fist bit in the first array corresponds to. + * A collection of bit arrays representing pending sequence numbers. Each sequence number is mapped to a bit array by dividing by the + * bit set size. */ - final LinkedList processedSeqNo = new LinkedList<>(); + final LongObjectHashMap processedSeqNo = new LongObjectHashMap<>(); /** * The size of each bit set representing processed sequence numbers. */ private final int bitArraysSize; - /** - * The sequence number that the first bit in the first array corresponds to. - */ - long firstProcessedSeqNo; - /** * The current local checkpoint, i.e., all sequence numbers no more than this number have been completed. */ @@ -85,7 +79,6 @@ public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxS "max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]"); } bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); - firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1; nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1; checkpoint = localCheckpoint; } @@ -113,9 +106,9 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) { // this is possible during recovery where we might replay an operation that was also replicated return; } - final FixedBitSet bitSet = getBitSetForSeqNo(seqNo); - final int offset = seqNoToBitSetOffset(seqNo); - bitSet.set(offset); + final FixedBitSet bitArray = getBitArrayForSeqNo(seqNo); + final int offset = seqNoToBitArrayOffset(seqNo); + bitArray.set(offset); if (seqNo == checkpoint + 1) { updateCheckpoint(); } @@ -130,7 +123,6 @@ synchronized void resetCheckpoint(final long checkpoint) { assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO; assert checkpoint <= this.checkpoint; processedSeqNo.clear(); - firstProcessedSeqNo = checkpoint + 1; this.checkpoint = checkpoint; } @@ -183,26 +175,30 @@ synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedExcep @SuppressForbidden(reason = "Object#notifyAll") private void updateCheckpoint() { assert Thread.holdsLock(this); - assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 : - "checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)"; - assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : - "checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)"; - assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) : + assert getBitArrayForSeqNo(checkpoint + 1).get(seqNoToBitArrayOffset(checkpoint + 1)) : "updateCheckpoint is called but the bit following the checkpoint is not set"; try { // keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words - FixedBitSet current = processedSeqNo.getFirst(); + long bitArrayKey = getBitArrayKey(checkpoint); + FixedBitSet current = processedSeqNo.get(bitArrayKey); + if (current == null) { + // the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set + assert checkpoint % bitArraysSize == bitArraysSize - 1; + current = processedSeqNo.get(++bitArrayKey); + } do { checkpoint++; - // the checkpoint always falls in the first bit set or just before. If it falls - // on the last bit of the current bit set, we can clean it. - if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) { - processedSeqNo.removeFirst(); - firstProcessedSeqNo += bitArraysSize; - assert checkpoint - firstProcessedSeqNo < bitArraysSize; - current = processedSeqNo.peekFirst(); + /* + * The checkpoint always falls in the current bit set or we have already cleaned it; if it falls on the last bit of the + * current bit set, we can clean it. + */ + if (checkpoint == (1 + bitArrayKey) * bitArraysSize - 1) { + assert current != null; + final FixedBitSet removed = processedSeqNo.remove(bitArrayKey); + assert removed == current; + current = processedSeqNo.get(++bitArrayKey); } - } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1))); + } while (current != null && current.get(seqNoToBitArrayOffset(checkpoint + 1))); } finally { // notifies waiters in waitForOpsToComplete this.notifyAll(); @@ -215,31 +211,30 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1) * @param seqNo the sequence number to obtain the bit array for * @return the bit array corresponding to the provided sequence number */ - private FixedBitSet getBitSetForSeqNo(final long seqNo) { + private long getBitArrayKey(final long seqNo) { assert Thread.holdsLock(this); - assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo; - final long bitSetOffset = (seqNo - firstProcessedSeqNo) / bitArraysSize; - if (bitSetOffset > Integer.MAX_VALUE) { - throw new IndexOutOfBoundsException( - "sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]"); - } - while (bitSetOffset >= processedSeqNo.size()) { - processedSeqNo.add(new FixedBitSet(bitArraysSize)); + return seqNo / bitArraysSize; + } + + private FixedBitSet getBitArrayForSeqNo(final long seqNo) { + assert Thread.holdsLock(this); + final long bitArrayKey = getBitArrayKey(seqNo); + if (processedSeqNo.containsKey(bitArrayKey) == false) { + processedSeqNo.put(bitArrayKey, new FixedBitSet(bitArraysSize)); } - return processedSeqNo.get((int) bitSetOffset); + return processedSeqNo.get(bitArrayKey); } /** * Obtain the position in the bit array corresponding to the provided sequence number. The bit array corresponding to the sequence - * number can be obtained via {@link #getBitSetForSeqNo(long)}. + * number can be obtained via {@link #getBitArrayForSeqNo(long)}. * * @param seqNo the sequence number to obtain the position for * @return the position in the bit array corresponding to the provided sequence number */ - private int seqNoToBitSetOffset(final long seqNo) { + private int seqNoToBitArrayOffset(final long seqNo) { assert Thread.holdsLock(this); - assert seqNo >= firstProcessedSeqNo; - return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize; + return Math.toIntExact(seqNo % bitArraysSize); } } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index ab513c787c3cb..9eab2b2fe97a9 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -19,12 +19,16 @@ package org.elasticsearch.index.seqno; +import com.carrotsearch.hppc.LongObjectHashMap; +import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; import org.junit.Before; import java.util.ArrayList; @@ -38,7 +42,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isOneOf; @@ -95,6 +98,14 @@ public void testSimpleReplica() { assertThat(tracker.getCheckpoint(), equalTo(2L)); } + public void testLazyInitialization() { + /* + * Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large + * sequence numbers this could lead to excessive memory usage resulting in out of memory errors. + */ + tracker.markSeqNoAsCompleted(randomNonNegativeLong()); + } + public void testSimpleOverFlow() { List seqNoList = new ArrayList<>(); final boolean aligned = randomBoolean(); @@ -109,7 +120,9 @@ public void testSimpleOverFlow() { } assertThat(tracker.checkpoint, equalTo(maxOps - 1L)); assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + if (aligned == false) { + assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / SMALL_CHUNK_SIZE)); + } } public void testConcurrentPrimary() throws InterruptedException { @@ -150,7 +163,9 @@ protected void doRun() throws Exception { tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + if (tracker.processedSeqNo.size() == 1) { + assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / SMALL_CHUNK_SIZE)); + } } public void testConcurrentReplica() throws InterruptedException { @@ -198,7 +213,10 @@ protected void doRun() throws Exception { assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L)); tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1)); + if (tracker.processedSeqNo.size() == 1) { + assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / SMALL_CHUNK_SIZE)); + } } public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException { @@ -253,7 +271,17 @@ public void testResetCheckpoint() { tracker.resetCheckpoint(localCheckpoint); assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint)); assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo)); - assertThat(tracker.processedSeqNo, empty()); + assertThat(tracker.processedSeqNo, new BaseMatcher>() { + @Override + public boolean matches(Object item) { + return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty()); + } + + @Override + public void describeTo(Description description) { + description.appendText("empty"); + } + }); assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1))); } } From 91fbab356a7767037f801837aebcfe5414ee1a25 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 30 Oct 2017 18:23:52 -0400 Subject: [PATCH 2/7] Method --- .../elasticsearch/index/seqno/LocalCheckpointTracker.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 6caa1effdd616..58aaee2af2ae4 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -192,7 +192,7 @@ assert getBitArrayForSeqNo(checkpoint + 1).get(seqNoToBitArrayOffset(checkpoint * The checkpoint always falls in the current bit set or we have already cleaned it; if it falls on the last bit of the * current bit set, we can clean it. */ - if (checkpoint == (1 + bitArrayKey) * bitArraysSize - 1) { + if (checkpoint == lastSeqNoInBitArray(bitArrayKey)) { assert current != null; final FixedBitSet removed = processedSeqNo.remove(bitArrayKey); assert removed == current; @@ -205,6 +205,10 @@ assert getBitArrayForSeqNo(checkpoint + 1).get(seqNoToBitArrayOffset(checkpoint } } + private long lastSeqNoInBitArray(final long bitArrayKey) { + return (1 + bitArrayKey) * bitArraysSize - 1; + } + /** * Return the bit array for the provided sequence number, possibly allocating a new array if needed. * From b2cbfc55bb2e319c74215560aa5986d1d023612a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 30 Oct 2017 18:24:29 -0400 Subject: [PATCH 3/7] Add assertion --- .../elasticsearch/index/seqno/LocalCheckpointTrackerTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 9eab2b2fe97a9..906f9a8f6062e 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -104,6 +104,7 @@ public void testLazyInitialization() { * sequence numbers this could lead to excessive memory usage resulting in out of memory errors. */ tracker.markSeqNoAsCompleted(randomNonNegativeLong()); + assertThat(tracker.processedSeqNo.size(), equalTo(1)); } public void testSimpleOverFlow() { From cabda878aad226d0fae15331ff0ea5e5c792cf98 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 31 Oct 2017 09:20:31 -0400 Subject: [PATCH 4/7] Use index --- .../elasticsearch/index/seqno/LocalCheckpointTracker.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 58aaee2af2ae4..81d11b411674b 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -223,8 +223,9 @@ private long getBitArrayKey(final long seqNo) { private FixedBitSet getBitArrayForSeqNo(final long seqNo) { assert Thread.holdsLock(this); final long bitArrayKey = getBitArrayKey(seqNo); - if (processedSeqNo.containsKey(bitArrayKey) == false) { - processedSeqNo.put(bitArrayKey, new FixedBitSet(bitArraysSize)); + final int index = processedSeqNo.indexOf(bitArrayKey); + if (processedSeqNo.indexExists(index) == false) { + processedSeqNo.indexInsert(index, bitArrayKey, new FixedBitSet(bitArraysSize)); } return processedSeqNo.get(bitArrayKey); } From 68c4eab0f50cf615a705ab8ccaa6efd31d121c1e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 31 Oct 2017 09:21:12 -0400 Subject: [PATCH 5/7] Use index more --- .../org/elasticsearch/index/seqno/LocalCheckpointTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 81d11b411674b..cf56c07ab24ba 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -227,7 +227,7 @@ private FixedBitSet getBitArrayForSeqNo(final long seqNo) { if (processedSeqNo.indexExists(index) == false) { processedSeqNo.indexInsert(index, bitArrayKey, new FixedBitSet(bitArraysSize)); } - return processedSeqNo.get(bitArrayKey); + return processedSeqNo.indexGet(index); } /** From 97bb3cadbf97da876ee305243da9bc26e81e9e98 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 31 Oct 2017 10:16:24 -0400 Subject: [PATCH 6/7] Revert --- .../org/elasticsearch/index/seqno/LocalCheckpointTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index cf56c07ab24ba..81d11b411674b 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -227,7 +227,7 @@ private FixedBitSet getBitArrayForSeqNo(final long seqNo) { if (processedSeqNo.indexExists(index) == false) { processedSeqNo.indexInsert(index, bitArrayKey, new FixedBitSet(bitArraysSize)); } - return processedSeqNo.indexGet(index); + return processedSeqNo.get(bitArrayKey); } /** From f4f0dae6043726600d5cd7ad65253879a7df67fc Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 31 Oct 2017 11:20:25 -0400 Subject: [PATCH 7/7] Oh yeah --- .../index/seqno/LocalCheckpointTracker.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 81d11b411674b..c8c72316bf4e7 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -224,10 +224,14 @@ private FixedBitSet getBitArrayForSeqNo(final long seqNo) { assert Thread.holdsLock(this); final long bitArrayKey = getBitArrayKey(seqNo); final int index = processedSeqNo.indexOf(bitArrayKey); - if (processedSeqNo.indexExists(index) == false) { - processedSeqNo.indexInsert(index, bitArrayKey, new FixedBitSet(bitArraysSize)); + final FixedBitSet bitArray; + if (processedSeqNo.indexExists(index)) { + bitArray = processedSeqNo.indexGet(index); + } else { + bitArray = new FixedBitSet(bitArraysSize); + processedSeqNo.indexInsert(index, bitArrayKey, bitArray); } - return processedSeqNo.get(bitArrayKey); + return bitArray; } /**