MiniOzoneChaosCluster.java
@@ -68,7 +68,7 @@ public MiniOzoneChaosCluster(OzoneConfiguration conf,

this.executorService = Executors.newSingleThreadScheduledExecutor();
this.numDatanodes = getHddsDatanodes().size();
LOG.info("Starting MiniOzoneChaosCluster with:{} datanodes" + numDatanodes);
LOG.info("Starting MiniOzoneChaosCluster with {} datanodes", numDatanodes);
LogUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.WARN);
}
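
This first change swaps string concatenation for SLF4J's parameterized logging: with `+`, the `{}` placeholder is never substituted, so the count is glued onto the end of the format string (`...datanodes3`), and the message string is built even when INFO is disabled. A minimal standalone sketch of the difference (class name and value are illustrative, not from the PR):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogDemo {
  private static final Logger LOG = LoggerFactory.getLogger(LogDemo.class);

  public static void main(String[] args) {
    int numDatanodes = 3;
    // Concatenation leaves the placeholder unfilled; this prints:
    // "Starting MiniOzoneChaosCluster with:{} datanodes3"
    LOG.info("Starting MiniOzoneChaosCluster with:{} datanodes" + numDatanodes);
    // The parameterized form substitutes the argument and defers
    // message construction until the level is enabled; this prints:
    // "Starting MiniOzoneChaosCluster with 3 datanodes"
    LOG.info("Starting MiniOzoneChaosCluster with {} datanodes", numDatanodes);
  }
}
```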

@@ -108,7 +108,7 @@ private void restartNodes() {
LOG.info("{} Completed restarting Datanode: {}", failString,
dn.getUuid());
} catch (Exception e) {
LOG.error("Failed to restartNodes Datanode", dn.getUuid());
LOG.error("Failed to restartNodes Datanode {}", dn.getUuid(), e);
}
}
}
@@ -119,7 +119,7 @@ private void shutdownNodes() {
for (int i = 0; i < numNodesToFail; i++) {
boolean shouldStop = shouldStop();
int failedNodeIndex = getNodeToFail();
-String stopString = shouldStop ? "Stopping" : "Starting";
+String stopString = shouldStop ? "Stopping" : "Restarting";
DatanodeDetails dn =
getHddsDatanodes().get(failedNodeIndex).getDatanodeDetails();
try {
@@ -133,7 +133,7 @@ private void shutdownNodes() {
LOG.info("Completed {} DataNode {}", stopString, dn.getUuid());

} catch (Exception e) {
LOG.error("Failed to shutdown Datanode", dn.getUuid());
LOG.error("Failed {} Datanode {}", stopString, dn.getUuid(), e);
}
}
}
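
Both error-path fixes rely on the same SLF4J convention: arguments fill `{}` placeholders in order, and a trailing `Throwable` is treated specially and printed with its stack trace. The old calls passed `dn.getUuid()` against a format string with no placeholder, so the UUID was silently dropped and the exception was never logged at all. A small sketch, with a made-up UUID and exception:

```java
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ErrorLogDemo {
  private static final Logger LOG = LoggerFactory.getLogger(ErrorLogDemo.class);

  public static void main(String[] args) {
    UUID uuid = UUID.randomUUID();
    Exception e = new IllegalStateException("restart failed");
    // No placeholder in the format string: the UUID argument is
    // silently ignored and no stack trace is printed.
    LOG.error("Failed to shutdown Datanode", uuid);
    // One placeholder consumes the UUID; SLF4J treats the trailing
    // Throwable specially and prints its stack trace.
    LOG.error("Failed Stopping Datanode {}", uuid, e);
  }
}
```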
MiniOzoneLoadGenerator.java
@@ -35,6 +35,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -49,7 +50,7 @@
*/
public class MiniOzoneLoadGenerator {

-static final Logger LOG =
+private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneLoadGenerator.class);

private static String keyNameDelimiter = "_";
@@ -113,7 +114,7 @@ private void load(long runTimeMillis) {
int index = RandomUtils.nextInt();
String keyName = writeData(index, bucket, threadName);

-readData(bucket, keyName);
+readData(bucket, keyName, index);

deleteKey(bucket, keyName);
} catch (Exception e) {
@@ -133,11 +134,13 @@ private String writeData(int keyIndex, OzoneBucket bucket, String threadName)
ByteBuffer buffer = buffers.get(keyIndex % numBuffers);
int bufferCapacity = buffer.capacity();

-String keyName = threadName + keyNameDelimiter + keyIndex;
+String keyName = getKeyName(keyIndex, threadName);
+LOG.trace("LOADGEN: Writing key {}", keyName);
try (OzoneOutputStream stream = bucket.createKey(keyName,
bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
new HashMap<>())) {
stream.write(buffer.array());
LOG.trace("LOADGEN: Written key {}", keyName);
} catch (Throwable t) {
LOG.error("LOADGEN: Create key:{} failed with exception, skipping",
keyName, t);
@@ -147,9 +150,9 @@
return keyName;
}

-private void readData(OzoneBucket bucket, String keyName) throws Exception {
-  int index = Integer.valueOf(keyName.split(keyNameDelimiter)[1]);
-
+private void readData(OzoneBucket bucket, String keyName, int index)
+    throws Exception {
+  LOG.trace("LOADGEN: Reading key {}", keyName);

ByteBuffer buffer = buffers.get(index % numBuffers);
int bufferCapacity = buffer.capacity();
@@ -168,25 +171,29 @@ private void readData(OzoneBucket bucket, String keyName) throws Exception {
throw new IOException("Read mismatch, key:" + keyName +
" read data does not match the written data");
}
LOG.trace("LOADGEN: Read key {}", keyName);
} catch (Throwable t) {
LOG.error("LOADGEN: Read key:{} failed with exception", keyName, t);
throw t;
}
}
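
Passing `index` as a parameter also removes a fragile round-trip through the key name: `keyName.split(keyNameDelimiter)[1]` only recovers the index when the thread-name prefix itself contains no delimiter. A quick illustration (the thread names here are hypothetical):

```java
public class KeySplitDemo {
  public static void main(String[] args) {
    // Fine when the thread name has no underscore:
    System.out.println("Thread-7_42".split("_")[1]);      // prints "42"
    // A thread name containing the delimiter breaks the parse:
    System.out.println("load_worker_7_42".split("_")[1]); // prints "worker"
    // Passing the index as an int parameter, as the new
    // readData(bucket, keyName, index) signature does, avoids
    // parsing it back out of the key name entirely.
  }
}
```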

private void deleteKey(OzoneBucket bucket, String keyName) throws Exception {
LOG.trace("LOADGEN: Deleting key {}", keyName);
try {
bucket.deleteKey(keyName);
LOG.trace("LOADGEN: Deleted key {}", keyName);
} catch (Throwable t) {
LOG.error("LOADGEN: Unable to delete key:{}", keyName, t);
throw t;
}
}

-private String getKeyToRead() {
+private Optional<Integer> randomKeyToRead() {
int currentIndex = agedFileWrittenIndex.get();
-return currentIndex != 0 ?
-    String.valueOf(RandomUtils.nextInt(0, currentIndex)): null;
+return currentIndex != 0
+    ? Optional.of(RandomUtils.nextInt(0, currentIndex))
+    : Optional.empty();
}
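
Returning `Optional<Integer>` instead of a nullable `String` makes the "nothing written yet" case explicit and pushes callers toward checking it. A self-contained sketch of the same pattern, using `ThreadLocalRandom` in place of the commons-lang `RandomUtils` call so it runs without extra dependencies:

```java
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class OptionalDemo {
  private static final AtomicInteger writtenIndex = new AtomicInteger();

  // Empty until at least one key has been written, instead of a
  // nullable value the caller might forget to null-check.
  static Optional<Integer> randomKeyToRead() {
    int current = writtenIndex.get();
    return current != 0
        ? Optional.of(ThreadLocalRandom.current().nextInt(current))
        : Optional.empty();
  }

  public static void main(String[] args) {
    randomKeyToRead().ifPresent(i -> System.out.println("read key " + i)); // no-op
    writtenIndex.getAndIncrement();
    randomKeyToRead().ifPresent(i -> System.out.println("read key " + i)); // read key 0
  }
}
```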

private void startAgedFilesLoad(long runTimeMillis) {
@@ -201,12 +208,13 @@ private void startAgedFilesLoad(long runTimeMillis) {
String keyName = null;
try {
if (agedWriteProbability.isTrue()) {
-keyName = writeData(agedFileWrittenIndex.incrementAndGet(),
+keyName = writeData(agedFileWrittenIndex.getAndIncrement(),
agedLoadBucket, threadName);
} else {
-keyName = getKeyToRead();
-if (keyName != null) {
-  readData(agedLoadBucket, keyName);
+Optional<Integer> index = randomKeyToRead();
+if (index.isPresent()) {
+  keyName = getKeyName(index.get(), threadName);
+  readData(agedLoadBucket, keyName, index.get());
}
}
} catch (Throwable t) {
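
The switch from `incrementAndGet()` to `getAndIncrement()` fixes an off-by-one between writer and reader: `randomKeyToRead()` draws an index from `[0, agedFileWrittenIndex.get())`, so the first written key must receive index 0, and the counter should only exceed an index once that key's write has been issued. A two-line demonstration of the difference:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CounterDemo {
  public static void main(String[] args) {
    AtomicInteger a = new AtomicInteger();
    // incrementAndGet returns the NEW value: the first write gets
    // index 1, so index 0 is never written even though a reader
    // picking from [0, a.get()) may ask for it.
    System.out.println(a.incrementAndGet()); // 1

    AtomicInteger b = new AtomicInteger();
    // getAndIncrement returns the OLD value: the first write gets
    // index 0, and b.get() equals the number of writes issued.
    System.out.println(b.getAndIncrement()); // 0
    System.out.println(b.get());             // 1
  }
}
```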
@@ -253,4 +261,8 @@ public void shutdownLoadGenerator() {
LOG.error("error while closing ", e);
}
}

+private static String getKeyName(int keyIndex, String threadName) {
+  return threadName + keyNameDelimiter + keyIndex;
+}
}
log4j.properties
@@ -15,7 +15,7 @@ log4j.rootLogger=info,stdout
log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n

log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
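
The log4j change adds `%t`, the logging thread's name, to every line, which makes it possible to attribute interleaved LOADGEN messages to their worker threads. A hypothetical line rendered by the new pattern (timestamp, thread name, and source location are invented for illustration):

```
2019-06-01 12:00:00,123 [pool-7-thread-3] INFO  ozone.MiniOzoneLoadGenerator (MiniOzoneLoadGenerator.java:writeData(140)) - LOADGEN: Writing key pool-7-thread-3_42
```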