@@ -18,52 +18,54 @@

package org.apache.hadoop.hdds.scm;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.util.Time;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;

import org.apache.hadoop.util.Time;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;

/**
* An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -309,10 +311,7 @@ public XceiverClientReply sendCommandAsync(
Time.monotonicNowNanos() - requestTime);
}).thenApply(reply -> {
try {
// we need to handle RaftRetryFailure Exception
RaftRetryFailureException raftRetryFailureException =
reply.getRetryFailureException();
if (raftRetryFailureException != null) {
if (!reply.isSuccess()) {
// in case of raft retry failure, the raft client is
// not able to connect to the leader hence the pipeline
// can not be used but this instance of RaftClient will close
@@ -324,7 +323,10 @@ public XceiverClientReply sendCommandAsync(
// to SCM as in this case, it is the raft client which is not
// able to connect to leader in the pipeline, though the
// pipeline can still be functional.
throw new CompletionException(raftRetryFailureException);
RaftException exception = reply.getException();
Preconditions.checkNotNull(exception, "Raft reply failure but " +
"no exception propagated.");
throw new CompletionException(exception);
}
ContainerCommandResponseProto response =
ContainerCommandResponseProto
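The hunk above replaces the RaftRetryFailureException-specific check with the generic reply.isSuccess() / reply.getException() pair. Below is a minimal sketch of that pattern in isolation, assuming only the Ratis 0.4.x RaftClientReply API already imported in this diff; the class and method names are illustrative, not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftException;

public final class ReplyCheckSketch {
  private ReplyCheckSketch() {
  }

  // Fail the async pipeline whenever the Raft reply is unsuccessful,
  // propagating whichever RaftException the reply carries.
  static CompletableFuture<RaftClientReply> checkReply(
      CompletableFuture<RaftClientReply> replyFuture) {
    return replyFuture.thenApply(reply -> {
      if (!reply.isSuccess()) {
        RaftException cause = reply.getException();
        throw new CompletionException(cause != null ? cause
            : new IllegalStateException(
                "Raft reply failed but no exception was propagated"));
      }
      return reply;
    });
  }
}
```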
@@ -107,6 +107,11 @@ public final class ScmConfigKeys
"dfs.container.ratis.log.appender.queue.byte-limit";
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
"dfs.container.ratis.log.purge.gap";
// TODO: Set to 1024 once RATIS issue around purge is fixed.
public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
1000000000;
// expiry interval stateMachineData cache entry inside containerStateMachine
public static final String
DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
@@ -146,7 +151,7 @@ public final class ScmConfigKeys

public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY =
"dfs.ratis.snapshot.threshold";
public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 10000;
public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 100000;

public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
"dfs.ratis.server.failure.duration";
@@ -322,6 +322,10 @@ public final class OzoneConfigKeys
public static final String
DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT;
public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP;
public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT;
public static final String DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY =
ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY;
public static final TimeDuration
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -104,6 +104,14 @@
<description>Byte limit for ratis leader's log appender queue.
</description>
</property>
<property>
<name>dfs.container.ratis.log.purge.gap</name>
<value>1000000000</value>
<tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
<description>Purge gap between the last purged commit index
and the current index, when the leader decides to purge its log.
</description>
</property>
<property>
<name>dfs.container.ratis.datanode.storage.dir</name>
<value/>
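The same purge-gap default also lands in ozone-default.xml above. A small sketch of overriding it programmatically, assuming the org.apache.hadoop.hdds.conf.OzoneConfiguration class from the HDDS tree (not shown in this diff) and the key name added here:

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public final class PurgeGapOverrideSketch {
  private PurgeGapOverrideSketch() {
  }

  // Build a configuration that overrides the purge-gap default added in
  // this change; the key name comes from the diff, the value is arbitrary.
  static OzoneConfiguration withPurgeGap(int gap) {
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setInt("dfs.container.ratis.log.purge.gap", gap);
    return conf;
  }
}
```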
@@ -28,12 +28,12 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
@@ -195,12 +195,12 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
throws IOException {
if (snapshot == null) {
TermIndex empty =
TermIndex.newTermIndex(0, RaftServerConstants.INVALID_LOG_INDEX);
TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
LOG.info(
"The snapshot info is null." + "Setting the last applied index to:"
+ empty);
setLastAppliedTermIndex(empty);
return RaftServerConstants.INVALID_LOG_INDEX;
return RaftLog.INVALID_LOG_INDEX;
}

final File snapshotFile = snapshot.getFile().getPath().toFile();
@@ -243,7 +243,7 @@ public void persistContainerSet(OutputStream out) throws IOException {
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
LOG.info("Taking snapshot at termIndex:" + ti);
if (ti != null && ti.getIndex() != RaftServerConstants.INVALID_LOG_INDEX) {
if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
final File snapshotFile =
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
LOG.info("Taking a snapshot to file {}", snapshotFile);
@@ -651,14 +651,13 @@ private void evictStateMachineCache() {
}

@Override
public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
ratisServer.handleNodeSlowness(group, roleInfoProto);
public void notifySlowness(RoleInfoProto roleInfoProto) {
ratisServer.handleNodeSlowness(gid, roleInfoProto);
}

@Override
public void notifyExtendedNoLeader(RaftGroup group,
RoleInfoProto roleInfoProto) {
ratisServer.handleNoLeader(group, roleInfoProto);
public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
ratisServer.handleNoLeader(gid, roleInfoProto);
}

@Override
@@ -667,6 +666,16 @@ public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
evictStateMachineCache();
}

@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
ratisServer.handleInstallSnapshotFromLeader(gid, roleInfoProto,
firstTermIndexInLog);
final CompletableFuture<TermIndex> future = new CompletableFuture<>();
future.complete(firstTermIndexInLog);
return future;
}

@Override
public void close() throws IOException {
evictStateMachineCache();
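The new notifyInstallSnapshotFromLeader override above notifies the Ratis server and then completes the returned future right away with the term index supplied by the leader. As a side note on that design choice, the create-then-complete pair could equally be written with CompletableFuture.completedFuture; a hedged sketch with illustrative names, not the PR's code:

```java
import java.util.concurrent.CompletableFuture;

import org.apache.ratis.server.protocol.TermIndex;

public final class InstallSnapshotReplySketch {
  private InstallSnapshotReplySketch() {
  }

  // Equivalent to creating a CompletableFuture and calling complete():
  // answer the install-snapshot notification immediately with the term
  // index supplied by the leader.
  static CompletableFuture<TermIndex> reply(TermIndex firstTermIndexInLog) {
    return CompletableFuture.completedFuture(firstTermIndexInLog);
  }
}
```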
@@ -57,7 +57,6 @@
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
@@ -66,6 +65,7 @@
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -240,8 +240,9 @@ private RaftProperties newRaftProperties(Configuration conf) {
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT,
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
RaftServerConfigKeys.Log.setElementLimit(properties, logQueueNumElements);
RaftServerConfigKeys.Log.setByteLimit(properties, logQueueByteLimit);
RaftServerConfigKeys.Log.setQueueElementLimit(
properties, logQueueNumElements);
RaftServerConfigKeys.Log.setQueueByteLimit(properties, logQueueByteLimit);

int numSyncRetries = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
@@ -251,8 +252,17 @@
numSyncRetries);

// Enable the StateMachineCaching
RaftServerConfigKeys.Log.StateMachineData
.setCachingEnabled(properties, true);
RaftServerConfigKeys.Log.StateMachineData.setCachingEnabled(
properties, true);

RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
false);

int purgeGap = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP,
OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT);
RaftServerConfigKeys.Log.setPurgeGap(properties, purgeGap);

return properties;
}

@@ -590,11 +600,32 @@ public List<PipelineID> getPipelineIds() {
return pipelineIDs;
}

void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
handlePipelineFailure(group.getGroupId(), roleInfoProto);
void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
handlePipelineFailure(groupId, roleInfoProto);
}

void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
handlePipelineFailure(group.getGroupId(), roleInfoProto);
void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
handlePipelineFailure(groupId, roleInfoProto);
}

/**
* The snapshot contents cannot actually be used to catch up the follower,
* which is why the pipeline is closed here instead of installing the
* snapshot; the follower would otherwise never be able to catch up.
*
* @param groupId raft group information
* @param roleInfoProto information about the current node role and
* rpc delay information.
* @param firstTermIndexInLog the term index reported back once snapshot
* installation completes, i.e. the last included term index in the snapshot.
*/
void handleInstallSnapshotFromLeader(RaftGroupId groupId,
RoleInfoProto roleInfoProto,
TermIndex firstTermIndexInLog) {
LOG.warn("Install snapshot notification received from Leader with " +
"termIndex: {}, terminating pipeline: {}",
firstTermIndexInLog, groupId);
handlePipelineFailure(groupId, roleInfoProto);
}
}
}
2 changes: 1 addition & 1 deletion hadoop-hdds/pom.xml
@@ -47,7 +47,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<hdds.version>0.5.0-SNAPSHOT</hdds.version>

<!-- Apache Ratis version -->
<ratis.version>0.4.0-fe2b15d-SNAPSHOT</ratis.version>
<ratis.version>0.4.0-2337318-SNAPSHOT</ratis.version>

<bouncycastle.version>1.60</bouncycastle.version>
