From ebc812c133b2d303c30035a8d8c08309602ff8d7 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 19 Mar 2019 07:58:00 +0100 Subject: [PATCH] Remove Redundant Request Wrappers from RepositoryService --- .../TransportDeleteRepositoryAction.java | 23 +- .../put/TransportPutRepositoryAction.java | 9 +- .../TransportVerifyRepositoryAction.java | 14 +- .../repositories/RepositoriesService.java | 220 +++++------------- .../VerifyNodeRepositoryAction.java | 18 +- 5 files changed, 86 insertions(+), 198 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java index 469c14f49bd40..04901cbe256e4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java @@ -69,18 +69,17 @@ protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, Clus protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state, final ActionListener listener) { repositoriesService.unregisterRepository( - new RepositoriesService.UnregisterRepositoryRequest("delete_repository [" + request.name() + "]", request.name()) - .masterNodeTimeout(request.masterNodeTimeout()).ackTimeout(request.timeout()), - new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) { - listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged())); - } + request, + new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) { + listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged())); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java index a495ba72f35b7..4a58edf64616b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java @@ -68,13 +68,7 @@ protected ClusterBlockException checkBlock(PutRepositoryRequest request, Cluster @Override protected void masterOperation(final PutRepositoryRequest request, ClusterState state, final ActionListener listener) { - - repositoriesService.registerRepository( - new RepositoriesService.RegisterRepositoryRequest("put_repository [" + request.name() + "]", - request.name(), request.type(), request.verify()) - .settings(request.settings()) - .masterNodeTimeout(request.masterNodeTimeout()) - .ackTimeout(request.timeout()), new ActionListener() { + repositoriesService.registerRepository(request, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { @@ -87,5 +81,4 @@ public void onFailure(Exception e) { } }); } - } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java index 19fa4cbde15ca..aa973d4797a77 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/verify/TransportVerifyRepositoryAction.java @@ -26,13 +26,15 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.repositories.RepositoriesService; -import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.List; + /** * Transport action for verifying repository operation */ @@ -68,14 +70,10 @@ protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, Clus @Override protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state, final ActionListener listener) { - repositoriesService.verifyRepository(request.name(), new ActionListener() { + repositoriesService.verifyRepository(request.name(), new ActionListener>() { @Override - public void onResponse(RepositoriesService.VerifyResponse verifyResponse) { - if (verifyResponse.failed()) { - listener.onFailure(new RepositoryVerificationException(request.name(), verifyResponse.failureDescription())); - } else { - listener.onResponse(new VerifyRepositoryResponse(verifyResponse.nodes())); - } + public void onResponse(List verifyResponse) { + listener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 033080c2c38e6..eb130f31e4b9d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -23,11 +23,12 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; +import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoriesMetaData; @@ -43,12 +44,10 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** * Service responsible for maintaining and providing access to snapshot repositories on nodes. @@ -93,12 +92,12 @@ public RepositoriesService(Settings settings, ClusterService clusterService, Tra * @param request register repository request * @param listener register repository listener */ - public void registerRepository(final RegisterRepositoryRequest request, final ActionListener listener) { - final RepositoryMetaData newRepositoryMetaData = new RepositoryMetaData(request.name, request.type, request.settings); + public void registerRepository(final PutRepositoryRequest request, final ActionListener listener) { + final RepositoryMetaData newRepositoryMetaData = new RepositoryMetaData(request.name(), request.type(), request.settings()); final ActionListener registrationListener; - if (request.verify) { - registrationListener = new VerifyingRegisterRepositoryListener(request.name, listener); + if (request.verify()) { + registrationListener = new VerifyingRegisterRepositoryListener(request.name(), listener); } else { registrationListener = listener; } @@ -111,7 +110,7 @@ public void registerRepository(final RegisterRepositoryRequest request, final Ac return; } - clusterService.submitStateUpdateTask(request.cause, + clusterService.submitStateUpdateTask("put_repository [" + request.name() + "]", new AckedClusterStateUpdateTask(request, registrationListener) { @Override protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { @@ -120,14 +119,14 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { @Override public ClusterState execute(ClusterState currentState) { - ensureRepositoryNotInUse(currentState, request.name); + ensureRepositoryNotInUse(currentState, request.name()); MetaData metaData = currentState.metaData(); MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE); if (repositories == null) { - logger.info("put repository [{}]", request.name); + logger.info("put repository [{}]", request.name()); repositories = new RepositoriesMetaData( - Collections.singletonList(new RepositoryMetaData(request.name, request.type, request.settings))); + Collections.singletonList(new RepositoryMetaData(request.name(), request.type(), request.settings()))); } else { boolean found = false; List repositoriesMetaData = new ArrayList<>(repositories.repositories().size() + 1); @@ -145,10 +144,10 @@ public ClusterState execute(ClusterState currentState) { } } if (!found) { - logger.info("put repository [{}]", request.name); - repositoriesMetaData.add(new RepositoryMetaData(request.name, request.type, request.settings)); + logger.info("put repository [{}]", request.name()); + repositoriesMetaData.add(new RepositoryMetaData(request.name(), request.type(), request.settings())); } else { - logger.info("update repository [{}]", request.name); + logger.info("update repository [{}]", request.name()); } repositories = new RepositoriesMetaData(repositoriesMetaData); } @@ -158,7 +157,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { - logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", request.name), e); + logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", request.name()), e); super.onFailure(source, e); } @@ -177,51 +176,52 @@ public boolean mustAck(DiscoveryNode discoveryNode) { * @param request unregister repository request * @param listener unregister repository listener */ - public void unregisterRepository(final UnregisterRepositoryRequest request, final ActionListener listener) { - clusterService.submitStateUpdateTask(request.cause, new AckedClusterStateUpdateTask(request, listener) { - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } + public void unregisterRepository(final DeleteRepositoryRequest request, final ActionListener listener) { + clusterService.submitStateUpdateTask("delete_repository [" + request.name() + "]", + new AckedClusterStateUpdateTask(request, listener) { + @Override + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); + } - @Override - public ClusterState execute(ClusterState currentState) { - ensureRepositoryNotInUse(currentState, request.name); - MetaData metaData = currentState.metaData(); - MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); - RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE); - if (repositories != null && repositories.repositories().size() > 0) { - List repositoriesMetaData = new ArrayList<>(repositories.repositories().size()); - boolean changed = false; - for (RepositoryMetaData repositoryMetaData : repositories.repositories()) { - if (Regex.simpleMatch(request.name, repositoryMetaData.name())) { - logger.info("delete repository [{}]", repositoryMetaData.name()); - changed = true; - } else { - repositoriesMetaData.add(repositoryMetaData); + @Override + public ClusterState execute(ClusterState currentState) { + ensureRepositoryNotInUse(currentState, request.name()); + MetaData metaData = currentState.metaData(); + MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); + RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE); + if (repositories != null && repositories.repositories().size() > 0) { + List repositoriesMetaData = new ArrayList<>(repositories.repositories().size()); + boolean changed = false; + for (RepositoryMetaData repositoryMetaData : repositories.repositories()) { + if (Regex.simpleMatch(request.name(), repositoryMetaData.name())) { + logger.info("delete repository [{}]", repositoryMetaData.name()); + changed = true; + } else { + repositoriesMetaData.add(repositoryMetaData); + } + } + if (changed) { + repositories = new RepositoriesMetaData(repositoriesMetaData); + mdBuilder.putCustom(RepositoriesMetaData.TYPE, repositories); + return ClusterState.builder(currentState).metaData(mdBuilder).build(); } } - if (changed) { - repositories = new RepositoriesMetaData(repositoriesMetaData); - mdBuilder.putCustom(RepositoriesMetaData.TYPE, repositories); - return ClusterState.builder(currentState).metaData(mdBuilder).build(); + if (Regex.isMatchAllPattern(request.name())) { // we use a wildcard so we don't barf if it's not present. + return currentState; } + throw new RepositoryMissingException(request.name()); } - if (Regex.isMatchAllPattern(request.name)) { // we use a wildcard so we don't barf if it's not present. - return currentState; - } - throw new RepositoryMissingException(request.name); - } - @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - // repository was created on both master and data nodes - return discoveryNode.isMasterNode() || discoveryNode.isDataNode(); - } - }); + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + // repository was created on both master and data nodes + return discoveryNode.isMasterNode() || discoveryNode.isDataNode(); + } + }); } - public void verifyRepository(final String repositoryName, final ActionListener listener) { + public void verifyRepository(final String repositoryName, final ActionListener> listener) { final Repository repository = repository(repositoryName); try { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { @@ -229,9 +229,9 @@ public void verifyRepository(final String repositoryName, final ActionListener() { + verifyAction.verify(repositoryName, verificationToken, new ActionListener>() { @Override - public void onResponse(VerifyResponse verifyResponse) { + public void onResponse(List verifyResponse) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { repository.endVerification(verificationToken); @@ -263,7 +263,7 @@ public void onFailure(Exception e) { }); } } else { - listener.onResponse(new VerifyResponse(new DiscoveryNode[0], new VerificationFailure[0])); + listener.onResponse(Collections.emptyList()); } } catch (Exception e) { listener.onFailure(e); @@ -440,14 +440,10 @@ private class VerifyingRegisterRepositoryListener implements ActionListener() { + verifyRepository(name, new ActionListener>() { @Override - public void onResponse(VerifyResponse verifyResponse) { - if (verifyResponse.failed()) { - listener.onFailure(new RepositoryVerificationException(name, verifyResponse.failureDescription())); - } else { - listener.onResponse(clusterStateUpdateResponse); - } + public void onResponse(List verifyResponse) { + listener.onResponse(clusterStateUpdateResponse); } @Override @@ -465,104 +461,4 @@ public void onFailure(Exception e) { listener.onFailure(e); } } - - /** - * Register repository request - */ - public static class RegisterRepositoryRequest extends ClusterStateUpdateRequest { - - final String cause; - - final String name; - - final String type; - - final boolean verify; - - Settings settings = Settings.EMPTY; - - /** - * Constructs new register repository request - * - * @param cause repository registration cause - * @param name repository name - * @param type repository type - * @param verify verify repository after creation - */ - public RegisterRepositoryRequest(String cause, String name, String type, boolean verify) { - this.cause = cause; - this.name = name; - this.type = type; - this.verify = verify; - } - - /** - * Sets repository settings - * - * @param settings repository settings - * @return this request - */ - public RegisterRepositoryRequest settings(Settings settings) { - this.settings = settings; - return this; - } - } - - /** - * Unregister repository request - */ - public static class UnregisterRepositoryRequest extends ClusterStateUpdateRequest { - - final String cause; - - final String name; - - /** - * Creates a new unregister repository request - * - * @param cause repository unregistration cause - * @param name repository name - */ - public UnregisterRepositoryRequest(String cause, String name) { - this.cause = cause; - this.name = name; - } - - } - - /** - * Verify repository request - */ - public static class VerifyResponse { - - private VerificationFailure[] failures; - - private DiscoveryNode[] nodes; - - public VerifyResponse(DiscoveryNode[] nodes, VerificationFailure[] failures) { - this.nodes = nodes; - this.failures = failures; - } - - public VerificationFailure[] failures() { - return failures; - } - - public DiscoveryNode[] nodes() { - return nodes; - } - - public boolean failed() { - return failures.length > 0; - } - - public String failureDescription() { - return Arrays - .stream(failures) - .map(failure -> failure.toString()) - .collect(Collectors.joining(", ", "[", "]")); - } - - } - } diff --git a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java index 142751a0f8cfc..24a5d3b561dda 100644 --- a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.repositories.RepositoriesService.VerifyResponse; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; @@ -68,7 +67,7 @@ public VerifyNodeRepositoryAction(TransportService transportService, ClusterServ new VerifyNodeRepositoryRequestHandler()); } - public void verify(String repository, String verificationToken, final ActionListener listener) { + public void verify(String repository, String verificationToken, final ActionListener> listener) { final DiscoveryNodes discoNodes = clusterService.state().nodes(); final DiscoveryNode localNode = discoNodes.getLocalNode(); @@ -89,7 +88,7 @@ public void verify(String repository, String verificationToken, final ActionList errors.add(new VerificationFailure(node.getId(), e)); } if (counter.decrementAndGet() == 0) { - finishVerification(listener, nodes, errors); + finishVerification(repository, listener, nodes, errors); } } else { transportService.sendRequest(node, ACTION_NAME, new VerifyNodeRepositoryRequest(repository, verificationToken), @@ -97,7 +96,7 @@ public void verify(String repository, String verificationToken, final ActionList @Override public void handleResponse(TransportResponse.Empty response) { if (counter.decrementAndGet() == 0) { - finishVerification(listener, nodes, errors); + finishVerification(repository, listener, nodes, errors); } } @@ -105,7 +104,7 @@ public void handleResponse(TransportResponse.Empty response) { public void handleException(TransportException exp) { errors.add(new VerificationFailure(node.getId(), exp)); if (counter.decrementAndGet() == 0) { - finishVerification(listener, nodes, errors); + finishVerification(repository, listener, nodes, errors); } } }); @@ -113,10 +112,13 @@ public void handleException(TransportException exp) { } } - public void finishVerification(ActionListener listener, List nodes, + private static void finishVerification(String repositoryName, ActionListener> listener, List nodes, CopyOnWriteArrayList errors) { - listener.onResponse(new RepositoriesService.VerifyResponse(nodes.toArray(new DiscoveryNode[nodes.size()]), - errors.toArray(new VerificationFailure[errors.size()]))); + if (errors.isEmpty() == false) { + listener.onFailure(new RepositoryVerificationException(repositoryName, errors.toString())); + } else { + listener.onResponse(nodes); + } } private void doVerify(String repositoryName, String verificationToken, DiscoveryNode localNode) {