Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;

import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.utils.db.cache.TableCacheImpl;

/**
* The DBStore interface provides the ability to create Tables, which store
Expand All @@ -47,7 +48,9 @@ public interface DBStore extends AutoCloseable {


/**
* Gets an existing TableStore with implicit key/value conversion.
* Gets an existing TableStore with implicit key/value conversion and
* with default cleanup policy for cache. Default cache clean up policy is
* manual.
*
* @param name - Name of the TableStore to get
* @param keyType
Expand All @@ -58,6 +61,15 @@ public interface DBStore extends AutoCloseable {
<KEY, VALUE> Table<KEY, VALUE> getTable(String name,
Class<KEY> keyType, Class<VALUE> valueType) throws IOException;

/**
* Gets an existing TableStore with implicit key/value conversion and
* with specified cleanup policy for cache.
* @throws IOException
*/
<KEY, VALUE> Table<KEY, VALUE> getTable(String name,
Class<KEY> keyType, Class<VALUE> valueType,
TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException;

/**
* Lists the Known list of Tables in a DB.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.utils.RocksDBStoreMBean;

import com.google.common.base.Preconditions;
import org.apache.hadoop.utils.db.cache.TableCacheImpl;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
Expand Down Expand Up @@ -260,6 +261,14 @@ public <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
valueType);
}

@Override
public <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
Class<KEY> keyType, Class<VALUE> valueType,
TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException {
return new TypedTable<KEY, VALUE>(getTable(name), codecRegistry, keyType,
valueType, cleanupPolicy);
}

@Override
public ArrayList<Table> listTables() throws IOException {
ArrayList<Table> returnList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.apache.hadoop.utils.db.cache.CacheKey;
import org.apache.hadoop.utils.db.cache.CacheResult;
import org.apache.hadoop.utils.db.cache.CacheValue;
import org.apache.hadoop.utils.db.cache.PartialTableCache;
import org.apache.hadoop.utils.db.cache.TableCacheImpl;
import org.apache.hadoop.utils.db.cache.TableCache;
import org.apache.hadoop.utils.db.cache.TableCacheImpl.CacheCleanupPolicy;

import static org.apache.hadoop.utils.db.cache.CacheResult.CacheStatus.EXISTS;
import static org.apache.hadoop.utils.db.cache.CacheResult.CacheStatus.NOT_EXIST;
/**
* Strongly typed table implementation.
* <p>
Expand All @@ -49,16 +54,61 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {

private final TableCache<CacheKey<KEY>, CacheValue<VALUE>> cache;

private final static long EPOCH_DEFAULT = -1L;

/**
* Create an TypedTable from the raw table.
* Default cleanup policy used for the table is
* {@link CacheCleanupPolicy#MANUAL}.
* @param rawTable
* @param codecRegistry
* @param keyType
* @param valueType
*/
public TypedTable(
Table<byte[], byte[]> rawTable,
CodecRegistry codecRegistry, Class<KEY> keyType,
Class<VALUE> valueType) throws IOException {
this(rawTable, codecRegistry, keyType, valueType,
CacheCleanupPolicy.MANUAL);
}

/**
* Create an TypedTable from the raw table with specified cleanup policy
* for table cache.
* @param rawTable
* @param codecRegistry
* @param keyType
* @param valueType
* @param cleanupPolicy
*/
public TypedTable(
Table<byte[], byte[]> rawTable,
CodecRegistry codecRegistry, Class<KEY> keyType,
Class<VALUE> valueType) {
Class<VALUE> valueType,
TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException {
this.rawTable = rawTable;
this.codecRegistry = codecRegistry;
this.keyType = keyType;
this.valueType = valueType;
cache = new PartialTableCache<>();
cache = new TableCacheImpl<>(cleanupPolicy);

if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
//fill cache
try(TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> tableIterator =
iterator()) {

while (tableIterator.hasNext()) {
KeyValue< KEY, VALUE > kv = tableIterator.next();

// We should build cache after OM restart when clean up policy is
// NEVER. Setting epoch value -1, so that when it is marked for
// delete, this will be considered for cleanup.
cache.put(new CacheKey<>(kv.getKey()),
new CacheValue<>(Optional.of(kv.getValue()), EPOCH_DEFAULT));
}
}
}
}

@Override
Expand All @@ -83,9 +133,17 @@ public boolean isEmpty() throws IOException {

@Override
public boolean isExist(KEY key) throws IOException {
CacheValue<VALUE> cacheValue= cache.get(new CacheKey<>(key));
return (cacheValue != null && cacheValue.getCacheValue() != null) ||
rawTable.isExist(codecRegistry.asRawData(key));

CacheResult<CacheValue<VALUE>> cacheResult =
cache.lookup(new CacheKey<>(key));

if (cacheResult.getCacheStatus() == EXISTS) {
return true;
} else if (cacheResult.getCacheStatus() == NOT_EXIST) {
return false;
} else {
return rawTable.isExist(codecRegistry.asRawData(key));
}
}

/**
Expand All @@ -104,14 +162,16 @@ public boolean isExist(KEY key) throws IOException {
public VALUE get(KEY key) throws IOException {
// Here the metadata lock will guarantee that cache is not updated for same
// key during get key.
CacheValue< VALUE > cacheValue = cache.get(new CacheKey<>(key));
if (cacheValue == null) {
// If no cache for the table or if it does not exist in cache get from
// RocksDB table.
return getFromTable(key);

CacheResult<CacheValue<VALUE>> cacheResult =
cache.lookup(new CacheKey<>(key));

if (cacheResult.getCacheStatus() == EXISTS) {
return cacheResult.getValue().getCacheValue();
} else if (cacheResult.getCacheStatus() == NOT_EXIST) {
return null;
} else {
// We have a value in cache, return the value.
return cacheValue.getCacheValue();
return getFromTable(key);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.apache.hadoop.utils.db.cache;

import java.util.Objects;

/**
* CacheResult which is returned as response for Key exist in cache or not.
* @param <CACHEVALUE>
*/
public class CacheResult<CACHEVALUE extends CacheValue> {

private CacheStatus cacheStatus;
private CACHEVALUE cachevalue;

public CacheResult(CacheStatus cacheStatus, CACHEVALUE cachevalue) {
this.cacheStatus = cacheStatus;
this.cachevalue = cachevalue;
}

public CacheStatus getCacheStatus() {
return cacheStatus;
}

public CACHEVALUE getValue() {
return cachevalue;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CacheResult< ? > that = (CacheResult< ? >) o;
return cacheStatus == that.cacheStatus &&
Objects.equals(cachevalue, that.cachevalue);
}

@Override
public int hashCode() {
return Objects.hash(cacheStatus, cachevalue);
}

/**
* Status which tells whether key exists in cache or not.
*/
public enum CacheStatus {
EXISTS, // When key exists in cache.

NOT_EXIST, // We guarantee that it does not exist. This will be returned
// when the key does not exist in cache, when cache clean up policy is
// NEVER.
MAY_EXIST // This will be returned when the key does not exist in
// cache, when cache clean up policy is MANUAL. So caller need to check
// if it might exist in it's rocksdb table.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;

import org.apache.hadoop.utils.db.cache.CacheResult.CacheStatus;
import org.apache.hadoop.utils.db.cache.TableCacheImpl.CacheCleanupPolicy;
import java.util.Iterator;
import java.util.Map;

Expand Down Expand Up @@ -52,8 +53,11 @@ public interface TableCache<CACHEKEY extends CacheKey,

/**
* Removes all the entries from the cache which are having epoch value less
* than or equal to specified epoch value. For FullTable Cache this is a
* do-nothing operation.
* than or equal to specified epoch value.
*
* If clean up policy is NEVER, this is a do nothing operation.
* If clean up policy is MANUAL, it is caller responsibility to cleanup the
* cache before calling cleanup.
* @param epoch
*/
void cleanup(long epoch);
Expand All @@ -69,4 +73,25 @@ public interface TableCache<CACHEKEY extends CacheKey,
* @return iterator of the underlying cache for the table.
*/
Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator();

/**
* Check key exist in cache or not.
*
* If it exists return CacheResult with value and status as
* {@link CacheStatus#EXISTS}
*
* If it does not exist:
* If cache clean up policy is
* {@link TableCacheImpl.CacheCleanupPolicy#NEVER} it means table cache is
* full cache. It return's {@link CacheResult} with null
* and status as {@link CacheStatus#NOT_EXIST}.
*
* If cache clean up policy is {@link CacheCleanupPolicy#MANUAL} it means
* table cache is partial cache. It return's {@link CacheResult} with
* null and status as MAY_EXIST.
*
* @param cachekey
*/
CacheResult<CACHEVALUE> lookup(CACHEKEY cachekey);

}
Loading