@@ -156,7 +156,7 @@ public Collection<Object> createComponents(

        return Arrays.asList(
            ccrLicenseChecker,
-            new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker)
+            new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
        );
    }

@@ -121,8 +121,9 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
            client.getRemoteClusterClient(clusterAlias),
            request,
            onFailure,
-            leaderClusterState -> {
-                IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
+            remoteClusterStateResponse -> {
+                ClusterState remoteClusterState = remoteClusterStateResponse.getState();
+                IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex);
                if (leaderIndexMetaData == null) {
                    onFailure.accept(new IndexNotFoundException(leaderIndex));
                    return;
@@ -159,7 +160,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
            final String clusterAlias,
            final ClusterStateRequest request,
            final Consumer<Exception> onFailure,
-            final Consumer<ClusterState> leaderClusterStateConsumer) {
+            final Consumer<ClusterStateResponse> leaderClusterStateConsumer) {
        try {
            Client remoteClient = systemClient(client.getRemoteClusterClient(clusterAlias));
            checkRemoteClusterLicenseAndFetchClusterState(
@@ -199,7 +200,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
            final Client remoteClient,
            final ClusterStateRequest request,
            final Consumer<Exception> onFailure,
-            final Consumer<ClusterState> leaderClusterStateConsumer,
+            final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
            final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
            final Function<Exception, ElasticsearchStatusException> unknownLicense) {
        // we have to check the license on the remote cluster
@@ -211,7 +212,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState(
            public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
                if (licenseCheck.isSuccess()) {
                    final ActionListener<ClusterStateResponse> clusterStateListener =
-                            ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
+                            ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure);
                    // following an index in remote cluster, so use remote client to fetch leader index metadata
                    remoteClient.admin().cluster().state(request, clusterStateListener);
                } else {
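Note: the hunks above widen the callback type from Consumer<ClusterState> to Consumer<ClusterStateResponse>, so response-level information (such as the wait-for-metadata-version timeout flag used later in AutoFollowCoordinator) is no longer discarded inside the helper. A minimal stand-alone sketch of that pattern, using a hypothetical Response wrapper and method names rather than the Elasticsearch classes:

import java.util.function.Consumer;

final class ResponseCallbacks {

    /** Hypothetical response wrapper: a payload plus a response-level flag. */
    static final class Response {
        private final Object payload;          // e.g. the fetched cluster state
        private final boolean waitTimedOut;    // flag that early unwrapping would throw away

        Response(Object payload, boolean waitTimedOut) {
            this.payload = payload;
            this.waitTimedOut = waitTimedOut;
        }

        Object getPayload() { return payload; }
        boolean isWaitTimedOut() { return waitTimedOut; }
    }

    // Before: the helper unwraps the response, so callers never see flags like waitTimedOut.
    static void fetchUnwrapped(Consumer<Object> consumer) {
        consumer.accept(doFetch().getPayload());
    }

    // After: the helper forwards the whole response; callers unwrap where they need the payload.
    static void fetchResponse(Consumer<Response> consumer) {
        consumer.accept(doFetch());
    }

    private static Response doFetch() {
        return new Response(new Object(), false);   // stand-in for the remote cluster-state call
    }
}

Call sites that only need the state simply unwrap it, as the remoteClusterState.getState() call in the TransportPutAutoFollowPatternAction hunk at the end of this diff does.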

@@ -14,6 +14,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@@ -26,13 +27,11 @@
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils;
-import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
@@ -66,7 +65,6 @@ public class AutoFollowCoordinator implements ClusterStateListener {
    private static final int MAX_AUTO_FOLLOW_ERRORS = 256;

    private final Client client;
-    private final ThreadPool threadPool;
    private final ClusterService clusterService;
    private final CcrLicenseChecker ccrLicenseChecker;

@@ -80,11 +78,9 @@ public class AutoFollowCoordinator implements ClusterStateListener {

    public AutoFollowCoordinator(
            Client client,
-            ThreadPool threadPool,
            ClusterService clusterService,
            CcrLicenseChecker ccrLicenseChecker) {
        this.client = client;
-        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
        clusterService.addListener(this);
@@ -150,22 +146,24 @@ void updateAutoFollowers(ClusterState followerClusterState) {

        Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
        for (String remoteCluster : newRemoteClusters) {
-            AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) {
+            AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {

                @Override
                void getRemoteClusterState(final String remoteCluster,
-                                           final BiConsumer<ClusterState, Exception> handler) {
+                                           final long metadataVersion,
+                                           final BiConsumer<ClusterStateResponse, Exception> handler) {
                    final ClusterStateRequest request = new ClusterStateRequest();
                    request.clear();
                    request.metaData(true);
                    request.routingTable(true);
+                    request.waitForMetaDataVersion(metadataVersion);
                    // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
                    ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
                        client,
                        remoteCluster,
                        request,
                        e -> handler.accept(null, e),
-                        remoteClusterState -> handler.accept(remoteClusterState, null));
+                        remoteClusterStateResponse -> handler.accept(remoteClusterStateResponse, null));
                }

                @Override
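Note: the key addition in this hunk is request.waitForMetaDataVersion(metadataVersion): the remote cluster is asked to answer only once its cluster metadata version is at least the requested value, or once the wait runs out, in which case the response reports it via isWaitForTimedOut() (the flag checked in the start() hunk below). A minimal sketch of that usage, assuming the Elasticsearch classes from this branch are on the classpath; the helper class and method names are illustrative, not part of the PR:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;

final class RemoteMetadataWatcher {

    /** Ask a remote cluster for its state once its metadata version reaches minVersion. */
    static void fetchWhenMetadataAdvances(Client remoteClient,
                                          long minVersion,
                                          ActionListener<ClusterStateResponse> listener) {
        ClusterStateRequest request = new ClusterStateRequest();
        request.clear();                             // start from an empty request ...
        request.metaData(true);                      // ... and only ask for metadata
        request.routingTable(true);                  // ... plus the routing table
        request.waitForMetaDataVersion(minVersion);  // long-poll until metadata version >= minVersion
        remoteClient.admin().cluster().state(request, listener);
    }
}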
@@ -239,19 +237,17 @@ public void clusterChanged(ClusterChangedEvent event) {
    abstract static class AutoFollower {

        private final String remoteCluster;
-        private final ThreadPool threadPool;
        private final Consumer<List<AutoFollowResult>> statsUpdater;
        private final Supplier<ClusterState> followerClusterStateSupplier;

+        private volatile long metadataVersion = 0;
        private volatile CountDown autoFollowPatternsCountDown;
        private volatile AtomicArray<AutoFollowResult> autoFollowResults;

        AutoFollower(final String remoteCluster,
-                     final ThreadPool threadPool,
                     final Consumer<List<AutoFollowResult>> statsUpdater,
                     final Supplier<ClusterState> followerClusterStateSupplier) {
            this.remoteCluster = remoteCluster;
-            this.threadPool = threadPool;
            this.statsUpdater = statsUpdater;
            this.followerClusterStateSupplier = followerClusterStateSupplier;
        }
@@ -276,9 +272,15 @@ void start() {
            this.autoFollowPatternsCountDown = new CountDown(patterns.size());
            this.autoFollowResults = new AtomicArray<>(patterns.size());

-            getRemoteClusterState(remoteCluster, (remoteClusterState, remoteError) -> {
-                if (remoteClusterState != null) {
+            getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> {
+                if (remoteClusterStateResponse != null) {
                    assert remoteError == null;
+                    if (remoteClusterStateResponse.isWaitForTimedOut()) {
+                        start();
+                        return;
+                    }
+                    ClusterState remoteClusterState = remoteClusterStateResponse.getState();
+                    metadataVersion = remoteClusterState.metaData().version();
                    autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
                } else {
                    assert remoteError != null;
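Note: this hunk is the long-poll loop that replaces the old fixed-interval polling: ask for metadataVersion + 1, re-arm immediately when the wait times out, and otherwise record the version that was actually seen before processing. A self-contained sketch of the same control flow; WaitForVersionLoop, FetchResult and fetch() are hypothetical stand-ins, not Elasticsearch classes:

import java.util.function.BiConsumer;

abstract class WaitForVersionLoop {

    /** Stand-in for ClusterStateResponse: a version, a payload, and the wait-timed-out flag. */
    static final class FetchResult {
        final long version;
        final Object payload;
        final boolean waitTimedOut;

        FetchResult(long version, Object payload, boolean waitTimedOut) {
            this.version = version;
            this.payload = payload;
            this.waitTimedOut = waitTimedOut;
        }
    }

    private volatile long lastSeenVersion = 0;

    void poll() {
        fetch(lastSeenVersion + 1, (result, error) -> {
            if (result != null) {
                if (result.waitTimedOut) {
                    poll();                          // nothing changed in time; re-arm the long poll
                    return;
                }
                lastSeenVersion = result.version;    // remember how far we have seen
                process(result.payload);             // react to the new metadata
            } else {
                handleFailure(error);                // error path: report / retry as appropriate
            }
        });
    }

    /** Stand-in for getRemoteClusterState: complete the handler when version >= minVersion, or on timeout. */
    abstract void fetch(long minVersion, BiConsumer<FetchResult, Exception> handler);

    abstract void process(Object payload);

    abstract void handleFailure(Exception error);
}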
@@ -402,8 +404,7 @@ private void finalise(int slot, AutoFollowResult result) {
            autoFollowResults.set(slot, result);
            if (autoFollowPatternsCountDown.countDown()) {
                statsUpdater.accept(autoFollowResults.asList());
-                // TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion:
-                threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::start);
+                start();
            }
        }
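Note: calling start() directly is safe here because each iteration now parks in the remote wait-for-metadata-version call instead of returning immediately. For contrast, a rough stand-alone analogue (plain java.util.concurrent, not Elasticsearch code) of the fixed 2500 ms re-scheduling this hunk removes:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FixedDelayPoller {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Old shape: once an iteration finishes, wait 2.5 s and look again, whether or not anything changed. */
    void finishIteration(Runnable start) {
        scheduler.schedule(start, 2500, TimeUnit.MILLISECONDS);
    }
}

With the wait-based approach the coordinator reacts as soon as the remote metadata version advances, rather than up to 2.5 seconds later, and idle iterations no longer fetch an unchanged cluster state.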

@@ -525,13 +526,15 @@ static Function<ClusterState, ClusterState> cleanFollowedRemoteIndices(
        }

        /**
-         * Fetch the cluster state from the leader with the specified cluster alias
+         * Fetch a remote cluster state from the remote cluster with the specified cluster alias
         * @param remoteCluster the name of the leader cluster
+         * @param metadataVersion the last seen metadata version
         * @param handler the callback to invoke
         */
        abstract void getRemoteClusterState(
            String remoteCluster,
-            BiConsumer<ClusterState, Exception> handler
+            long metadataVersion,
+            BiConsumer<ClusterStateResponse, Exception> handler
        );

        abstract void createAndFollow(

@@ -7,6 +7,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -80,7 +81,7 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request,
            .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

-        Consumer<ClusterState> consumer = remoteClusterState -> {
+        Consumer<ClusterStateResponse> consumer = remoteClusterState -> {
            String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
            ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> {
                if (e == null) {
@@ -94,7 +95,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {

                        @Override
                        public ClusterState execute(ClusterState currentState) throws Exception {
-                            return innerPut(request, filteredHeaders, currentState, remoteClusterState);
+                            return innerPut(request, filteredHeaders, currentState, remoteClusterState.getState());
                        }
                    });
            } else {