Skip to content

Commit 0ed62db

Browse files
dnhatn authored and
Tim-Brooks committed
Tighten mapping syncing in ccr remote restore (elastic#38071)
There are two issues with the way we sync the mapping from the leader to the follower when a CCR restore completes: 1. The mapping returned from the cluster service might not be as up to date as the mapping of the restored index commit. 2. We should not compare the mapping versions of the follower and the leader; they are not related to one another. Moreover, I think we should only ensure that, once the restore is done, the mapping on the follower is at least as recent as the mapping of the copied index commit. We don't have to sync mapping updates that were made after we opened the session. Relates elastic#36879 Closes elastic#37887
1 parent 99192e7 commit 0ed62db

File tree

5 files changed

+81
-72
lines changed

5 files changed

+81
-72
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,23 @@
66
package org.elasticsearch.xpack.ccr.action;
77

88
import org.elasticsearch.ElasticsearchStatusException;
9+
import org.elasticsearch.action.ActionListener;
910
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1011
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
1112
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
13+
import org.elasticsearch.client.Client;
1214
import org.elasticsearch.cluster.metadata.IndexMetaData;
1315
import org.elasticsearch.cluster.metadata.MappingMetaData;
16+
import org.elasticsearch.cluster.metadata.MetaData;
17+
import org.elasticsearch.common.unit.TimeValue;
1418
import org.elasticsearch.common.xcontent.XContentType;
1519
import org.elasticsearch.index.Index;
1620
import org.elasticsearch.rest.RestStatus;
1721
import org.elasticsearch.xpack.ccr.CcrSettings;
1822

1923
import java.util.Arrays;
2024
import java.util.List;
25+
import java.util.function.Supplier;
2126
import java.util.stream.Collectors;
2227

2328
public final class CcrRequests {
@@ -40,6 +45,39 @@ public static PutMappingRequest putMappingRequest(String followerIndex, MappingM
4045
return putMappingRequest;
4146
}
4247

48+
/**
49+
* Gets an {@link IndexMetaData} of the given index. The mapping version and metadata version of the returned {@link IndexMetaData}
50+
* must be at least the provided {@code mappingVersion} and {@code metadataVersion} respectively.
51+
*/
52+
public static void getIndexMetadata(Client client, Index index, long mappingVersion, long metadataVersion,
53+
Supplier<TimeValue> timeoutSupplier, ActionListener<IndexMetaData> listener) {
54+
final ClusterStateRequest request = CcrRequests.metaDataRequest(index.getName());
55+
if (metadataVersion > 0) {
56+
request.waitForMetaDataVersion(metadataVersion).waitForTimeout(timeoutSupplier.get());
57+
}
58+
client.admin().cluster().state(request, ActionListener.wrap(
59+
response -> {
60+
if (response.getState() == null) {
61+
assert metadataVersion > 0 : metadataVersion;
62+
throw new IllegalStateException("timeout to get cluster state with" +
63+
" metadata version [" + metadataVersion + "], mapping version [" + mappingVersion + "]");
64+
}
65+
final MetaData metaData = response.getState().metaData();
66+
final IndexMetaData indexMetaData = metaData.getIndexSafe(index);
67+
if (indexMetaData.getMappingVersion() >= mappingVersion) {
68+
listener.onResponse(indexMetaData);
69+
return;
70+
}
71+
if (timeoutSupplier.get().nanos() < 0) {
72+
throw new IllegalStateException("timeout to get cluster state with mapping version [" + mappingVersion + "]");
73+
}
74+
// ask for the next version.
75+
getIndexMetadata(client, index, mappingVersion, metaData.version() + 1, timeoutSupplier, listener);
76+
},
77+
listener::onFailure
78+
));
79+
}
80+
4381
public static final MappingRequestValidator CCR_PUT_MAPPING_REQUEST_VALIDATOR = (request, state, indices) -> {
4482
if (request.origin() == null) {
4583
return null; // a put-mapping-request on old versions does not have origin.

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.cluster.ClusterState;
2525
import org.elasticsearch.cluster.metadata.IndexMetaData;
2626
import org.elasticsearch.cluster.metadata.MappingMetaData;
27-
import org.elasticsearch.cluster.metadata.MetaData;
2827
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2928
import org.elasticsearch.cluster.service.ClusterService;
3029
import org.elasticsearch.common.CheckedConsumer;
@@ -60,6 +59,7 @@
6059
import java.util.function.BiConsumer;
6160
import java.util.function.Consumer;
6261
import java.util.function.LongConsumer;
62+
import java.util.function.Supplier;
6363

6464
import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient;
6565
import static org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction.extractLeaderShardHistoryUUIDs;
@@ -121,7 +121,9 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
121121
@Override
122122
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
123123
final Index followerIndex = params.getFollowShardId().getIndex();
124-
getIndexMetadata(minRequiredMappingVersion, 0L, params, ActionListener.wrap(
124+
final Index leaderIndex = params.getLeaderShardId().getIndex();
125+
final Supplier<TimeValue> timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut;
126+
CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(
125127
indexMetaData -> {
126128
if (indexMetaData.getMappings().isEmpty()) {
127129
assert indexMetaData.getMappingVersion() == 1;
@@ -256,39 +258,6 @@ private Client remoteClient(ShardFollowTask params) {
256258
return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
257259
}
258260

259-
private void getIndexMetadata(long minRequiredMappingVersion, long minRequiredMetadataVersion,
260-
ShardFollowTask params, ActionListener<IndexMetaData> listener) {
261-
final Index leaderIndex = params.getLeaderShardId().getIndex();
262-
final ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
263-
if (minRequiredMetadataVersion > 0) {
264-
clusterStateRequest.waitForMetaDataVersion(minRequiredMetadataVersion).waitForTimeout(waitForMetadataTimeOut);
265-
}
266-
try {
267-
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(
268-
r -> {
269-
// if wait_for_metadata_version timeout, the response is empty
270-
if (r.getState() == null) {
271-
assert minRequiredMetadataVersion > 0;
272-
getIndexMetadata(minRequiredMappingVersion, minRequiredMetadataVersion, params, listener);
273-
return;
274-
}
275-
final MetaData metaData = r.getState().metaData();
276-
final IndexMetaData indexMetaData = metaData.getIndexSafe(leaderIndex);
277-
if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) {
278-
// ask for the next version.
279-
getIndexMetadata(minRequiredMappingVersion, metaData.version() + 1, params, listener);
280-
} else {
281-
assert metaData.version() >= minRequiredMetadataVersion : metaData.version() + " < " + minRequiredMetadataVersion;
282-
listener.onResponse(indexMetaData);
283-
}
284-
},
285-
listener::onFailure
286-
));
287-
} catch (Exception e) {
288-
listener.onFailure(e);
289-
}
290-
}
291-
292261
interface FollowerStatsInfoHandler {
293262
void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo);
294263
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques
8181
throw new ShardNotFoundException(shardId);
8282
}
8383
Store.MetadataSnapshot storeFileMetaData = ccrRestoreService.openSession(request.getSessionUUID(), indexShard);
84-
return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData);
84+
long mappingVersion = indexShard.indexSettings().getIndexMetaData().getMappingVersion();
85+
return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData, mappingVersion);
8586
}
8687

8788
@Override
@@ -106,33 +107,38 @@ public static class PutCcrRestoreSessionResponse extends ActionResponse {
106107

107108
private DiscoveryNode node;
108109
private Store.MetadataSnapshot storeFileMetaData;
110+
private long mappingVersion;
109111

110112
PutCcrRestoreSessionResponse() {
111113
}
112114

113-
PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData) {
115+
PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData, long mappingVersion) {
114116
this.node = node;
115117
this.storeFileMetaData = storeFileMetaData;
118+
this.mappingVersion = mappingVersion;
116119
}
117120

118121
PutCcrRestoreSessionResponse(StreamInput in) throws IOException {
119122
super(in);
120123
node = new DiscoveryNode(in);
121124
storeFileMetaData = new Store.MetadataSnapshot(in);
125+
mappingVersion = in.readVLong();
122126
}
123127

124128
@Override
125129
public void readFrom(StreamInput in) throws IOException {
126130
super.readFrom(in);
127131
node = new DiscoveryNode(in);
128132
storeFileMetaData = new Store.MetadataSnapshot(in);
133+
mappingVersion = in.readVLong();
129134
}
130135

131136
@Override
132137
public void writeTo(StreamOutput out) throws IOException {
133138
super.writeTo(out);
134139
node.writeTo(out);
135140
storeFileMetaData.writeTo(out);
141+
out.writeVLong(mappingVersion);
136142
}
137143

138144
public DiscoveryNode getNode() {
@@ -142,5 +148,9 @@ public DiscoveryNode getNode() {
142148
public Store.MetadataSnapshot getStoreFileMetaData() {
143149
return storeFileMetaData;
144150
}
151+
152+
public long getMappingVersion() {
153+
return mappingVersion;
154+
}
145155
}
146156
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import org.elasticsearch.common.metrics.CounterMetric;
3030
import org.elasticsearch.common.settings.Settings;
3131
import org.elasticsearch.common.unit.ByteSizeValue;
32+
import org.elasticsearch.common.unit.TimeValue;
3233
import org.elasticsearch.common.util.CombinedRateLimiter;
3334
import org.elasticsearch.index.Index;
34-
import org.elasticsearch.index.IndexSettings;
3535
import org.elasticsearch.index.engine.EngineException;
3636
import org.elasticsearch.index.shard.IndexShard;
3737
import org.elasticsearch.index.shard.IndexShardRecoveryException;
@@ -72,6 +72,8 @@
7272
import java.util.Map;
7373
import java.util.Set;
7474
import java.util.function.LongConsumer;
75+
import java.util.function.Supplier;
76+
7577

7678
/**
7779
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
@@ -288,32 +290,33 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
288290
String name = metadata.name();
289291
try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
290292
restoreSession.restoreFiles();
293+
updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, indexShard.routingEntry().index());
291294
} catch (Exception e) {
292295
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
293296
}
294-
295-
maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings());
296297
}
297298

298299
@Override
299300
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
300301
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
301302
}
302303

303-
private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
304-
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
305-
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
306-
.actionGet(ccrSettings.getRecoveryActionTimeout());
307-
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
308-
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();
309-
310-
if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) {
311-
Index followerIndex = followerIndexSettings.getIndex();
312-
assert leaderIndexMetadata.getMappings().size() == 1 : "expected exactly one mapping, but got [" +
313-
leaderIndexMetadata.getMappings().size() + "]";
314-
MappingMetaData mappingMetaData = leaderIndexMetadata.getMappings().iterator().next().value;
315-
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
316-
localClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
304+
private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
305+
Client followerClient, Index followerIndex) {
306+
final PlainActionFuture<IndexMetaData> indexMetadataFuture = new PlainActionFuture<>();
307+
final long startTimeInNanos = System.nanoTime();
308+
final Supplier<TimeValue> timeout = () -> {
309+
final long elapsedInNanos = System.nanoTime() - startTimeInNanos;
310+
return TimeValue.timeValueNanos(ccrSettings.getRecoveryActionTimeout().nanos() - elapsedInNanos);
311+
};
312+
CcrRequests.getIndexMetadata(leaderClient, leaderIndex, leaderMappingVersion, 0L, timeout, indexMetadataFuture);
313+
final IndexMetaData leaderIndexMetadata = indexMetadataFuture.actionGet(ccrSettings.getRecoveryActionTimeout());
314+
assert leaderIndexMetadata.getMappings().size() == 1 : "expected exactly one mapping, but got [" +
315+
leaderIndexMetadata.getMappings().size() + "]";
316+
MappingMetaData mappingMetaData = leaderIndexMetadata.getMappings().iterator().next().value;
317+
if (mappingMetaData != null) {
318+
final PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
319+
followerClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
317320
}
318321
}
319322

@@ -323,7 +326,7 @@ private RestoreSession openSession(String repositoryName, Client remoteClient, S
323326
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
324327
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout());
325328
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
326-
response.getStoreFileMetaData(), ccrSettings, throttledTime::inc);
329+
response.getStoreFileMetaData(), response.getMappingVersion(), ccrSettings, throttledTime::inc);
327330
}
328331

329332
private static class RestoreSession extends FileRestoreContext implements Closeable {
@@ -334,17 +337,19 @@ private static class RestoreSession extends FileRestoreContext implements Closea
334337
private final String sessionUUID;
335338
private final DiscoveryNode node;
336339
private final Store.MetadataSnapshot sourceMetaData;
340+
private final long mappingVersion;
337341
private final CcrSettings ccrSettings;
338342
private final LongConsumer throttleListener;
339343

340344
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
341-
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CcrSettings ccrSettings,
342-
LongConsumer throttleListener) {
345+
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
346+
CcrSettings ccrSettings, LongConsumer throttleListener) {
343347
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
344348
this.remoteClient = remoteClient;
345349
this.sessionUUID = sessionUUID;
346350
this.node = node;
347351
this.sourceMetaData = sourceMetaData;
352+
this.mappingVersion = mappingVersion;
348353
this.ccrSettings = ccrSettings;
349354
this.throttleListener = throttleListener;
350355
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,6 @@ public void testIndividualActionsTimeout() throws Exception {
390390
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
391391
}
392392

393-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37887")
394393
public void testFollowerMappingIsUpdated() throws IOException {
395394
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
396395
String leaderIndex = "index1";
@@ -413,16 +412,8 @@ public void testFollowerMappingIsUpdated() throws IOException {
413412
.renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
414413
.indexSettings(settingsBuilder);
415414

416-
// TODO: Eventually when the file recovery work is complete, we should test updated mappings by
417-
// indexing to the leader while the recovery is happening. However, into order to that test mappings
418-
// are updated prior to that work, we index documents in the clear session callback. This will
419-
// ensure a mapping change prior to the final mapping check on the follower side.
420-
for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) {
421-
restoreSourceService.addCloseSessionListener(s -> {
422-
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
423-
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
424-
});
425-
}
415+
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
416+
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
426417

427418
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
428419
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
@@ -435,10 +426,6 @@ public void testFollowerMappingIsUpdated() throws IOException {
435426
clusterStateRequest.clear();
436427
clusterStateRequest.metaData(true);
437428
clusterStateRequest.indices(followerIndex);
438-
ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet();
439-
IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex);
440-
assertEquals(2, followerIndexMetadata.getMappingVersion());
441-
442429
MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
443430
.get("index2").get("doc");
444431
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));

0 commit comments

Comments
 (0)