Skip to content

Commit e3bc32a

Browse files
committed
feedback
1 parent 7b06600 commit e3bc32a

File tree

10 files changed

+82
-69
lines changed

10 files changed

+82
-69
lines changed

core/src/main/java/org/elasticsearch/action/DocWriteResponse.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@
2323
import org.elasticsearch.common.xcontent.ToXContent;
2424
import org.elasticsearch.common.xcontent.XContentBuilder;
2525
import org.elasticsearch.common.xcontent.XContentBuilderString;
26+
import org.elasticsearch.index.seqno.SequenceNumbersService;
2627
import org.elasticsearch.index.shard.ShardId;
2728

2829
import java.io.IOException;
2930

3031
/**
31-
* A base class for the response of a write operation that involves are a single doc
32+
* A base class for the response of a write operation that involves a single doc
3233
*/
3334
public abstract class DocWriteResponse extends ReplicationResponse implements ToXContent {
3435

@@ -46,8 +47,8 @@ public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, lon
4647
this.version = version;
4748
}
4849

49-
public DocWriteResponse() {
50-
50+
// needed for deserialization
51+
protected DocWriteResponse() {
5152
}
5253

5354
/**
@@ -87,7 +88,7 @@ public long getVersion() {
8788
}
8889

8990
/**
90-
* Returns the sequence number assigned for this change. Returns -1L if the operation wasn't
91+
* Returns the sequence number assigned for this change. Returns {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if the operation wasn't
9192
* performed (i.e., an update operation that resulted in a NOOP).
9293
*/
9394
public long getSeqNo() {
@@ -101,8 +102,8 @@ public void readFrom(StreamInput in) throws IOException {
101102
shardId = ShardId.readShardId(in);
102103
type = in.readString();
103104
id = in.readString();
104-
version = in.readLong();
105-
seqNo = in.readLong();
105+
version = in.readZLong();
106+
seqNo = in.readZLong();
106107
}
107108

108109
@Override
@@ -111,8 +112,8 @@ public void writeTo(StreamOutput out) throws IOException {
111112
shardId.writeTo(out);
112113
out.writeString(type);
113114
out.writeString(id);
114-
out.writeLong(version);
115-
out.writeLong(seqNo);
115+
out.writeZLong(version);
116+
out.writeZLong(seqNo);
116117
}
117118

118119
static final class Fields {

core/src/main/java/org/elasticsearch/action/index/IndexResponse.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ public class IndexResponse extends DocWriteResponse {
3939
private boolean created;
4040

4141
public IndexResponse() {
42-
super();
43-
4442
}
4543

4644
public IndexResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean created) {

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,7 @@ static class IndexShardReference implements Releasable {
10471047
private final AtomicBoolean closed = new AtomicBoolean(false);
10481048

10491049
IndexShardReference(IndexShard counter, long opPrimaryTerm) {
1050+
// this enforces primary terms, if we're lagging an exception will be thrown.
10501051
counter.incrementOperationCounter(opPrimaryTerm);
10511052
this.counter = counter;
10521053
}

core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.xcontent.XContentBuilder;
2626
import org.elasticsearch.common.xcontent.XContentBuilderString;
2727
import org.elasticsearch.index.get.GetResult;
28+
import org.elasticsearch.index.seqno.SequenceNumbersService;
2829
import org.elasticsearch.index.shard.ShardId;
2930

3031
import java.io.IOException;
@@ -44,7 +45,7 @@ public UpdateResponse() {
4445
* For example: update script with operation set to none
4546
*/
4647
public UpdateResponse(ShardId shardId, String type, String id, long version, boolean created) {
47-
this(new ShardInfo(0, 0), shardId, type, id, -1l, version, created);
48+
this(new ShardInfo(0, 0), shardId, type, id, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, created);
4849
}
4950

5051
public UpdateResponse(ShardInfo shardInfo, ShardId shardId, String type, String id, long seqNo, long version, boolean created) {

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.index.mapper.ParsedDocument;
4646
import org.elasticsearch.index.mapper.Uid;
4747
import org.elasticsearch.index.merge.MergeStats;
48+
import org.elasticsearch.index.seqno.SequenceNumbersService;
4849
import org.elasticsearch.index.shard.ShardId;
4950
import org.elasticsearch.index.store.Store;
5051
import org.elasticsearch.index.translog.Translog;
@@ -631,8 +632,8 @@ public static abstract class Operation {
631632

632633
public Operation(Term uid, long seqNo, long version, VersionType versionType, Origin origin, long startTime) {
633634
this.uid = uid;
634-
assert origin != Origin.PRIMARY || seqNo == -1l : "seqNo should not be set when origin is PRIMARY";
635-
assert origin == Origin.PRIMARY || seqNo > -1l : "seqNo should be set when origin is not PRIMARY";
635+
assert origin != Origin.PRIMARY || seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO : "seqNo should not be set when origin is PRIMARY";
636+
assert origin == Origin.PRIMARY || seqNo >= 0 : "seqNo should be set when origin is not PRIMARY";
636637
this.seqNo = seqNo;
637638
this.version = version;
638639
this.versionType = versionType;
@@ -715,7 +716,7 @@ public Index(Term uid, ParsedDocument doc) {
715716
}
716717

717718
public Index(Term uid, ParsedDocument doc, long version) {
718-
this(uid, doc, -1, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
719+
this(uid, doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
719720
}
720721

721722
public ParsedDocument parsedDoc() {
@@ -780,7 +781,7 @@ public Delete(String type, String id, Term uid, long seqNo, long version, Versio
780781
}
781782

782783
public Delete(String type, String id, Term uid) {
783-
this(type, id, uid, -1, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), false);
784+
this(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), false);
784785
}
785786

786787
public String type() {

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ public boolean index(Index index) {
349349
maybeFailEngine("index", t);
350350
throw new IndexFailedEngineException(shardId, index.type(), index.id(), t);
351351
} finally {
352-
if (index.seqNo() != -1) {
352+
if (index.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
353353
seqNoService.markSeqNoAsCompleted(index.seqNo());
354354
}
355355
}
@@ -459,7 +459,7 @@ public void delete(Delete delete) throws EngineException {
459459
maybeFailEngine("delete", t);
460460
throw new DeleteFailedEngineException(shardId, delete, t);
461461
} finally {
462-
if (delete.seqNo() != -1l) {
462+
if (delete.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
463463
seqNoService.markSeqNoAsCompleted(delete.seqNo());
464464
}
465465
}

core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.index.fielddata.FieldDataType;
2828
import org.elasticsearch.index.mapper.*;
2929
import org.elasticsearch.index.mapper.ParseContext.Document;
30+
import org.elasticsearch.index.seqno.SequenceNumbersService;
3031

3132
import java.io.IOException;
3233
import java.util.List;
@@ -112,7 +113,7 @@ public void preParse(ParseContext context) throws IOException {
112113
@Override
113114
protected void parseCreateField(ParseContext context, List<Field> fields) throws IOException {
114115
// see InternalEngine.updateVersion to see where the real version value is set
115-
final Field seqNo = new NumericDocValuesField(NAME, -1L);
116+
final Field seqNo = new NumericDocValuesField(NAME, SequenceNumbersService.UNASSIGNED_SEQ_NO);
116117
context.seqNo(seqNo);
117118
fields.add(seqNo);
118119
}

core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
/** a very light weight implementation. will be replaced with proper machinery later */
2828
public class SequenceNumbersService extends AbstractIndexShardComponent {
2929

30+
public final static long UNASSIGNED_SEQ_NO = -1L;
31+
3032
AtomicLong seqNoGenerator = new AtomicLong();
3133

3234
public SequenceNumbersService(ShardId shardId, IndexSettings indexSettings) {

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.elasticsearch.index.refresh.RefreshStats;
8080
import org.elasticsearch.index.search.stats.SearchStats;
8181
import org.elasticsearch.index.search.stats.ShardSearchStats;
82+
import org.elasticsearch.index.seqno.SequenceNumbersService;
8283
import org.elasticsearch.index.similarity.SimilarityService;
8384
import org.elasticsearch.index.snapshots.IndexShardRepository;
8485
import org.elasticsearch.index.store.Store;
@@ -450,7 +451,7 @@ public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, Ve
450451
if (shardRouting.primary() == false) {
451452
throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
452453
}
453-
return prepareIndex(docMapper(source.type()), source, -1, version, versionType, Engine.Operation.Origin.PRIMARY);
454+
return prepareIndex(docMapper(source.type()), source, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, versionType, Engine.Operation.Origin.PRIMARY);
454455
} catch (Throwable t) {
455456
verifyNotClosed(t);
456457
throw t;
@@ -508,7 +509,7 @@ public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version
508509
}
509510
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
510511
final Term uid = documentMapper.uidMapper().term(Uid.createUid(type, id));
511-
return prepareDelete(type, id, uid, -1L, version, versionType, Engine.Operation.Origin.PRIMARY);
512+
return prepareDelete(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, versionType, Engine.Operation.Origin.PRIMARY);
512513
}
513514

514515
public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long version, VersionType versionType) {
@@ -1522,6 +1523,10 @@ protected void alreadyClosed() {
15221523
}
15231524
}
15241525

1526+
/**
1527+
* increments the ongoing operations counter. If the given primary term is lower then the one in {@link #shardRouting}
1528+
* an {@link IllegalIndexShardStateException} is thrown.
1529+
*/
15251530
public void incrementOperationCounter(long opPrimaryTerm) {
15261531
if (shardRouting.primaryTerm() > opPrimaryTerm) {
15271532
throw new IllegalIndexShardStateException(shardId, state, "operation term [{}] is too old (current [{}])", opPrimaryTerm, shardRouting.primaryTerm());

0 commit comments

Comments
 (0)