OzoneClientUtils.java
@@ -17,22 +17,29 @@
*/
package org.apache.hadoop.ozone.client;

+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.client.rest.response.*;
+import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
+import org.apache.hadoop.ozone.client.rest.response.KeyInfo;
+import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
+import org.apache.hadoop.ozone.client.rest.response.KeyLocation;
+import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
+import org.apache.hadoop.ozone.client.rest.response.VolumeOwner;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftRetryFailureException;

-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
/** A utility class for OzoneClient. */
public final class OzoneClientUtils {

@@ -129,14 +136,31 @@ public static KeyInfoDetails asKeyInfoDetails(OzoneKeyDetails key) {

public static RetryPolicy createRetryPolicy(int maxRetryCount,
long retryInterval) {
-// just retry without sleep
-RetryPolicy retryPolicy = RetryPolicies
-.retryUpToMaximumCountWithFixedSleep(maxRetryCount, retryInterval,
-TimeUnit.MILLISECONDS);
-return retryPolicy;
+// retry with fixed sleep between retries
+return RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+maxRetryCount, retryInterval, TimeUnit.MILLISECONDS);
}

public static List<Class<? extends Exception>> getExceptionList() {
return EXCEPTION_LIST;
}

+public static Map<Class<? extends Throwable>, RetryPolicy>
+getRetryPolicyByException(int maxRetryCount, long retryInterval) {
+Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
+for (Class<? extends Exception> ex : EXCEPTION_LIST) {
+if (ex == TimeoutException.class ||
+ex == RaftRetryFailureException.class) {
+// retry without sleep
+policyMap.put(ex, createRetryPolicy(maxRetryCount, 0));
+} else {
+// retry with fixed sleep between retries
+policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval));
+}
+}
+// Default retry policy
+policyMap.put(Exception.class, createRetryPolicy(
+maxRetryCount, retryInterval));
+return policyMap;
+}
}
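Reviewer note, not part of the patch: a minimal sketch of how a caller might consume the new per-exception map. The sample values (3 retries, 1000 ms) and the variable names `ioe` and `retryCount` are made up; the lookup-with-default simply mirrors what handleRetry in KeyOutputStream does further down.

// Sketch only, assumed sample values; ioe and retryCount are placeholders.
Map<Class<? extends Throwable>, RetryPolicy> policies =
    OzoneClientUtils.getRetryPolicyByException(3, 1000);
RetryPolicy policy = policies.get(ioe.getClass());
if (policy == null) {
  policy = policies.get(Exception.class);  // default entry registered above
}
RetryPolicy.RetryAction action = policy.shouldRetry(ioe, retryCount, 0, true);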
KeyOutputStream.java
@@ -22,8 +22,7 @@
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-.ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
@@ -52,7 +51,10 @@
import java.util.List;
import java.util.Collection;
import java.util.ListIterator;
+import java.util.Map;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;

/**
* Maintaining a list of BlockInputStream. Write based on offset.
@@ -95,7 +97,7 @@ enum StreamAction {
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
private FileEncryptionInfo feInfo;
private ExcludeList excludeList;
-private final RetryPolicy retryPolicy;
+private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
private int retryCount;
private long offset;
/**
@@ -121,7 +123,10 @@ public KeyOutputStream() {
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
this.bytesPerChecksum = OzoneConfigKeys
.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
-this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
+this.retryPolicyMap = OzoneClientUtils.getExceptionList()
+.stream()
+.collect(Collectors.toMap(Function.identity(),
+e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
retryCount = 0;
offset = 0;
}
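Reviewer note, not part of the patch: the stream-and-collect above just seeds every exception type on the list with TRY_ONCE_THEN_FAIL. An equivalent plain loop, shown only for clarity (names are illustrative):

// Illustrative equivalent of the Collectors.toMap call above.
Map<Class<? extends Throwable>, RetryPolicy> defaults = new HashMap<>();
for (Class<? extends Exception> e : OzoneClientUtils.getExceptionList()) {
  defaults.put(e, RetryPolicies.TRY_ONCE_THEN_FAIL);
}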
@@ -200,8 +205,8 @@ public KeyOutputStream(OpenKeySession handler,
this.bufferPool =
new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
this.excludeList = new ExcludeList();
-this.retryPolicy = OzoneClientUtils.createRetryPolicy(maxRetryCount,
-retryInterval);
+this.retryPolicyMap = OzoneClientUtils.getRetryPolicyByException(
+maxRetryCount, retryInterval);
this.retryCount = 0;
}

@@ -502,10 +507,14 @@ private void markStreamClosed() {
}

private void handleRetry(IOException exception, long len) throws IOException {
+RetryPolicy retryPolicy =
+retryPolicyMap.get(checkForException(exception).getClass());
+if (retryPolicy == null) {
+retryPolicy = retryPolicyMap.get(Exception.class);
+}
RetryPolicy.RetryAction action;
try {
-action = retryPolicy
-.shouldRetry(exception, retryCount, 0, true);
+action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
} catch (Exception e) {
throw e instanceof IOException ? (IOException) e : new IOException(e);
}
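Reviewer note, not part of the patch: the practical effect of the split in getRetryPolicyByException, sketched with sample values and assuming ContainerNotOpenException is on EXCEPTION_LIST (its import in OzoneClientUtils suggests it is).

// Illustration only (3 retries, 1000 ms interval; list membership assumed).
Map<Class<? extends Throwable>, RetryPolicy> m =
    OzoneClientUtils.getRetryPolicyByException(3, 1000);
m.get(TimeoutException.class);           // up to 3 retries, no sleep
m.get(RaftRetryFailureException.class);  // up to 3 retries, no sleep
m.get(ContainerNotOpenException.class);  // up to 3 retries, 1000 ms between attempts
m.get(Exception.class);                  // default: 3 retries, 1000 ms between attempts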