Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,16 @@ private static void updateTransformStateAndGetLastCheckpoint(

long lastCheckpoint = currentState.v1().getTransformState().getCheckpoint();

// if: the state is stored on the latest index, it does not need an update
if (currentState.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
listener.onResponse(lastCheckpoint);
return;
}

// else: the state is on an old index, update by persisting it to the latest index
transformConfigManager.putOrUpdateTransformStoredDoc(
currentState.v1(),
currentState.v2(),
null, // set seqNoPrimaryTermAndIndex to `null` to force optype `create`, gh#80073
ActionListener.wrap(r -> { listener.onResponse(lastCheckpoint); }, e -> {
if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
// if a version conflict occurs a new state has been written between us reading and writing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,16 +682,20 @@ public void putOrUpdateTransformStoredDoc(
IndexRequest indexRequest = new IndexRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).setRefreshPolicy(
WriteRequest.RefreshPolicy.IMMEDIATE
).id(TransformStoredDoc.documentId(storedDoc.getId())).source(source);
if (seqNoPrimaryTermAndIndex != null
&& seqNoPrimaryTermAndIndex.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_NAME)) {
indexRequest.opType(DocWriteRequest.OpType.INDEX)
.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo())
.setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
if (seqNoPrimaryTermAndIndex != null) {
// if seqNoPrimaryTermAndIndex is set, use optype index even if not on the latest index, because the upgrader
// could have been called, see gh#80073
indexRequest.opType(DocWriteRequest.OpType.INDEX);
// if on the latest index use optimistic concurrency control in addition
if (seqNoPrimaryTermAndIndex.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_NAME)) {
indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo())
.setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
}
} else {
// If the index is NOT the latest or we are null, that means we have not created this doc before
// so, it should be a create option without the seqNo and primaryTerm set
// we have not created this doc before or we are called from the upgrader
indexRequest.opType(DocWriteRequest.OpType.CREATE);
}

executeAsyncWithOrigin(
client,
TRANSFORM_ORIGIN,
Expand Down