Skip to content

Commit f717cc3

Browse files
author
Hendrik Muhs
authored
[Transform] Fix issue if upgrade runs right after a rolling cluster upgrade (#80579) (#80601)
do not fail a running transform if upgrader rewrites state inbetween fixes #80073
1 parent 36230e7 commit f717cc3

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,14 +224,16 @@ private static void updateTransformStateAndGetLastCheckpoint(
224224

225225
long lastCheckpoint = currentState.v1().getTransformState().getCheckpoint();
226226

227+
// if: the state is stored on the latest index, it does not need an update
227228
if (currentState.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
228229
listener.onResponse(lastCheckpoint);
229230
return;
230231
}
231232

233+
// else: the state is on an old index, update by persisting it to the latest index
232234
transformConfigManager.putOrUpdateTransformStoredDoc(
233235
currentState.v1(),
234-
currentState.v2(),
236+
null, // set seqNoPrimaryTermAndIndex to `null` to force optype `create`, gh#80073
235237
ActionListener.wrap(r -> { listener.onResponse(lastCheckpoint); }, e -> {
236238
if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
237239
// if a version conflict occurs a new state has been written between us reading and writing.

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -636,16 +636,20 @@ public void putOrUpdateTransformStoredDoc(
636636
IndexRequest indexRequest = new IndexRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).setRefreshPolicy(
637637
WriteRequest.RefreshPolicy.IMMEDIATE
638638
).id(TransformStoredDoc.documentId(storedDoc.getId())).source(source);
639-
if (seqNoPrimaryTermAndIndex != null
640-
&& seqNoPrimaryTermAndIndex.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_NAME)) {
641-
indexRequest.opType(DocWriteRequest.OpType.INDEX)
642-
.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo())
643-
.setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
639+
if (seqNoPrimaryTermAndIndex != null) {
640+
// if seqNoPrimaryTermAndIndex is set, use optype index even if not on the latest index, because the upgrader
641+
// could have been called, see gh#80073
642+
indexRequest.opType(DocWriteRequest.OpType.INDEX);
643+
// if on the latest index use optimistic concurrency control in addition
644+
if (seqNoPrimaryTermAndIndex.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_NAME)) {
645+
indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo())
646+
.setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
647+
}
644648
} else {
645-
// If the index is NOT the latest or we are null, that means we have not created this doc before
646-
// so, it should be a create option without the seqNo and primaryTerm set
649+
// we have not created this doc before or we are called from the upgrader
647650
indexRequest.opType(DocWriteRequest.OpType.CREATE);
648651
}
652+
649653
executeAsyncWithOrigin(
650654
client,
651655
TRANSFORM_ORIGIN,

0 commit comments

Comments
 (0)