From 0dc8ac95d4eddb7d21b7cb18e1f9900d591a3b3f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 31 Oct 2017 10:20:36 -0400 Subject: [PATCH 1/2] Remove checkpoint tracker bit sets setting We added an index-level setting for controlling the size of the bit sets used to back the local checkpoint tracker. This setting is really only needed to control the memory footprint of the bit sets but we do not think this setting is going to be needed. This commit removes this setting before it is released to the wild after which we would have to worry about BWC implications. --- .../index/seqno/LocalCheckpointTracker.java | 28 +++++++------------ .../recovery/RecoverySourceHandler.java | 2 +- .../seqno/LocalCheckpointTrackerTests.java | 11 +------- 3 files changed, 12 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 6a7844057fd00..dcd7b3d180ed5 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -34,10 +34,9 @@ public class LocalCheckpointTracker { /** * We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on - * demand and cleaning up while completed. This setting controls the size of the arrays. + * demand and cleaning up while completed. This constant controls the size of the arrays. */ - public static Setting SETTINGS_BIT_ARRAYS_SIZE = - Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, 4, Setting.Property.IndexScope); + private static final int BIT_ARRAYS_SIZE = 1024; /** * An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which @@ -45,11 +44,6 @@ public class LocalCheckpointTracker { */ final LinkedList processedSeqNo = new LinkedList<>(); - /** - * The size of each bit set representing processed sequence numbers. - */ - private final int bitArraysSize; - /** * The sequence number that the first bit in the first array corresponds to. */ @@ -70,11 +64,10 @@ public class LocalCheckpointTracker { * {@link SequenceNumbers#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint, * or {@link SequenceNumbers#NO_OPS_PERFORMED}. * - * @param indexSettings the index settings * @param maxSeqNo the last sequence number assigned, or {@link SequenceNumbers#NO_OPS_PERFORMED} * @param localCheckpoint the last known local checkpoint, or {@link SequenceNumbers#NO_OPS_PERFORMED} */ - public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) { + public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) { if (localCheckpoint < 0 && localCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) { throw new IllegalArgumentException( "local checkpoint must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] " @@ -84,7 +77,6 @@ public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxS throw new IllegalArgumentException( "max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]"); } - bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1; nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1; checkpoint = localCheckpoint; @@ -183,7 +175,7 @@ synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedExcep @SuppressForbidden(reason = "Object#notifyAll") private void updateCheckpoint() { assert Thread.holdsLock(this); - assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 : + assert checkpoint < firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1 : "checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)"; assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : "checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)"; @@ -196,10 +188,10 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1) checkpoint++; // the checkpoint always falls in the first bit set or just before. If it falls // on the last bit of the current bit set, we can clean it. - if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) { + if (checkpoint == firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1) { processedSeqNo.removeFirst(); - firstProcessedSeqNo += bitArraysSize; - assert checkpoint - firstProcessedSeqNo < bitArraysSize; + firstProcessedSeqNo += BIT_ARRAYS_SIZE; + assert checkpoint - firstProcessedSeqNo < BIT_ARRAYS_SIZE; current = processedSeqNo.peekFirst(); } } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1))); @@ -218,13 +210,13 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1) private FixedBitSet getBitSetForSeqNo(final long seqNo) { assert Thread.holdsLock(this); assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo; - final long bitSetOffset = (seqNo - firstProcessedSeqNo) / bitArraysSize; + final long bitSetOffset = (seqNo - firstProcessedSeqNo) / BIT_ARRAYS_SIZE; if (bitSetOffset > Integer.MAX_VALUE) { throw new IndexOutOfBoundsException( "sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]"); } while (bitSetOffset >= processedSeqNo.size()) { - processedSeqNo.add(new FixedBitSet(bitArraysSize)); + processedSeqNo.add(new FixedBitSet(BIT_ARRAYS_SIZE)); } return processedSeqNo.get((int) bitSetOffset); } @@ -239,7 +231,7 @@ private FixedBitSet getBitSetForSeqNo(final long seqNo) { private int seqNoToBitSetOffset(final long seqNo) { assert Thread.holdsLock(this); assert seqNo >= firstProcessedSeqNo; - return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize; + return ((int) (seqNo - firstProcessedSeqNo)) % BIT_ARRAYS_SIZE; } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index c717e29353b65..5f692d8e8f5fa 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -243,7 +243,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo); - final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1); + final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index ab513c787c3cb..7f929dcc2f6f7 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -49,16 +49,7 @@ public class LocalCheckpointTrackerTests extends ESTestCase { private static final int SMALL_CHUNK_SIZE = 4; public static LocalCheckpointTracker createEmptyTracker() { - return new LocalCheckpointTracker( - IndexSettingsModule.newIndexSettings( - "test", - Settings - .builder() - .put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE) - .build()), - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED - ); + return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED); } @Override From e4839a7aa3e60cb358994689aa8fe95a8b63948b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 1 Nov 2017 11:52:27 -0400 Subject: [PATCH 2/2] More ops --- .../common/settings/IndexScopedSettings.java | 1 - .../index/seqno/LocalCheckpointTracker.java | 2 +- .../index/seqno/SequenceNumbersService.java | 2 +- .../index/seqno/LocalCheckpointTrackerTests.java | 13 +++++-------- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 9d4d30b066f1f..ed686bf9236e5 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -120,7 +120,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.QUERY_STRING_LENIENT_SETTING, IndexSettings.ALLOW_UNMAPPED, IndexSettings.INDEX_CHECK_ON_STARTUP, - LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE, IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD, IndexSettings.MAX_SLICES_PER_SCROLL, ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index dcd7b3d180ed5..5380a3b2b7f90 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -36,7 +36,7 @@ public class LocalCheckpointTracker { * We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on * demand and cleaning up while completed. This constant controls the size of the arrays. */ - private static final int BIT_ARRAYS_SIZE = 1024; + static final int BIT_ARRAYS_SIZE = 1024; /** * An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 1c0b320558400..1b46eedacc457 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -56,7 +56,7 @@ public SequenceNumbersService( final long localCheckpoint, final long globalCheckpoint) { super(shardId, indexSettings); - localCheckpointTracker = new LocalCheckpointTracker(indexSettings, maxSeqNo, localCheckpoint); + localCheckpointTracker = new LocalCheckpointTracker(maxSeqNo, localCheckpoint); globalCheckpointTracker = new GlobalCheckpointTracker(shardId, allocationId, indexSettings, globalCheckpoint); } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 7f929dcc2f6f7..ae167bb59f04b 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -21,10 +21,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.IndexSettingsModule; import org.junit.Before; import java.util.ArrayList; @@ -38,6 +36,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_ARRAYS_SIZE; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isOneOf; @@ -46,8 +45,6 @@ public class LocalCheckpointTrackerTests extends ESTestCase { private LocalCheckpointTracker tracker; - private static final int SMALL_CHUNK_SIZE = 4; - public static LocalCheckpointTracker createEmptyTracker() { return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED); } @@ -89,7 +86,7 @@ public void testSimpleReplica() { public void testSimpleOverFlow() { List seqNoList = new ArrayList<>(); final boolean aligned = randomBoolean(); - final int maxOps = SMALL_CHUNK_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, SMALL_CHUNK_SIZE - 1)); + final int maxOps = BIT_ARRAYS_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_ARRAYS_SIZE - 1)); for (int i = 0; i < maxOps; i++) { seqNoList.add(i); @@ -100,7 +97,7 @@ public void testSimpleOverFlow() { } assertThat(tracker.checkpoint, equalTo(maxOps - 1L)); assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE)); } public void testConcurrentPrimary() throws InterruptedException { @@ -141,7 +138,7 @@ protected void doRun() throws Exception { tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE)); } public void testConcurrentReplica() throws InterruptedException { @@ -189,7 +186,7 @@ protected void doRun() throws Exception { assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L)); tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); + assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE)); } public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException {