Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9e75221
Optimize indexing for the autogenerated ID append-only case
s1monw Aug 29, 2016
7561a1c
add tests for concurrent retires on the engine level and ensure versi…
s1monw Aug 29, 2016
43841fb
add replicating and recovering while appending unit tests
s1monw Aug 29, 2016
87141e0
Merge branch 'master' into issues/19813
s1monw Aug 29, 2016
ad74feb
add setting to opt out
s1monw Aug 29, 2016
aaa35a4
fix compilation
s1monw Aug 29, 2016
9e9c2c0
* Fix review comments
s1monw Aug 30, 2016
5b0c65c
address review comments
s1monw Aug 30, 2016
e89a814
add max_unsafe_auto_id_timestamp to segment stats
s1monw Aug 30, 2016
46f4cad
suppress master and client nodes in ExceptionRetryIT
s1monw Aug 30, 2016
f43d783
start client node in setup as well
s1monw Aug 30, 2016
16b980f
pimp IndexPrimaryRelocationIT to also use autoGeneratedID documents
s1monw Aug 30, 2016
06851f8
apply review comments
s1monw Aug 31, 2016
bdd5a69
Merge branch 'master' into issues/19813
s1monw Aug 31, 2016
23ca5b9
bring back engineIndex.id() reference
s1monw Aug 31, 2016
2fa9465
don't optimize the case when OpType.CREATE is set - dont' even allow …
s1monw Aug 31, 2016
39e513e
fix javadocs, use appropriarte assertions and fix create without ID test
s1monw Aug 31, 2016
305198d
fix javadocs to be more clear
s1monw Aug 31, 2016
836351d
use math.max rather than math.min for parnoia when getting current ti…
s1monw Aug 31, 2016
8d52083
add assertions that we use addDocument if we can in the auto gen ID case
s1monw Aug 31, 2016
a37c6c2
Merge branch 'master' into issues/19813
s1monw Sep 1, 2016
b96ee54
Merge branch 'master' into issues/19813
s1monw Sep 1, 2016
34f97de
notify in the migration guide that op_type=create must have an ID
s1monw Sep 1, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]MapperService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]Mapping.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]MetadataFieldMapper.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]ParsedDocument.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]CompletionFieldMapper.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]LegacyDateFieldMapper.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]LegacyDoubleFieldMapper.java" checks="LineLength" />
Expand Down Expand Up @@ -865,7 +864,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]CorruptedFileIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]CorruptedTranslogIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]DirectoryUtilsTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]ExceptionRetryIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]IndexStoreTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]StoreTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]suggest[/\\]stats[/\\]SuggestStatsIT.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.bulk;

import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -101,4 +102,15 @@ public String toString() {
}
return b.toString();
}

@Override
public void onRetry() {
for (BulkItemRequest item : items) {
if (item.request() instanceof ReplicationRequest) {
// all replication requests need to be notified here as well to ie. make sure that internal optimizations are
// disabled see IndexRequest#canHaveDuplicates()
((ReplicationRequest) item.request()).onRetry();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
/**
* Operation type controls if the type of the index operation.
*/
public static enum OpType {
public enum OpType {
/**
* Index the source. If there an existing document with the id, it will
* be replaced.
Expand Down Expand Up @@ -152,6 +152,17 @@ public static OpType fromString(String sOpType) {

private String pipeline;

/**
* Value for {@link #getAutoGeneratedTimestamp()} if the document has an external
* provided ID.
*/
public static final int UNSET_AUTO_GENERATED_TIMESTAMP = -1;

private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;

private boolean isRetry = false;


public IndexRequest() {
}

Expand Down Expand Up @@ -202,6 +213,10 @@ public ActionRequestValidationException validate() {
}
}

if (opType() != OpType.INDEX && id == null) {
addValidationError("an id is required for a " + opType() + " operation", validationException);
}

if (!versionType.validateVersionForWrites(version)) {
validationException = addValidationError("illegal version value [" + version + "] for version type [" + versionType.name() + "]", validationException);
}
Expand All @@ -216,6 +231,11 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("id is too long, must be no longer than 512 bytes but was: " +
id.getBytes(StandardCharsets.UTF_8).length, validationException);
}

if (id == null && (versionType == VersionType.INTERNAL && version == Versions.MATCH_ANY) == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put an extra validation that's explicit for the opType check - i.e., if (opType() != OpType.INDEX && id == 1) addValidationError("an id is required for a " + opType() + " operation") (preferably next the current if (opType() == OpType.CREATE) { line). I think people will be confused by this message if they use create and we should also catch the delete case, while at it.

validationException = addValidationError("an id must be provided if version type or value are set", validationException);
}

return validationException;
}

Expand Down Expand Up @@ -589,10 +609,10 @@ public void process(@Nullable MappingMetaData mappingMd, boolean allowIdGenerati
}

// generate id if not already provided and id generation is allowed
if (allowIdGeneration) {
if (id == null) {
id(UUIDs.base64UUID());
}
if (allowIdGeneration && id == null) {
assert autoGeneratedTimestamp == -1;
autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
id(UUIDs.base64UUID());
}

// generate timestamp if not provided, we always have one post this stage...
Expand Down Expand Up @@ -639,6 +659,8 @@ public void readFrom(StreamInput in) throws IOException {
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
isRetry = in.readBoolean();
autoGeneratedTimestamp = in.readLong();
}

@Override
Expand All @@ -655,6 +677,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeOptionalString(pipeline);
out.writeBoolean(isRetry);
out.writeLong(autoGeneratedTimestamp);
}

@Override
Expand All @@ -667,4 +691,25 @@ public String toString() {
}
return "index {[" + index + "][" + type + "][" + id + "], source[" + sSource + "]}";
}


/**
* Returns <code>true</code> if this request has been sent to a shard copy more than once.
*/
public boolean isRetry() {
return isRetry;
}

@Override
public void onRetry() {
isRetry = true;
}

/**
* Returns the timestamp the auto generated ID was created or {@value #UNSET_AUTO_GENERATED_TIMESTAMP} if the
* document has no auto generated timestamp. This method will return a positive value iff the id was auto generated.
*/
public long getAutoGeneratedTimestamp() {
return autoGeneratedTimestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,6 @@ public IndexRequestBuilder setOpType(IndexRequest.OpType opType) {
return this;
}

/**
* Sets a string representation of the {@link #setOpType(org.elasticsearch.action.index.IndexRequest.OpType)}. Can
* be either "index" or "create".
*/
public IndexRequestBuilder setOpType(String opType) {
request.opType(IndexRequest.OpType.fromString(opType));
return this;
}

/**
* Set to <tt>true</tt> to force this index to use {@link org.elasticsearch.action.index.IndexRequest.OpType#CREATE}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public static Engine.Index executeIndexRequestOnReplica(IndexRequest request, In
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());

final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType());
final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
Expand All @@ -171,7 +171,7 @@ public static Engine.Index executeIndexRequestOnReplica(IndexRequest request, In
public static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType());
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
}

public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,12 @@ public String toString() {
public String getDescription() {
return toString();
}

/**
* This method is called before this replication request is retried
* the first time.
*/
public void onRetry() {
// nothing by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ void retry(Exception failure) {
return;
}
setPhase(task, "waiting_for_retry");
request.onRetry();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add checks in the unit tests of this class (TransportReplicationActionTests) to make sure this is called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did my very best, can you check again

final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,
FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
EngineConfig.INDEX_CODEC_SETTING,
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
33 changes: 26 additions & 7 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

/**
*
*/
public abstract class Engine implements Closeable {

public static final String SYNC_COMMIT_ID = "sync_id";
Expand Down Expand Up @@ -856,19 +853,24 @@ public long endTime() {
public static class Index extends Operation {

private final ParsedDocument doc;
private final long autoGeneratedIdTimestamp;
private final boolean isRetry;
private boolean created;

public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime,
long autoGeneratedIdTimestamp, boolean isRetry) {
super(uid, version, versionType, origin, startTime);
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
}

public Index(Term uid, ParsedDocument doc) {
this(uid, doc, Versions.MATCH_ANY);
}
} // TEST ONLY

public Index(Term uid, ParsedDocument doc, long version) {
this(uid, doc, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
Index(Term uid, ParsedDocument doc, long version) {
this(uid, doc, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), -1, false);
}

public ParsedDocument parsedDoc() {
Expand Down Expand Up @@ -928,6 +930,23 @@ protected int estimatedSizeInBytes() {
return (id().length() + type().length()) * 2 + source().length() + 12;
}

/**
* Returns a positive timestamp if the ID of this document is auto-generated by elasticsearch.
* if this property is non-negative indexing code might optimize the addition of this document
* due to it's append only nature.
*/
public long getAutoGeneratedIdTimestamp() {
return autoGeneratedIdTimestamp;
}

/**
* Returns <code>true</code> if this index requests has been retried on the coordinating node and can therefor be delivered
* multiple times. Note: this might also be set to true if an equivalent event occurred like the replay of the transaction log
*/
public boolean isRetry() {
return isRetry;
}

}

public static class Delete extends Operation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,17 @@ public final class EngineConfig {
}
}, Property.IndexScope, Property.NodeScope);

private TranslogConfig translogConfig;
/**
* Configures an index to optimize documents with auto generated ids for append only. If this setting is updated from <code>false</code>
* to <code>true</code> might not take effect immediately. In other words, disabling the optimiation will be immediately applied while
* re-enabling it might not be applied until the engine is in a safe state to do so. Depending on the engine implementation a change to
* this setting won't be reflected re-enabled optimization until the engine is restarted or the index is closed and reopened.
* The default is <code>true</code>
*/
public static final Setting<Boolean> INDEX_OPTIMIZE_AUTO_GENERATED_IDS = Setting.boolSetting("index.optimize_auto_generated_id", true,
Property.IndexScope, Property.Dynamic);

private final TranslogConfig translogConfig;
private final OpenMode openMode;

/**
Expand Down Expand Up @@ -311,4 +321,12 @@ public enum OpenMode {
public RefreshListeners getRefreshListeners() {
return refreshListeners;
}

/**
* Returns <code>true</code> iff auto generated IDs should be optimized inside the engine for append only.
* The default is <code>true</code>.
*/
public boolean getOptimizeAutoGeneratedIds() {
return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS);
}
}
Loading