@@ -61,4 +61,8 @@ public Pipeline create(ReplicationType type, ReplicationFactor factor,
List<DatanodeDetails> nodes) {
return providers.get(type).create(factor, nodes);
}

public void shutdown() {
providers.values().forEach(provider -> provider.shutdown());
}
}
@@ -33,4 +33,5 @@ public interface PipelineProvider {

Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);

void shutdown();
}
@@ -24,35 +24,76 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Implements API for creating Ratis pipelines.
*/
public class RatisPipelineProvider implements PipelineProvider {

private static final Logger LOG =
LoggerFactory.getLogger(RatisPipelineProvider.class);

private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;

// Set parallelism to 3, as Ratis currently creates only one-node and three-node pipelines.
private final int parallelismForPool = 3;

private final ForkJoinPool.ForkJoinWorkerThreadFactory factory =
(pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.
defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("RATISCREATEPIPELINE" + worker.getPoolIndex());
return worker;
});

private final ForkJoinPool forkJoinPool = new ForkJoinPool(
parallelismForPool, factory, null, false);


RatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager, Configuration conf) {
this.nodeManager = nodeManager;
this.stateManager = stateManager;
this.conf = conf;
}


/**
* Create pluggable container placement policy implementation instance.
*
@@ -133,7 +174,81 @@ public Pipeline create(ReplicationFactor factor,
.build();
}


@Override
public void shutdown() {
forkJoinPool.shutdownNow();
Contributor: We should also wait for the tasks to finish; we need to use an awaitTermination call. Can we use a timeout of 60 seconds? That is what is used in the Scheduler class.

Contributor (Author): This was done based on Arpit's comment, since on an unclean shutdown this terminates abruptly. So we can use shutdownNow() instead of awaitTermination in the normal case too.

Contributor: @bharatviswa504 I agree. We need to use shutdownNow, but we also need to use awaitTermination. shutdownNow interrupts the running tasks, but the running tasks have to handle the interrupt; if a task does not exit on interrupt, it is better to wait for it to finish.

Contributor (Author): Done.
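
A minimal, self-contained sketch of the pattern the reviewers converge on — shutdownNow() followed by a bounded awaitTermination() — separate from the change itself, which continues below. The class name, the sleeping placeholder task, and the 60-second timeout taken from the Scheduler discussion are illustrative assumptions, not part of the patch.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class PoolShutdownSketch {
  public static void main(String[] args) throws InterruptedException {
    ForkJoinPool pool = new ForkJoinPool(3);
    pool.submit(() -> {
      try {
        // Placeholder for long-running pipeline-creation work; the task
        // cooperates with shutdownNow() by exiting when it is interrupted.
        TimeUnit.MINUTES.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    // Stop accepting new tasks and attempt to cancel the running ones ...
    pool.shutdownNow();
    // ... then wait a bounded amount of time (60 seconds, as in the Scheduler
    // class) for the interrupted tasks to actually finish.
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
      System.err.println("Pool did not terminate within 60 seconds");
    }
  }
}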

try {
forkJoinPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Unexpected exception occurred during shutdown of " +
"RatisPipelineProvider", e);
}
}

protected void initializePipeline(Pipeline pipeline) throws IOException {
RatisPipelineUtils.createPipeline(pipeline, conf);
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
callRatisRpc(pipeline.getNodes(),
(raftClient, peer) -> {
RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
if (reply == null || !reply.isSuccess()) {
String msg = "Pipeline initialization failed for pipeline:"
+ pipeline.getId() + " node:" + peer.getId();
LOG.error(msg);
throw new IOException(msg);
}
});
}

private void callRatisRpc(List<DatanodeDetails> datanodes,
CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
throws IOException {
if (datanodes.isEmpty()) {
return;
}

final String rpcType = conf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
final List<IOException> exceptions =
Collections.synchronizedList(new ArrayList<>());
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(conf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(conf));
final TimeDuration requestTimeout =
RatisHelper.getClientRequestTimeout(conf);
try {
forkJoinPool.submit(() -> {
Contributor: Can we please verify that none of the threads are waiting for the parallel stream call to finish?

Contributor: @bharatviswa504 Can you please verify this?

Contributor (Author): @lokeshj1703 Sorry, I missed this comment earlier. I checked this: one of the forkJoinPool threads is used for waiting, and the same thread is also used for one of the Ratis calls with the 3-node pipeline.

Output:
The line below is logged right after the submit call, forkJoinPool.submit(() -> {:
Thread name RATISCREATEPIPELINE1
The lines below are logged inside the parallel stream, datanodes.parallelStream().forEach(d -> {:
Internal thread name RATISCREATEPIPELINE1
Internal thread name RATISCREATEPIPELINE3
Internal thread name RATISCREATEPIPELINE2

So I think we are fine with parallelism set to 3. I even tried with 4, and I still see the same output as above.
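
A small standalone sketch of the behavior the author describes, separate from the diff that resumes below: a parallel stream started from a task submitted to a custom ForkJoinPool runs its element work on that pool's workers rather than on the common pool (a well-known, if not formally documented, property of parallel streams). The class name, the SKETCHPOOL worker-name prefix, and the datanode strings are made up for illustration.

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class ParallelStreamPoolSketch {
  public static void main(String[] args) throws Exception {
    // Name the workers so the output resembles the RATISCREATEPIPELINE logs above.
    ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
      ForkJoinWorkerThread worker =
          ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
      worker.setName("SKETCHPOOL" + worker.getPoolIndex());
      return worker;
    };
    ForkJoinPool pool = new ForkJoinPool(3, factory, null, false);
    List<String> datanodes = List.of("dn-1", "dn-2", "dn-3");
    pool.submit(() -> {
      System.out.println("Submitting thread: " + Thread.currentThread().getName());
      // A parallel stream started from inside a pool worker schedules its
      // element tasks on the same custom pool, not on the common pool, so the
      // per-datanode work and the waiting task share the three workers.
      datanodes.parallelStream().forEach(d ->
          System.out.println(d + " handled by " + Thread.currentThread().getName()));
    }).get();
    pool.shutdown();
  }
}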

datanodes.parallelStream().forEach(d -> {
final RaftPeer p = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy, maxOutstandingRequests, tlsConfig,
requestTimeout)) {
rpc.accept(client, p);
} catch (IOException ioe) {
String errMsg =
"Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
LOG.error(errMsg, ioe);
exceptions.add(new IOException(errMsg, ioe));
}
});
}).get();
} catch (ExecutionException | RejectedExecutionException ex) {
LOG.error(ex.getClass().getName() + " exception occurred during " +
"createPipeline", ex);
throw new IOException(ex.getClass().getName() + " exception occurred " +
"during createPipeline", ex);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("Interrupt exception occurred during " +
"createPipeline", ex);
}
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}
}
@@ -17,66 +17,37 @@
*/
package org.apache.hadoop.hdds.scm.pipeline;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Utility class for Ratis pipelines. Contains methods to create and destroy
* ratis pipelines.
*/
final class RatisPipelineUtils {
public final class RatisPipelineUtils {

private static final Logger LOG =
LoggerFactory.getLogger(RatisPipelineUtils.class);

private RatisPipelineUtils() {
}

/**
* Sends ratis command to create pipeline on all the datanodes.
*
* @param pipeline - Pipeline to be created
* @param ozoneConf - Ozone Configuration
* @throws IOException if creation fails
*/
public static void createPipeline(Pipeline pipeline, Configuration ozoneConf)
throws IOException {
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
callRatisRpc(pipeline.getNodes(), ozoneConf,
(raftClient, peer) -> {
RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
if (reply == null || !reply.isSuccess()) {
String msg = "Pipeline initialization failed for pipeline:"
+ pipeline.getId() + " node:" + peer.getId();
LOG.error(msg);
throw new IOException(msg);
}
});
}

/**
* Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
* the datanodes.
@@ -125,42 +96,4 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
client
.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId());
}

private static void callRatisRpc(List<DatanodeDetails> datanodes,
Configuration ozoneConf,
CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
throws IOException {
if (datanodes.isEmpty()) {
return;
}

final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final List<IOException> exceptions =
Collections.synchronizedList(new ArrayList<>());
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf));
final TimeDuration requestTimeout =
RatisHelper.getClientRequestTimeout(ozoneConf);
datanodes.parallelStream().forEach(d -> {
final RaftPeer p = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) {
rpc.accept(client, p);
} catch (IOException ioe) {
String errMsg =
"Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
LOG.error(errMsg, ioe);
exceptions.add(new IOException(errMsg, ioe));
}
});
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}
}
@@ -87,7 +87,8 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
this.lock = new ReentrantReadWriteLock();
this.conf = conf;
this.stateManager = new PipelineStateManager(conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
conf);
// TODO: See if thread priority needs to be set for these threads
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
@@ -419,5 +420,7 @@ public void close() throws IOException {
if(metrics != null) {
metrics.unRegister();
}
// shutdown pipeline provider.
pipelineFactory.shutdown();
}
}
@@ -72,4 +72,9 @@ public Pipeline create(ReplicationFactor factor,
.setNodes(nodes)
.build();
}

@Override
public void shutdown() {
// Do nothing.
}
}
@@ -37,4 +37,9 @@ public MockRatisPipelineProvider(NodeManager nodeManager,
protected void initializePipeline(Pipeline pipeline) throws IOException {
// do nothing as the datanodes do not exist
}

@Override
public void shutdown() {
// Do nothing.
}
}
@@ -21,7 +21,6 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.test.GenericTestUtils;
@@ -40,7 +39,7 @@
/**
* Tests for RatisPipelineUtils.
*/
public class TestRatisPipelineUtils {
public class TestRatisPipelineCreateAndDestory {

private static MiniOzoneCluster cluster;
private OzoneConfiguration conf = new OzoneConfiguration();
@@ -98,11 +97,13 @@ public void testPipelineCreationOnNodeRestart() throws Exception {

// try creating another pipeline now
try {
RatisPipelineUtils.createPipeline(pipelines.get(0), conf);
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert.fail("pipeline creation should fail after shutting down pipeline");
} catch (IOException ioe) {
// in case the pipeline creation fails, MultipleIOException is thrown
Assert.assertTrue(ioe instanceof MultipleIOException);
// As all datanodes are now shut down, they move to the stale state, so
// there will not be sufficient datanodes to create the pipeline.
Assert.assertTrue(ioe instanceof InsufficientDatanodesException);
}

// make sure pipelines are destroyed