From 8d95f3abc3ad6c1c982d84d82842dda153288562 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 13 Feb 2019 19:49:09 -0500 Subject: [PATCH 1/3] timing --- .../org/elasticsearch/index/engine/InternalEngine.java | 9 +++++++++ .../java/org/elasticsearch/index/shard/IndexShard.java | 5 +++++ 2 files changed, 14 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 66e0d30f164f1..e4d6f21275d98 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -893,6 +893,15 @@ public IndexResult index(Index index) throws IOException { new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); } if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + if (indexResult.getSeqNo() == 1L) { + // delay the advancement of the local checkpoint so "shouldPeriodicallyFlush" can be true twice + // (1) when two uncommitted operations are in translog and (2) when the local checkpoint is advanced to 1L. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo()); } indexResult.setTook(System.nanoTime() - index.startTime()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 50400c6961741..e862609bddb5a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2795,6 +2795,11 @@ public Translog.Durability getTranslogDurability() { */ public void afterWriteOperation() { if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) { + try { + Thread.sleep(100); // Waits for the first flush to finish + } catch (InterruptedException e) { + throw new AssertionError(e); + } if (flushOrRollRunning.compareAndSet(false, true)) { /* * We have to check again since otherwise there is a race when a thread passes the first check next to another thread which From 89352f2cb8611296b80f588d949ebbc4d477cdd3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 14 Feb 2019 13:34:59 -0500 Subject: [PATCH 2/3] Relax testStressMaybeFlushOrRollTranslogGeneration --- .../elasticsearch/index/shard/IndexShardIT.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 674c252d780f3..fb8574594a874 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -120,6 +120,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -474,18 +475,22 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { final FlushStats initialStats = shard.flushStats(); client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); check = () -> { + assertFalse(shard.shouldPeriodicallyFlush()); final FlushStats currentStats = shard.flushStats(); String msg = String.format(Locale.ROOT, "flush stats: total=[%d vs %d], periodic=[%d vs %d]", initialStats.getTotal(), currentStats.getTotal(), initialStats.getPeriodic(), currentStats.getPeriodic()); - assertThat(msg, currentStats.getPeriodic(), equalTo(initialStats.getPeriodic() + 1)); - assertThat(msg, currentStats.getTotal(), equalTo(initialStats.getTotal() + 1)); + assertThat(msg, currentStats.getPeriodic(), + either(equalTo(initialStats.getPeriodic() + 1)).or(equalTo(initialStats.getPeriodic() + 2))); + assertThat(msg, currentStats.getTotal(), + either(equalTo(initialStats.getTotal() + 1)).or(equalTo(initialStats.getTotal() + 2))); }; } else { final long generation = getTranslog(shard).currentFileGeneration(); client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); - check = () -> assertEquals( - generation + 1, - getTranslog(shard).currentFileGeneration()); + check = () -> { + assertFalse(shard.shouldRollTranslogGeneration()); + assertEquals(generation + 1, getTranslog(shard).currentFileGeneration()); + }; } assertBusy(check); running.set(false); From c5ad8a88cb05e1afac48846c8bf55cf3c691463b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 14 Feb 2019 13:37:27 -0500 Subject: [PATCH 3/3] Revert "timing" This reverts commit 8d95f3abc3ad6c1c982d84d82842dda153288562. --- .../org/elasticsearch/index/engine/InternalEngine.java | 9 --------- .../java/org/elasticsearch/index/shard/IndexShard.java | 5 ----- 2 files changed, 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e4d6f21275d98..66e0d30f164f1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -893,15 +893,6 @@ public IndexResult index(Index index) throws IOException { new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); } if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - if (indexResult.getSeqNo() == 1L) { - // delay the advancement of the local checkpoint so "shouldPeriodicallyFlush" can be true twice - // (1) when two uncommitted operations are in translog and (2) when the local checkpoint is advanced to 1L. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - } localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo()); } indexResult.setTook(System.nanoTime() - index.startTime()); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e862609bddb5a..50400c6961741 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2795,11 +2795,6 @@ public Translog.Durability getTranslogDurability() { */ public void afterWriteOperation() { if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) { - try { - Thread.sleep(100); // Waits for the first flush to finish - } catch (InterruptedException e) { - throw new AssertionError(e); - } if (flushOrRollRunning.compareAndSet(false, true)) { /* * We have to check again since otherwise there is a race when a thread passes the first check next to another thread which