diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
index d01dfe44d30c4..59004c6e6e445 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
 
 /**
  * The DBStore interface provides the ability to create Tables, which store
@@ -47,7 +48,9 @@ public interface DBStore extends AutoCloseable {
 
   /**
-   * Gets an existing TableStore with implicit key/value conversion.
+   * Gets an existing TableStore with implicit key/value conversion and
+   * with the default cleanup policy for the cache. The default cache
+   * cleanup policy is MANUAL.
    *
    * @param name - Name of the TableStore to get
    * @param keyType
@@ -58,6 +61,15 @@ public interface DBStore extends AutoCloseable {
   <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
       Class<KEY> keyType, Class<VALUE> valueType) throws IOException;
 
+  /**
+   * Gets an existing TableStore with implicit key/value conversion and
+   * with the specified cleanup policy for the cache.
+   * @throws IOException
+   */
+  <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
+      Class<KEY> keyType, Class<VALUE> valueType,
+      TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException;
+
   /**
    * Lists the Known list of Tables in a DB.
    *
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
index 27862c7847653..23c03f1884b7c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.utils.RocksDBStoreMBean;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
 import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -260,6 +261,14 @@ public <K, V> TypedTable<K, V> getTable(String name,
         valueType);
   }
 
+  @Override
+  public <K, V> Table<K, V> getTable(String name,
+      Class<K> keyType, Class<V> valueType,
+      TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException {
+    return new TypedTable<K, V>(getTable(name), codecRegistry, keyType,
+        valueType, cleanupPolicy);
+  }
+
   @Override
   public ArrayList<Table> listTables() throws IOException {
     ArrayList<Table> returnList = new ArrayList<>();
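The new getTable overload lets each table pick its cache behaviour when it is created. A minimal usage sketch (the store handle and the table name here are assumptions for illustration, not part of this patch):

    // Full cache: entries are kept for the lifetime of the table and only
    // delete markers are ever evicted (CacheCleanupPolicy.NEVER).
    Table<String, OmVolumeArgs> volumeTable =
        store.getTable("volumeTable", String.class, OmVolumeArgs.class,
            TableCacheImpl.CacheCleanupPolicy.NEVER);

    // Omitting the policy keeps the previous behaviour: a partial cache that
    // the caller cleans up after flushing entries to RocksDB (MANUAL).
    Table<String, OmVolumeArgs> otherTable =
        store.getTable("otherTable", String.class, OmVolumeArgs.class);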
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
index 05f73b847276d..e8f9e0a8b306b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
@@ -23,11 +23,16 @@
 import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheResult;
 import org.apache.hadoop.utils.db.cache.CacheValue;
-import org.apache.hadoop.utils.db.cache.PartialTableCache;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
 import org.apache.hadoop.utils.db.cache.TableCache;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl.CacheCleanupPolicy;
+
+import static org.apache.hadoop.utils.db.cache.CacheResult.CacheStatus.EXISTS;
+import static org.apache.hadoop.utils.db.cache.CacheResult.CacheStatus.NOT_EXIST;
 
 /**
  * Strongly typed table implementation.
  *

@@ -49,16 +54,61 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
   private final TableCache<CacheKey<KEY>, CacheValue<VALUE>> cache;
 
+  private final static long EPOCH_DEFAULT = -1L;
+
+  /**
+   * Create a TypedTable from the raw table.
+   * The default cleanup policy used for the table is
+   * {@link CacheCleanupPolicy#MANUAL}.
+   * @param rawTable
+   * @param codecRegistry
+   * @param keyType
+   * @param valueType
+   */
+  public TypedTable(
+      Table<byte[], byte[]> rawTable,
+      CodecRegistry codecRegistry, Class<KEY> keyType,
+      Class<VALUE> valueType) throws IOException {
+    this(rawTable, codecRegistry, keyType, valueType,
+        CacheCleanupPolicy.MANUAL);
+  }
+
+  /**
+   * Create a TypedTable from the raw table with the specified cleanup
+   * policy for the table cache.
+   * @param rawTable
+   * @param codecRegistry
+   * @param keyType
+   * @param valueType
+   * @param cleanupPolicy
+   */
   public TypedTable(
       Table<byte[], byte[]> rawTable,
       CodecRegistry codecRegistry, Class<KEY> keyType,
-      Class<VALUE> valueType) {
+      Class<VALUE> valueType,
+      TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException {
     this.rawTable = rawTable;
     this.codecRegistry = codecRegistry;
     this.keyType = keyType;
     this.valueType = valueType;
-    cache = new PartialTableCache<>();
+    cache = new TableCacheImpl<>(cleanupPolicy);
+
+    if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
+      //fill cache
+      try(TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> tableIterator =
+              iterator()) {
+
+        while (tableIterator.hasNext()) {
+          KeyValue< KEY, VALUE > kv = tableIterator.next();
+
+          // We should rebuild the cache after an OM restart when the cleanup
+          // policy is NEVER. The epoch is set to -1 so that, once an entry
+          // is marked for delete, it will be considered for cleanup.
+          cache.put(new CacheKey<>(kv.getKey()),
+              new CacheValue<>(Optional.of(kv.getValue()), EPOCH_DEFAULT));
+        }
+      }
+    }
   }
 
   @Override
@@ -83,9 +133,17 @@ public boolean isEmpty() throws IOException {
 
   @Override
   public boolean isExist(KEY key) throws IOException {
-    CacheValue<VALUE> cacheValue= cache.get(new CacheKey<>(key));
-    return (cacheValue != null && cacheValue.getCacheValue() != null) ||
-        rawTable.isExist(codecRegistry.asRawData(key));
+
+    CacheResult<CacheValue<VALUE>> cacheResult =
+        cache.lookup(new CacheKey<>(key));
+
+    if (cacheResult.getCacheStatus() == EXISTS) {
+      return true;
+    } else if (cacheResult.getCacheStatus() == NOT_EXIST) {
+      return false;
+    } else {
+      return rawTable.isExist(codecRegistry.asRawData(key));
+    }
   }
 
   /**
@@ -104,14 +162,16 @@ public boolean isExist(KEY key) throws IOException {
   public VALUE get(KEY key) throws IOException {
     // Here the metadata lock will guarantee that cache is not updated for same
     // key during get key.
-    CacheValue< VALUE > cacheValue = cache.get(new CacheKey<>(key));
-    if (cacheValue == null) {
-      // If no cache for the table or if it does not exist in cache get from
-      // RocksDB table.
-      return getFromTable(key);
+
+    CacheResult<CacheValue<VALUE>> cacheResult =
+        cache.lookup(new CacheKey<>(key));
+
+    if (cacheResult.getCacheStatus() == EXISTS) {
+      return cacheResult.getValue().getCacheValue();
+    } else if (cacheResult.getCacheStatus() == NOT_EXIST) {
+      return null;
     } else {
-      // We have a value in cache, return the value.
-      return cacheValue.getCacheValue();
+      return getFromTable(key);
     }
   }
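isExist() and get() now follow the same three-way branch on the cache lookup result. A condensed sketch of that pattern, simplified from the get() above (locking and codec details omitted):

    CacheResult<CacheValue<VALUE>> result = cache.lookup(new CacheKey<>(key));
    switch (result.getCacheStatus()) {
    case EXISTS:      // cache holds the current value
      return result.getValue().getCacheValue();
    case NOT_EXIST:   // guaranteed absent: full-cache miss, or delete marker
      return null;
    case MAY_EXIST:   // partial cache cannot decide
    default:
      return getFromTable(key);  // fall back to the RocksDB table
    }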
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheResult.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheResult.java
new file mode 100644
index 0000000000000..76b8381d7bbb3
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheResult.java
@@ -0,0 +1,58 @@
+package org.apache.hadoop.utils.db.cache;
+
+import java.util.Objects;
+
+/**
+ * CacheResult is returned as the response for whether a key exists in the
+ * cache or not.
+ * @param <CACHEVALUE>
+ */
+public class CacheResult<CACHEVALUE extends CacheValue> {
+
+  private CacheStatus cacheStatus;
+  private CACHEVALUE cachevalue;
+
+  public CacheResult(CacheStatus cacheStatus, CACHEVALUE cachevalue) {
+    this.cacheStatus = cacheStatus;
+    this.cachevalue = cachevalue;
+  }
+
+  public CacheStatus getCacheStatus() {
+    return cacheStatus;
+  }
+
+  public CACHEVALUE getValue() {
+    return cachevalue;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CacheResult< ? > that = (CacheResult< ? >) o;
+    return cacheStatus == that.cacheStatus &&
+        Objects.equals(cachevalue, that.cachevalue);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(cacheStatus, cachevalue);
+  }
+
+  /**
+   * Status which tells whether a key exists in the cache or not.
+   */
+  public enum CacheStatus {
+    EXISTS, // When the key exists in the cache.
+
+    NOT_EXIST, // We guarantee that it does not exist. This is returned when
+    // the key does not exist in the cache and the cache cleanup policy is
+    // NEVER.
+
+    MAY_EXIST  // This is returned when the key does not exist in the cache
+    // and the cache cleanup policy is MANUAL. The caller then needs to check
+    // whether the key exists in its RocksDB table.
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java
index 0536ed7061557..0d191481992f6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java
@@ -21,7 +21,8 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
-
+import org.apache.hadoop.utils.db.cache.CacheResult.CacheStatus;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl.CacheCleanupPolicy;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -52,8 +53,11 @@ public interface TableCache<CACHEKEY extends CacheKey,
   Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator();
+
+  /**
+   * Check whether a key exists in the cache or not.
+   *
+   * If it exists, return a CacheResult with the value and status
+   * {@link CacheStatus#EXISTS}.
+   *
+   * If it does not exist:
+   * If the cache cleanup policy is
+   * {@link TableCacheImpl.CacheCleanupPolicy#NEVER}, the table cache is a
+   * full cache. It returns a {@link CacheResult} with a null value and
+   * status {@link CacheStatus#NOT_EXIST}.
+   *
+   * If the cache cleanup policy is {@link CacheCleanupPolicy#MANUAL}, the
+   * table cache is a partial cache. It returns a {@link CacheResult} with a
+   * null value and status MAY_EXIST.
+   *
+   * @param cachekey
+   */
+  CacheResult<CACHEVALUE> lookup(CACHEKEY cachekey);
+
 }
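A cache entry whose wrapped value is absent acts as a delete marker: lookup() reports NOT_EXIST for it under either cleanup policy, and it becomes eligible for eviction once its epoch has been flushed. A small sketch, assuming the CacheValue constructor used elsewhere in this patch (the key and transactionIndex are illustrative):

    // Mark "vol1" as deleted: the Guava Optional is absent, so
    // CacheValue#getCacheValue() returns null for this entry.
    cache.put(new CacheKey<>("vol1"),
        new CacheValue<>(Optional.absent(), transactionIndex));

    // lookup(new CacheKey<>("vol1")) now returns CacheStatus.NOT_EXIST, and
    // a later cleanup(epoch >= transactionIndex) may evict the marker even
    // when the cleanup policy is NEVER.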
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCacheImpl.java
similarity index 52%
rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
rename to hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCacheImpl.java
index 7d16d04df194a..7a9d55b18a9ee 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCacheImpl.java
@@ -32,21 +32,28 @@
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
 /**
- * Cache implementation for the table, this cache is partial cache, this will
- * be cleaned up, after entries are flushed to DB.
+ * Cache implementation for the table. Depending on the cache cleanup policy,
+ * this cache will be either a full cache or a partial cache.
+ *
+ * If the cache cleanup policy is set to {@link CacheCleanupPolicy#MANUAL},
+ * this will be a partial cache.
+ *
+ * If the cache cleanup policy is set to {@link CacheCleanupPolicy#NEVER},
+ * this will be a full cache.
 */
 @Private
 @Evolving
-public class PartialTableCache<CACHEKEY extends CacheKey,
-    CACHEVALUE extends CacheValue> implements TableCache<CACHEKEY, CACHEVALUE> {
+public class TableCacheImpl<CACHEKEY extends CacheKey,
+    CACHEVALUE extends CacheValue> implements TableCache<CACHEKEY, CACHEVALUE> {
 
   private final ConcurrentHashMap<CACHEKEY, CACHEVALUE> cache;
   private final TreeSet<EpochEntry<CACHEKEY>> epochEntries;
   private ExecutorService executorService;
+  private CacheCleanupPolicy cleanupPolicy;
 
-  public PartialTableCache() {
+  public TableCacheImpl(CacheCleanupPolicy cleanupPolicy) {
     cache = new ConcurrentHashMap<>();
     epochEntries = new TreeSet<>();
     // Created a singleThreadExecutor, so one cleanup will be running at a
@@ -54,7 +61,7 @@ public PartialTableCache() {
     ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
         .setNameFormat("PartialTableCache Cleanup Thread - %d").build();
     executorService = Executors.newSingleThreadExecutor(build);
-
+    this.cleanupPolicy = cleanupPolicy;
   }
 
   @Override
@@ -70,7 +77,7 @@ public void put(CACHEKEY cacheKey, CACHEVALUE value) {
 
   @Override
   public void cleanup(long epoch) {
-    executorService.submit(() -> evictCache(epoch));
+    executorService.submit(() -> evictCache(epoch, cleanupPolicy));
   }
 
   @Override
@@ -83,16 +90,24 @@ public Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator() {
     return cache.entrySet().iterator();
   }
 
-  private void evictCache(long epoch) {
+  private void evictCache(long epoch, CacheCleanupPolicy cacheCleanupPolicy) {
     EpochEntry<CACHEKEY> currentEntry = null;
     for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
         iterator.hasNext();) {
       currentEntry = iterator.next();
       CACHEKEY cachekey = currentEntry.getCachekey();
       CacheValue cacheValue = cache.computeIfPresent(cachekey, ((k, v) -> {
-        if (v.getEpoch() <= epoch) {
-          iterator.remove();
-          return null;
+        if (cleanupPolicy == CacheCleanupPolicy.MANUAL) {
+          if (v.getEpoch() <= epoch) {
+            iterator.remove();
+            return null;
+          }
+        } else if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
+          // Remove only entries which are marked for delete.
+          if (v.getEpoch() <= epoch && v.getCacheValue() == null) {
+            iterator.remove();
+            return null;
+          }
         }
         return v;
       }));
@@ -103,4 +118,48 @@ private void evictCache(long epoch) {
       }
     }
   }
+
+  public CacheResult<CACHEVALUE> lookup(CACHEKEY cachekey) {
+
+    // TODO: Remove this check once HA and Non-HA code is merged and all
+    //  requests are converted to use cache and double buffer.
+    // This is done as a temporary measure instead of passing a
+    // ratis-enabled flag, which would require more code changes. We cannot
+    // use a ratis-enabled flag here because some of the requests in OM HA
+    // are not yet modified to use the double buffer and cache.
+
+    if (cache.size() == 0) {
+      return new CacheResult<>(CacheResult.CacheStatus.MAY_EXIST,
+          null);
+    }
+
+    CACHEVALUE cachevalue = cache.get(cachekey);
+    if (cachevalue == null) {
+      if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
+        return new CacheResult<>(CacheResult.CacheStatus.NOT_EXIST, null);
+      } else {
+        return new CacheResult<>(CacheResult.CacheStatus.MAY_EXIST,
+            null);
+      }
+    } else {
+      if (cachevalue.getCacheValue() != null) {
+        return new CacheResult<>(CacheResult.CacheStatus.EXISTS, cachevalue);
+      } else {
+        // When an entity is marked for delete, cacheValue will be set to
+        // null. In that case we can return NOT_EXIST irrespective of the
+        // cache cleanup policy.
+        return new CacheResult<>(CacheResult.CacheStatus.NOT_EXIST, null);
+      }
+    }
+  }
+
+  /**
+   * Cleanup policies for table cache.
+   */
+  public enum CacheCleanupPolicy {
+    NEVER, // Cache will not be cleaned up. This means the table maintains
+    // a full cache.
+    MANUAL // Cache will be cleaned up after entries are flushed to DB. It is
+    // the caller's responsibility to flush to DB before calling cleanup.
+  }
 }
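A minimal sketch (not part of the patch) of how the two policies answer lookups, using a String-typed cache like the one in the test below:

    TableCacheImpl<CacheKey<String>, CacheValue<String>> fullCache =
        new TableCacheImpl<>(TableCacheImpl.CacheCleanupPolicy.NEVER);
    fullCache.put(new CacheKey<>("a"), new CacheValue<>(Optional.of("1"), 0));
    fullCache.lookup(new CacheKey<>("a"));   // CacheStatus.EXISTS
    fullCache.lookup(new CacheKey<>("b"));   // CacheStatus.NOT_EXIST: a full
                                             // cache is authoritative

    TableCacheImpl<CacheKey<String>, CacheValue<String>> partialCache =
        new TableCacheImpl<>(TableCacheImpl.CacheCleanupPolicy.MANUAL);
    partialCache.put(new CacheKey<>("a"),
        new CacheValue<>(Optional.of("1"), 0));
    partialCache.lookup(new CacheKey<>("b")); // CacheStatus.MAY_EXIST: the
                                              // caller still checks RocksDB

Note that the temporary empty-cache guard in lookup() returns MAY_EXIST for a completely empty cache, so the statuses above assume at least one entry has been put.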
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestTableCacheImpl.java
similarity index 67%
rename from hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
rename to hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestTableCacheImpl.java
index 736ae1b6161b5..c2ae8a1c81c56 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestTableCacheImpl.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.utils.db.cache;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import com.google.common.base.Optional;
@@ -26,18 +28,40 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.fail;
 
 /**
  * Class tests partial table cache.
  */
-public class TestPartialTableCache {
+@RunWith(value = Parameterized.class)
+public class TestTableCacheImpl {
   private TableCache<CacheKey<String>, CacheValue<String>> tableCache;
 
+  private final TableCacheImpl.CacheCleanupPolicy cacheCleanupPolicy;
+
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> policy() {
+    Object[][] params = new Object[][] {
+        {TableCacheImpl.CacheCleanupPolicy.NEVER},
+        {TableCacheImpl.CacheCleanupPolicy.MANUAL}
+    };
+    return Arrays.asList(params);
+  }
+
+  public TestTableCacheImpl(
+      TableCacheImpl.CacheCleanupPolicy cacheCleanupPolicy) {
+    this.cacheCleanupPolicy = cacheCleanupPolicy;
+  }
+
+
   @Before
   public void create() {
-    tableCache = new PartialTableCache<>();
+    tableCache =
+        new TableCacheImpl<>(cacheCleanupPolicy);
   }
 
   @Test
   public void testPartialTableCache() {
@@ -98,33 +122,40 @@ public void testPartialTableCacheParallel() throws Exception {
           tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
     }
 
-    int deleted = 5;
-    // cleanup first 5 entires
-    tableCache.cleanup(deleted);
 
     value = future.get();
     Assert.assertEquals(10, value);
 
     totalCount += value;
 
-    // We should totalCount - deleted entries in cache.
-    final int tc = totalCount;
-    GenericTestUtils.waitFor(() -> (tc - deleted == tableCache.size()), 100,
-        5000);
-
-    // Check if we have remaining entries.
-    for (int i=6; i <= totalCount; i++) {
-      Assert.assertEquals(Integer.toString(i),
-          tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
+    if (cacheCleanupPolicy == TableCacheImpl.CacheCleanupPolicy.MANUAL) {
+      int deleted = 5;
+
+      // cleanup first 5 entries
+      tableCache.cleanup(deleted);
+
+      // We should have totalCount - deleted entries in the cache.
+      final int tc = totalCount;
+      GenericTestUtils.waitFor(() -> (tc - deleted == tableCache.size()), 100,
+          5000);
+      // Check if we have remaining entries.
+      for (int i=6; i <= totalCount; i++) {
+        Assert.assertEquals(Integer.toString(i), tableCache.get(
+            new CacheKey<>(Integer.toString(i))).getCacheValue());
+      }
+      tableCache.cleanup(10);
+
+      tableCache.cleanup(totalCount);
+
+      // Cleaned up all entries, so cache size should be zero.
+      GenericTestUtils.waitFor(() -> (0 == tableCache.size()), 100,
+          5000);
+    } else {
+      tableCache.cleanup(totalCount);
+      Assert.assertEquals(totalCount, tableCache.size());
     }
-    tableCache.cleanup(10);
-
-    tableCache.cleanup(totalCount);
-    // Cleaned up all entries, so cache size should be zero.
-    GenericTestUtils.waitFor(() -> (0 == tableCache.size()), 100,
-        5000);
   }
 
   private int writeToCache(int count, int startVal, long sleep)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 88fd24cf42cb3..c7d6bb422bbf0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -69,6 +69,7 @@
 import org.apache.hadoop.utils.db.TypedTable;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
+import org.apache.hadoop.utils.db.cache.TableCacheImpl;
 import org.eclipse.jetty.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -269,11 +270,13 @@ protected void initializeOmTables() throws IOException {
         this.store.getTable(USER_TABLE, String.class, VolumeList.class);
     checkTableStatus(userTable, USER_TABLE);
     volumeTable =
-        this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class);
+        this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class,
+            TableCacheImpl.CacheCleanupPolicy.NEVER);
     checkTableStatus(volumeTable, VOLUME_TABLE);
 
     bucketTable =
-        this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class);
+        this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class,
+            TableCacheImpl.CacheCleanupPolicy.NEVER);
     checkTableStatus(bucketTable, BUCKET_TABLE);
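With the NEVER policy, the volume and bucket tables behave as full caches: existence checks and gets are answered from memory, including definite misses, while the remaining OM tables keep the default MANUAL partial cache and still fall through to RocksDB on a cache miss. A read-side sketch of the difference (metadataManager and volumeKey are illustrative names, not APIs added by this patch):

    // Volume table (NEVER): EXISTS and NOT_EXIST both come from the cache,
    // so neither branch needs a RocksDB read.
    if (!metadataManager.getVolumeTable().isExist(volumeKey)) {
      // definitely absent
    }

    // A MANUAL table (e.g. the key table, unchanged here): a cache miss is
    // only MAY_EXIST, so isExist() falls back to rawTable.isExist() on the
    // underlying RocksDB table.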