Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ default void addCacheEntry(CacheKey<KEY> cacheKey,
throw new NotImplementedException("addCacheEntry is not implemented");
}

/**
* Get the cache value from table cache.
* @param cacheKey
*/
default CacheValue<VALUE> getCacheValue(CacheKey<KEY> cacheKey) {
throw new NotImplementedException("getCacheValue is not implemented");
}

/**
* Removes all the entries from the table cache which are having epoch value
* less
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ public void addCacheEntry(CacheKey<KEY> cacheKey,
cache.put(cacheKey, cacheValue);
}

@Override
public CacheValue<VALUE> getCacheValue(CacheKey<KEY> cacheKey) {
return cache.get(cacheKey);
}

public Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>> cacheIterator() {
return cache.iterator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public enum DummyAction implements AuditAction {

CREATE_VOLUME,
CREATE_BUCKET,
CREATE_KEY,
READ_VOLUME,
READ_BUCKET,
READ_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public enum OMAction implements AuditAction {
COMMIT_KEY,
CREATE_VOLUME,
CREATE_BUCKET,
CREATE_KEY,
DELETE_VOLUME,
DELETE_BUCKET,
DELETE_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,28 +165,34 @@ public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
* part of the latest version, not a new version.
*
* @param newLocationList the list of new blocks to be added.
* @param updateTime if true, will update modification time.
* @throws IOException
*/
public synchronized void appendNewBlocks(
List<OmKeyLocationInfo> newLocationList) throws IOException {
List<OmKeyLocationInfo> newLocationList, boolean updateTime)
throws IOException {
if (keyLocationVersions.size() == 0) {
throw new IOException("Appending new block, but no version exist");
}
OmKeyLocationInfoGroup currentLatestVersion =
keyLocationVersions.get(keyLocationVersions.size() - 1);
currentLatestVersion.appendNewBlocks(newLocationList);
setModificationTime(Time.now());
if (updateTime) {
setModificationTime(Time.now());
}
}

/**
* Add a new set of blocks. The new blocks will be added as appending a new
* version to the all version list.
*
* @param newLocationList the list of new blocks to be added.
* @param updateTime - if true, updates modification time.
* @throws IOException
*/
public synchronized long addNewVersion(
List<OmKeyLocationInfo> newLocationList) throws IOException {
List<OmKeyLocationInfo> newLocationList, boolean updateTime)
throws IOException {
long latestVersionNum;
if (keyLocationVersions.size() == 0) {
// no version exist, these blocks are the very first version.
Expand All @@ -202,7 +208,10 @@ public synchronized long addNewVersion(
keyLocationVersions.add(newVersion);
latestVersionNum = newVersion.getVersion();
}
setModificationTime(Time.now());

if (updateTime) {
setModificationTime(Time.now());
}
return latestVersionNum;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
import org.apache.hadoop.security.token.Token;

import java.util.Objects;

/**
* One key can be too huge to fit in one container. In which case it gets split
* into a number of subkeys. This class represents one such subkey instance.
Expand Down Expand Up @@ -202,4 +204,27 @@ public String toString() {
", pipeline=" + pipeline +
", createVersion=" + createVersion + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OmKeyLocationInfo that = (OmKeyLocationInfo) o;
return length == that.length &&
offset == that.offset &&
createVersion == that.createVersion &&
Objects.equals(blockID, that.blockID) &&
Objects.equals(token, that.token) &&
Objects.equals(pipeline, that.pipeline);
}

@Override
public int hashCode() {
return Objects.hash(blockID, length, offset, token, createVersion,
pipeline);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -726,11 +726,20 @@ private OMResponse handleError(OMResponse resp) throws OMException {
public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId,
ExcludeList excludeList) throws IOException {
AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder();
KeyArgs keyArgs = KeyArgs.newBuilder()
KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize()).build();
.setDataSize(args.getDataSize());

if (args.getFactor() != null) {
keyArgs.setFactor(args.getFactor());
}

if (args.getType() != null) {
keyArgs.setType(args.getType());
}

req.setKeyArgs(keyArgs);
req.setClientID(clientId);
req.setExcludeList(excludeList.getProtoBuf());
Expand Down
7 changes: 7 additions & 0 deletions hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,10 @@ message KeyArgs {
optional uint32 multipartNumber = 10;
repeated hadoop.hdds.KeyValue metadata = 11;
repeated OzoneAclInfo acls = 12;
// This will be set when the request is received in pre-Execute. This
// value is used in setting creation/modification time depending on the
// request type.
optional uint64 modificationTime = 13;
}

message KeyLocation {
Expand Down Expand Up @@ -712,6 +716,9 @@ message ListStatusResponse {

message CreateKeyRequest {
required KeyArgs keyArgs = 1;
// Set in OM HA during preExecute step. This way all OM's use same ID in
// OM HA.
optional uint64 clientID = 2;
}

message CreateKeyResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void restartOzoneManager() throws IOException {
public void stop() {
for (OzoneManager ozoneManager : ozoneManagers) {
if (ozoneManager != null) {
LOG.info("Stopping the OzoneManager " + ozoneManager.getOMNodId());
LOG.info("Stopping the OzoneManager " + ozoneManager.getOMNodeId());
ozoneManager.stop();
ozoneManager.join();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public void testOMProxyProviderInitialization() throws Exception {
}
}
Assert.assertTrue("There is no OM Client Proxy corresponding to OM " +
"node" + cluster.getOzoneManager(i).getOMNodId(),
"node" + cluster.getOzoneManager(i).getOMNodeId(),
omClientProxyExists);
}
}
Expand Down Expand Up @@ -604,7 +604,7 @@ public void testReadRequest() throws Exception {

// Failover to the OM node that the objectStore points to
omFailoverProxyProvider.performFailoverIfRequired(
ozoneManager.getOMNodId());
ozoneManager.getOMNodeId());

// A read request should result in the proxyProvider failing over to
// leader node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,

OmKeyLocationInfo omKeyLocationInfo =
OmKeyLocationInfo.getFromProtobuf(keyLocation);
keyInfo.appendNewBlocks(Collections.singletonList(omKeyLocationInfo));
keyInfo.appendNewBlocks(Collections.singletonList(omKeyLocationInfo), true);
keyInfo.updateModifcationTime();
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
return omKeyLocationInfo;
Expand Down Expand Up @@ -317,7 +317,7 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
// If om is not managing via ratis, write to db, otherwise write to DB
// will happen via ratis apply transaction.
if (!isRatisEnabled) {
keyInfo.appendNewBlocks(locationInfos);
keyInfo.appendNewBlocks(locationInfos, true);
keyInfo.updateModifcationTime();
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
}
Expand Down Expand Up @@ -486,7 +486,7 @@ private void allocateBlockInKey(OmKeyInfo keyInfo, long size, long sessionId)
if (size > 0) {
List<OmKeyLocationInfo> locationInfos =
allocateBlock(keyInfo, new ExcludeList(), size);
keyInfo.appendNewBlocks(locationInfos);
keyInfo.appendNewBlocks(locationInfos, true);
}

// When OM is not managed via ratis we should write in to Om db in
Expand All @@ -509,7 +509,7 @@ private OmKeyInfo prepareKeyInfo(
// the key already exist, the new blocks will be added as new version
// when locations.size = 0, the new version will have identical blocks
// as its previous version
keyInfo.addNewVersion(locations);
keyInfo.addNewVersion(locations, true);
keyInfo.setDataSize(size + keyInfo.getDataSize());
}
return keyInfo;
Expand Down Expand Up @@ -632,8 +632,8 @@ public void commitKey(OmKeyArgs args, long clientID) throws IOException {
validateBucket(volumeName, bucketName);
OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
if (keyInfo == null) {
throw new OMException("Commit a key without corresponding entry " +
objectKey, KEY_NOT_FOUND);
throw new OMException("Failed to commit key, as " + openKey + "entry " +
"is not found in the openKey table", KEY_NOT_FOUND);
}
keyInfo.setDataSize(args.getDataSize());
keyInfo.setModificationTime(Time.now());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,12 +473,26 @@ public boolean isVolumeEmpty(String volume) throws IOException {
try (TableIterator<String, ? extends KeyValue<String, OmBucketInfo>>
bucketIter = bucketTable.iterator()) {
KeyValue<String, OmBucketInfo> kv = bucketIter.seek(volumePrefix);
// During iteration from DB, check in mean time if this bucket is not
// marked for delete.
if (kv != null && kv.getKey().startsWith(volumePrefix) &&
bucketTable.get(kv.getKey()) != null) {
return false; // we found at least one bucket with this volume prefix.

if (kv != null) {
// Check the entry in db is not marked for delete. This can happen
// while entry is marked for delete, but it is not flushed to DB.
CacheValue<OmBucketInfo> cacheValue =
bucketTable.getCacheValue(new CacheKey(kv.getKey()));
if (cacheValue != null) {
if (kv.getKey().startsWith(volumePrefix)
&& cacheValue.getCacheValue() != null) {
return false; // we found at least one bucket with this volume
// prefix.
}
} else {
if (kv.getKey().startsWith(volumePrefix)) {
return false; // we found at least one bucket with this volume
// prefix.
}
}
}

}
return true;
}
Expand All @@ -495,14 +509,43 @@ public boolean isVolumeEmpty(String volume) throws IOException {
public boolean isBucketEmpty(String volume, String bucket)
throws IOException {
String keyPrefix = getBucketKey(volume, bucket);
//TODO: When Key ops are converted in to HA model, use cache also to
// determine bucket is empty or not.

// First check in key table cache.
Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> iterator =
((TypedTable< String, OmKeyInfo>) keyTable).cacheIterator();
while (iterator.hasNext()) {
Map.Entry< CacheKey<String>, CacheValue<OmKeyInfo>> entry =
iterator.next();
String key = entry.getKey().getCacheKey();
OmKeyInfo omKeyInfo = entry.getValue().getCacheValue();
// Making sure that entry is not for delete key request.
if (key.startsWith(keyPrefix) && omKeyInfo != null) {
return false;
}
}
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
keyTable.iterator()) {
KeyValue<String, OmKeyInfo> kv = keyIter.seek(keyPrefix);
if (kv != null && kv.getKey().startsWith(keyPrefix)) {
return false; // we found at least one key with this vol/bucket prefix.

if (kv != null) {
// Check the entry in db is not marked for delete. This can happen
// while entry is marked for delete, but it is not flushed to DB.
CacheValue<OmKeyInfo> cacheValue =
keyTable.getCacheValue(new CacheKey(kv.getKey()));
if (cacheValue != null) {
if (kv.getKey().startsWith(keyPrefix)
&& cacheValue.getCacheValue() != null) {
return false; // we found at least one key with this vol/bucket
// prefix.
}
} else {
if (kv.getKey().startsWith(keyPrefix)) {
return false; // we found at least one key with this vol/bucket
// prefix.
}
}
}

}
return true;
}
Expand Down
Loading