Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -247,28 +246,31 @@ protected Boolean newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
IndexMetaData currentIndex = currentState.metaData().index(request.getFollowRequest().getFollowIndex());
String followIndex = request.getFollowRequest().getFollowIndex();
IndexMetaData currentIndex = currentState.metaData().index(followIndex);
if (currentIndex != null) {
throw new ResourceAlreadyExistsException(currentIndex.getIndex());
}

MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(request.getFollowRequest().getFollowIndex());
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex);

// Copy all settings, but overwrite a few settings.
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(leaderIndexMetaData.getSettings());
// Overwriting UUID here, because otherwise we can't follow indices in the same cluster
settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowIndex());
settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followIndex);
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
imdBuilder.settings(settingsBuilder);

// Copy mappings from leader IMD to follow IMD
for (ObjectObjectCursor<String, MappingMetaData> cursor : leaderIndexMetaData.getMappings()) {
imdBuilder.putMapping(cursor.value);
}
mdBuilder.put(imdBuilder.build(), false);
imdBuilder.setRoutingNumShards(leaderIndexMetaData.getRoutingNumShards());
IndexMetaData followIMD = imdBuilder.build();
mdBuilder.put(followIMD, false);

ClusterState.Builder builder = ClusterState.builder(currentState);
builder.metaData(mdBuilder.build());
Expand All @@ -279,7 +281,10 @@ public ClusterState execute(ClusterState currentState) throws Exception {
updatedState = allocationService.reroute(
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
"follow index [" + request.getFollowRequest().getFollowIndex() + "] created");


logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]",
followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas());

return updatedState;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
assertThat(operation.id(), equalTo("5"));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30227")
public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
Expand All @@ -162,6 +161,7 @@ public void testFollowIndex() throws Exception {
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();

final int firstBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
Expand All @@ -185,6 +185,7 @@ public void testFollowIndex() throws Exception {
unfollowIndex("index2");
client().execute(FollowIndexAction.INSTANCE, followRequest).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
Expand Down Expand Up @@ -408,7 +409,7 @@ private void unfollowIndex(String index) throws Exception {
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
return () -> {
final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get();
assertTrue("doc with id [" + value + "] does not exist", getResponse.isExists());
assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists());
assertTrue((getResponse.getSource().containsKey("f")));
assertThat(getResponse.getSource().get("f"), equalTo(value));
};
Expand Down