Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.xcontent.XContentType;

public final class CcrRequests {

private CcrRequests() {}

public static ClusterStateRequest metaDataRequest(String leaderIndex) {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex);
return clusterStateRequest;
}

public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) {
PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex);
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
return putMappingRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CommitStats;
Expand Down Expand Up @@ -123,10 +122,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
Index leaderIndex = params.getLeaderShardId().getIndex();
Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());

remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
Expand All @@ -140,9 +136,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
indexMetaData.getMappings().size() + "]";
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;

PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName());
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData);
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
errorHandler));
Expand All @@ -154,10 +148,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
final Index leaderIndex = params.getLeaderShardId().getIndex();
final Index followIndex = params.getFollowShardId().getIndex();

final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());

CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -21,6 +24,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
Expand All @@ -37,6 +41,7 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
Expand Down Expand Up @@ -111,15 +116,10 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
ClusterStateResponse response = remoteClient
.admin()
.cluster()
.prepareState()
.clear()
.setMetaData(true)
.setIndices("dummy_index_name") // We set a single dummy index name to avoid fetching all the index data
.get();
return response.getState().metaData();
// We set a single dummy index name to avoid fetching all the index data
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name");
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
return clusterState.getState().metaData();
}

@Override
Expand All @@ -128,18 +128,12 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
String leaderIndex = index.getName();
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);

ClusterStateResponse response = remoteClient
.admin()
.cluster()
.prepareState()
.clear()
.setMetaData(true)
.setIndices(leaderIndex)
.get();
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex);
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();

// Validates whether the leader cluster has been configured properly:
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
IndexMetaData leaderIndexMetaData = response.getState().metaData().index(leaderIndex);
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex);
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
String[] leaderHistoryUUIDs = future.actionGet();

Expand Down Expand Up @@ -252,7 +246,8 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v

Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId());
Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID);
ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId());

Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
String sessionUUID = UUIDs.randomBase64UUID();
Expand All @@ -261,13 +256,30 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
String nodeId = response.getNodeId();
// TODO: Implement file restore
closeSession(remoteClient, nodeId, sessionUUID);
maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings());
}

@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();

if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) {
Index followerIndex = followerIndexSettings.getIndex();
assert leaderIndexMetadata.getMappings().size() == 1 : "expected exactly one mapping, but got [" +
leaderIndexMetadata.getMappings().size() + "]";
MappingMetaData mappingMetaData = leaderIndexMetadata.getMappings().iterator().next().value;
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
localClient.admin().indices().putMapping(putMappingRequest).actionGet();
}
}

private void closeSession(Client remoteClient, String nodeId, String sessionUUID) {
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId,
new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.RepositoriesService;
Expand All @@ -35,13 +38,15 @@
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work
// TODO: is completed.
Expand Down Expand Up @@ -195,6 +200,60 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException {
assertEquals(0, restoreInfo.failedShards());
}

public void testFollowerMappingIsUpdated() throws IOException {
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";
String followerIndex = "index2";

final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen(leaderIndex);

final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);

Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName,
CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions,
"^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false,
false, true, settingsBuilder.build(), new String[0],
"restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");

// TODO: Eventually when the file recovery work is complete, we should test updated mappings by
// indexing to the leader while the recovery is happening. However, into order to that test mappings
// are updated prior to that work, we index documents in the clear session callback. This will
// ensure a mapping change prior to the final mapping check on the follower side.
for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) {
restoreSourceService.addCloseSessionListener(s -> {
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
});
}

PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
RestoreInfo restoreInfo = future.actionGet();

assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
assertEquals(0, restoreInfo.failedShards());

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(followerIndex);
ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet();
IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex);
assertEquals(2, followerIndexMetadata.getMappingVersion());

MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
.get("index2").get("doc");
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
}

private ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(ClusterService clusterService,
ActionListener<RestoreInfo> listener) {
return new ActionListener<RestoreService.RestoreCompletionResponse>() {
Expand Down