Skip to content

Commit 772a513

Browse files
committed
Feedback
1 parent d8a846e commit 772a513

File tree

6 files changed

+124
-29
lines changed

6 files changed

+124
-29
lines changed

core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,52 @@ public static SeqNoStats loadSeqNoStatsFromLuceneCommit(
5757
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
5858
}
5959

60+
/**
61+
* Compute the minimum of the given current minimum sequence number and the specified sequence number, accounting for the fact that the
62+
* current minimum sequence number could be {@link SequenceNumbersService#NO_OPS_PERFORMED} or
63+
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. When the current minimum sequence number is not
64+
* {@link SequenceNumbersService#NO_OPS_PERFORMED} nor {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}, the specified sequence number
65+
* must not be {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
66+
*
67+
* @param minSeqNo the current minimum sequence number
68+
* @param seqNo the specified sequence number
69+
* @return the new minimum sequence number
70+
*/
71+
public static long min(final long minSeqNo, final long seqNo) {
72+
if (minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED) {
73+
return seqNo;
74+
} else if (minSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
75+
return seqNo;
76+
} else {
77+
if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
78+
throw new IllegalArgumentException("sequence number must be assigned");
79+
}
80+
return Math.min(minSeqNo, seqNo);
81+
}
82+
}
83+
84+
/**
85+
* Compute the maximum of the given current maximum sequence number and the specified sequence number, accounting for the fact that the
86+
* current maximum sequence number could be {@link SequenceNumbersService#NO_OPS_PERFORMED} or
87+
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. When the current maximum sequence number is not
88+
* {@link SequenceNumbersService#NO_OPS_PERFORMED} nor {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}, the specified sequence number
89+
* must not be {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
90+
*
91+
* @param maxSeqNo the current maximum sequence number
92+
* @param seqNo the specified sequence number
93+
* @return the new maximum sequence number
94+
*/
95+
public static long max(final long maxSeqNo, final long seqNo) {
96+
if (maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED) {
97+
return seqNo;
98+
} else if (maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
99+
return seqNo;
100+
} else {
101+
if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
102+
throw new IllegalArgumentException("sequence number must be assigned");
103+
}
104+
return Math.max(maxSeqNo, seqNo);
105+
}
106+
}
107+
60108
}

core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,12 @@ static Checkpoint emptyTranslogCheckpoint(final long offset, final long generati
102102
return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint);
103103
}
104104

105-
static Checkpoint readChecksummedV2(final DataInput in) throws IOException {
105+
static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException {
106106
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
107107
}
108108

109109
// reads a checksummed checkpoint introduced in ES 5.0.0
110-
static Checkpoint readChecksummedV1(final DataInput in) throws IOException {
110+
static Checkpoint readCheckpointV5_0_0(final DataInput in) throws IOException {
111111
final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
112112
final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
113113
final long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
@@ -128,17 +128,17 @@ public String toString() {
128128

129129
public static Checkpoint read(Path path) throws IOException {
130130
try (Directory dir = new SimpleFSDirectory(path.getParent())) {
131-
try (final IndexInput indexInput = dir.openInput(path.getFileName().toString(), IOContext.DEFAULT)) {
131+
try (IndexInput indexInput = dir.openInput(path.getFileName().toString(), IOContext.DEFAULT)) {
132132
// We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
133133
CodecUtil.checksumEntireFile(indexInput);
134134
final int fileVersion = CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, CURRENT_VERSION);
135135
if (fileVersion == INITIAL_VERSION) {
136-
assert indexInput.length() == V1_FILE_SIZE;
137-
return Checkpoint.readChecksummedV1(indexInput);
136+
assert indexInput.length() == V1_FILE_SIZE : indexInput.length();
137+
return Checkpoint.readCheckpointV5_0_0(indexInput);
138138
} else {
139-
assert fileVersion == CURRENT_VERSION;
140-
assert indexInput.length() == FILE_SIZE;
141-
return Checkpoint.readChecksummedV2(indexInput);
139+
assert fileVersion == CURRENT_VERSION : fileVersion;
140+
assert indexInput.length() == FILE_SIZE : indexInput.length();
141+
return Checkpoint.readCheckpointV6_0_0(indexInput);
142142
}
143143
}
144144
}

core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,7 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
7676
* @throws IOException if any of the file operations resulted in an I/O exception
7777
*/
7878
public static TranslogReader open(
79-
final FileChannel channel,
80-
final Path path,
81-
final Checkpoint checkpoint,
82-
final String translogUUID) throws IOException {
79+
final FileChannel channel, final Path path, final Checkpoint checkpoint, final String translogUUID) throws IOException {
8380

8481
try {
8582
InputStreamStreamInput headerStream = new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel)); // don't close
@@ -159,7 +156,7 @@ public int totalOperations() {
159156
}
160157

161158
@Override
162-
Checkpoint getCheckpoint() {
159+
final Checkpoint getCheckpoint() {
163160
return checkpoint;
164161
}
165162

core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ private TranslogWriter(
8585
this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
8686
this.lastSyncedCheckpoint = initialCheckpoint;
8787
this.totalOffset = initialCheckpoint.offset;
88-
assert initialCheckpoint.minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED;
88+
assert initialCheckpoint.minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.minSeqNo;
8989
this.minSeqNo = initialCheckpoint.minSeqNo;
90-
assert initialCheckpoint.maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED;
90+
assert initialCheckpoint.maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo;
9191
this.maxSeqNo = initialCheckpoint.maxSeqNo;
9292
this.globalCheckpointSupplier = globalCheckpointSupplier;
9393
}
@@ -185,24 +185,14 @@ public synchronized Translog.Location add(final BytesReference data, final long
185185

186186
if (minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED) {
187187
assert operationCounter == 0;
188-
minSeqNo = seqNo;
189-
} else if (minSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
190-
minSeqNo = seqNo;
191-
} else {
192-
assert seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO;
193-
minSeqNo = Math.min(minSeqNo, seqNo);
194188
}
195-
196189
if (maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED) {
197190
assert operationCounter == 0;
198-
maxSeqNo = seqNo;
199-
} else if (maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
200-
maxSeqNo = seqNo;
201-
} else {
202-
assert seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO;
203-
maxSeqNo = Math.max(maxSeqNo, seqNo);
204191
}
205192

193+
minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
194+
maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
195+
206196
operationCounter++;
207197

208198
return new Translog.Location(generation, offset, data.length());
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.seqno;
21+
22+
import org.elasticsearch.test.ESTestCase;
23+
24+
import static org.hamcrest.Matchers.containsString;
25+
import static org.hamcrest.Matchers.equalTo;
26+
import static org.hamcrest.Matchers.hasToString;
27+
28+
public class SequenceNumbersTests extends ESTestCase {
29+
30+
public void testMin() {
31+
final long seqNo = randomNonNegativeLong();
32+
assertThat(SequenceNumbers.min(SequenceNumbersService.NO_OPS_PERFORMED, seqNo), equalTo(seqNo));
33+
assertThat(
34+
SequenceNumbers.min(SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.UNASSIGNED_SEQ_NO),
35+
equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
36+
assertThat(SequenceNumbers.min(SequenceNumbersService.UNASSIGNED_SEQ_NO, seqNo), equalTo(seqNo));
37+
final long minSeqNo = randomNonNegativeLong();
38+
assertThat(SequenceNumbers.min(minSeqNo, seqNo), equalTo(Math.min(minSeqNo, seqNo)));
39+
40+
final IllegalArgumentException e =
41+
expectThrows(IllegalArgumentException.class, () -> SequenceNumbers.min(minSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO));
42+
assertThat(e, hasToString(containsString("sequence number must be assigned")));
43+
}
44+
45+
public void testMax() {
46+
final long seqNo = randomNonNegativeLong();
47+
assertThat(SequenceNumbers.max(SequenceNumbersService.NO_OPS_PERFORMED, seqNo), equalTo(seqNo));
48+
assertThat(
49+
SequenceNumbers.max(SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.UNASSIGNED_SEQ_NO),
50+
equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
51+
assertThat(SequenceNumbers.max(SequenceNumbersService.UNASSIGNED_SEQ_NO, seqNo), equalTo(seqNo));
52+
final long maxSeqNo = randomNonNegativeLong();
53+
assertThat(SequenceNumbers.min(maxSeqNo, seqNo), equalTo(Math.min(maxSeqNo, seqNo)));
54+
55+
final IllegalArgumentException e =
56+
expectThrows(IllegalArgumentException.class, () -> SequenceNumbers.min(maxSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO));
57+
assertThat(e, hasToString(containsString("sequence number must be assigned")));
58+
}
59+
60+
}

core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1218,7 +1218,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException {
12181218
try (Translog ignored = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
12191219
fail("corrupted");
12201220
} catch (IllegalStateException ex) {
1221-
assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, numOps=55, generation=2, minSeqNo=0, maxSeqNo=0, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, generation=0, minSeqNo=-2, maxSeqNo=-2, globalCheckpoint=-2}", ex.getMessage());
1221+
assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, numOps=55, generation=2, minSeqNo=0, maxSeqNo=0, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage());
12221222
}
12231223
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
12241224
try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {

0 commit comments

Comments
 (0)