@@ -238,6 +238,12 @@ private HddsConfigKeys() {
public static final String HDDS_SECURITY_CLIENT_SCM_CERTIFICATE_PROTOCOL_ACL =
"hdds.security.client.scm.certificate.protocol.acl";

// Determines whether the container ChunkManager will write user data to disk.
// Set to false only for specific performance tests.
public static final String HDDS_CONTAINER_PERSISTDATA =
"hdds.container.chunk.persistdata";
public static final boolean HDDS_CONTAINER_PERSISTDATA_DEFAULT = true;

public static final String HDDS_DATANODE_HTTP_ENABLED_KEY =
"hdds.datanode.http.enabled";
public static final String HDDS_DATANODE_HTTP_BIND_HOST_KEY =
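For context, a minimal sketch of how a performance test might flip the new flag before bringing up a datanode. The OzoneConfiguration usage is standard Hadoop configuration handling; the class and method names in the sketch are assumptions for illustration, not part of this change.

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public class NonPersistentChunkConfigSketch {
  // Hypothetical helper used by a performance test setup.
  public static OzoneConfiguration perfTestConf() {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Skip writing user data to disk; performance testing only.
    conf.setBoolean(HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA, false);
    // ChunkManagerFactory falls back to persistent chunks while the
    // container scrubber is enabled, so disable the scrubber as well.
    conf.setBoolean(HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED, false);
    return conf;
  }
}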
@@ -71,7 +71,7 @@
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory;
import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
@@ -114,7 +114,7 @@ public KeyValueHandler(Configuration config, StateContext context,
doSyncWrite =
conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY,
OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT);
chunkManager = new ChunkManagerImpl(doSyncWrite);
chunkManager = ChunkManagerFactory.getChunkManager(config, doSyncWrite);
volumeChoosingPolicy = ReflectionUtils.newInstance(conf.getClass(
HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
.class, VolumeChoosingPolicy.class), conf);
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.container.keyvalue.impl;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;

/**
* Implementation of ChunkManager built for running performance tests.
* Chunks are not written to disk; reads are returned as zero-filled buffers.
*/
public class ChunkManagerDummyImpl extends ChunkManagerImpl {
static final Logger LOG = LoggerFactory.getLogger(
ChunkManagerDummyImpl.class);

public ChunkManagerDummyImpl(boolean sync) {
super(sync);
}

/**
* Writes a given chunk.
*
* @param container - Container for the chunk
* @param blockID - ID of the block
* @param info - ChunkInfo
* @param data - data of the chunk
* @param dispatcherContext - dispatcherContextInfo
* @throws StorageContainerException
*/
@Override
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
ByteBuffer data, DispatcherContext dispatcherContext)
throws StorageContainerException {
long writeTimeStart = Time.monotonicNow();

Preconditions.checkNotNull(dispatcherContext);
DispatcherContext.WriteChunkStage stage = dispatcherContext.getStage();

try {
KeyValueContainerData containerData =
(KeyValueContainerData) container.getContainerData();
HddsVolume volume = containerData.getVolume();
VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
int bufferSize;

switch (stage) {
case WRITE_DATA:
bufferSize = data.capacity();
if (bufferSize != info.getLen()) {
String err = String.format("data array does not match the length "
+ "specified. DataLen: %d Byte Array: %d",
info.getLen(), bufferSize);
LOG.error(err);
throw new StorageContainerException(err, INVALID_WRITE_SIZE);
}

// Increment volumeIO stats here.
volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart);
volumeIOStats.incWriteOpCount();
volumeIOStats.incWriteBytes(info.getLen());
break;
case COMMIT_DATA:
updateContainerWriteStats(container, info, false);
break;
case COMBINED:
updateContainerWriteStats(container, info, false);
break;
default:
throw new IOException("Can not identify write operation.");
}
} catch (IOException ex) {
LOG.error("write data failed. error: {}", ex);
throw new StorageContainerException("Internal error: ", ex,
CONTAINER_INTERNAL_ERROR);
}
}

/**
* Returns a zero-filled buffer.
*
* @param container - Container for the chunk
* @param blockID - ID of the block.
* @param info - ChunkInfo.
* @param dispatcherContext dispatcher context info.
* @return byte array
* TODO: Right now we do not support partial reads and writes of chunks.
* TODO: Explore if we need to do that for ozone.
*/
@Override
public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
DispatcherContext dispatcherContext) {

long readStartTime = Time.monotonicNow();

KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
ByteBuffer data;
HddsVolume volume = containerData.getVolume();
VolumeIOStats volumeIOStats = volume.getVolumeIOStats();

data = ByteBuffer.allocate((int) info.getLen());

// Increment volumeIO stats here.
volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime);
volumeIOStats.incReadOpCount();
volumeIOStats.incReadBytes(info.getLen());

return data.array();
}

/**
* Deletes a given chunk. No data is removed; only container stats are updated.
*
* @param container - Container for the chunk
* @param blockID - ID of the block
* @param info - Chunk Info
*/
@Override
public void deleteChunk(Container container, BlockID blockID,
ChunkInfo info) {
Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
KeyValueContainerData containerData =
(KeyValueContainerData) container.getContainerData();

if (info.getOffset() == 0) {
containerData.decrBytesUsed(info.getLen());
}
}
}
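Because readChunk above returns an all-zero buffer, any digest-based write validation (as Freon performs) is guaranteed to mismatch; that is why the Freon change further down forces validateWrites off when persistence is disabled. A self-contained illustration of that mismatch, using the commons-codec DigestUtils that Freon already imports (the class name is just for the sketch):

import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.digest.DigestUtils;

public class ZeroFilledReadSketch {
  public static void main(String[] args) {
    byte[] written = "user payload".getBytes(StandardCharsets.UTF_8);
    // ChunkManagerDummyImpl.readChunk hands back a freshly allocated buffer
    // of the requested length that was never written to, i.e. all zeros.
    byte[] readBack = new byte[written.length];
    boolean digestsMatch =
        DigestUtils.md5Hex(written).equals(DigestUtils.md5Hex(readBack));
    System.out.println("digests match: " + digestsMatch); // prints false
  }
}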
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.container.keyvalue.impl;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA_DEFAULT;

/**
* Selects an appropriate ChunkManager implementation based on the config setting.
* The Ozone ChunkManager is a singleton.
*/
public final class ChunkManagerFactory {
static final Logger LOG = LoggerFactory.getLogger(ChunkManagerFactory.class);

private static volatile ChunkManager instance = null;
private static volatile boolean syncChunks = false;

private ChunkManagerFactory() {
}

public static ChunkManager getChunkManager(Configuration config,
boolean sync) {
if (instance == null) {
synchronized (ChunkManagerFactory.class) {
if (instance == null) {
instance = createChunkManager(config, sync);
syncChunks = sync;
}
}
}

Preconditions.checkArgument((syncChunks == sync),
"value of sync conflicts with previous invocation");
return instance;
}

private static ChunkManager createChunkManager(Configuration config,
boolean sync) {
ChunkManager manager = null;
boolean persist = config.getBoolean(HDDS_CONTAINER_PERSISTDATA,
HDDS_CONTAINER_PERSISTDATA_DEFAULT);

if (!persist) {
boolean scrubber = config.getBoolean(
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED,
HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED_DEFAULT);
if (scrubber) {
// Data Scrubber needs to be disabled for non-persistent chunks.
LOG.warn("Failed to set " + HDDS_CONTAINER_PERSISTDATA + " to false."
+ " Please set " + HddsConfigKeys.HDDS_CONTAINERSCRUB_ENABLED
+ " also to false to enable non-persistent containers.");
persist = true;
}
}

if (persist) {
manager = new ChunkManagerImpl(sync);
} else {
LOG.warn(HDDS_CONTAINER_PERSISTDATA
+ " is set to false. This should be used only for testing."
+ " All user data will be discarded.");
manager = new ChunkManagerDummyImpl(sync);
}

return manager;
}
}
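For reference, this mirrors how KeyValueHandler above obtains the manager; the first call fixes the sync flag for the process, and the Preconditions check rejects a later call with a different value. A minimal wiring sketch (the surrounding class is an assumption; the config keys and factory call are taken from this change):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;

public class ChunkManagerWiringSketch {
  public static ChunkManager wire(OzoneConfiguration conf) {
    boolean syncWrites = conf.getBoolean(
        OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY,
        OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT);
    // Returns ChunkManagerImpl when hdds.container.chunk.persistdata is true
    // (the default) and ChunkManagerDummyImpl otherwise.
    return ChunkManagerFactory.getChunkManager(conf, syncWrites);
  }
}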
@@ -142,18 +142,12 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
// the same term and log index appended as the current transaction
commitChunk(tmpChunkFile, chunkFile);
// Increment container stats here, as we commit the data.
containerData.incrBytesUsed(info.getLen());
containerData.incrWriteCount();
containerData.incrWriteBytes(info.getLen());
updateContainerWriteStats(container, info, isOverwrite);
break;
case COMBINED:
// directly write to the chunk file
ChunkUtils.writeData(chunkFile, info, data, volumeIOStats, doSyncWrite);
if (!isOverwrite) {
containerData.incrBytesUsed(info.getLen());
}
containerData.incrWriteCount();
containerData.incrWriteBytes(info.getLen());
updateContainerWriteStats(container, info, isOverwrite);
break;
default:
throw new IOException("Can not identify write operation.");
@@ -176,6 +170,18 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
}
}

protected void updateContainerWriteStats(Container container, ChunkInfo info,
boolean isOverwrite) {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();

if (!isOverwrite) {
containerData.incrBytesUsed(info.getLen());
}
containerData.incrWriteCount();
containerData.incrWriteBytes(info.getLen());
}

/**
* reads the data defined by a chunk.
*
@@ -40,6 +40,7 @@
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -251,6 +252,13 @@ public void init(OzoneConfiguration configuration) throws IOException {
@Override
public Void call() throws Exception {
if (ozoneConfiguration != null) {
if (!ozoneConfiguration.getBoolean(
HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA,
HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA_DEFAULT)) {
LOG.info("Override validateWrites to false, because "
+ HddsConfigKeys.HDDS_CONTAINER_PERSISTDATA + " is set to false.");
validateWrites = false;
}
init(ozoneConfiguration);
} else {
init(freon.createOzoneConfiguration());
@@ -282,6 +290,7 @@ public Void call() throws Exception {
LOG.info("Number of Keys per Bucket: {}.", numOfKeys);
LOG.info("Key size: {} bytes", keySize);
LOG.info("Buffer size: {} bytes", bufferSize);
LOG.info("validateWrites : {}", validateWrites);
for (int i = 0; i < numOfThreads; i++) {
executor.submit(new ObjectCreator());
}
@@ -548,7 +557,7 @@ long getSuccessfulValidationCount() {
*/
@VisibleForTesting
long getUnsuccessfulValidationCount() {
return writeValidationFailureCount;
return validateWrites ? writeValidationFailureCount : 0;
}

/**