diff --git a/.github/workflows/build-cachelib-centos-8-5.yml b/.github/workflows/build-cachelib-centos-8-5.yml
index 5dade56439..fcb3129b22 100644
--- a/.github/workflows/build-cachelib-centos-8-5.yml
+++ b/.github/workflows/build-cachelib-centos-8-5.yml
@@ -13,11 +13,6 @@
 # limitations under the License.
 name: build-cachelib-centos-8.5
 on:
-  push:
-    tags:
-      - 'v*'
-  pull_request:
-  workflow_dispatch:
   schedule:
      - cron:  '0 9 * * *'
 jobs:
diff --git a/.github/workflows/build-cachelib-centos-long.yml b/.github/workflows/build-cachelib-centos-long.yml
new file mode 100644
index 0000000000..92165f603b
--- /dev/null
+++ b/.github/workflows/build-cachelib-centos-long.yml
@@ -0,0 +1,39 @@
+name: build-cachelib-centos-latest
+on:
+  schedule:
+    - cron:  '0 7 * * *'
+    
+jobs:
+  build-cachelib-centos8-latest:
+    name: "CentOS/latest - Build CacheLib with all dependencies"
+    runs-on: ubuntu-latest
+    # Docker container image name
+    container: "centos:latest"
+    steps:
+      - name: "update packages"
+        run: dnf upgrade -y
+      - name: "install sudo,git"
+        run: dnf install -y sudo git cmake gcc
+      - name: "System Information"
+        run: |
+          echo === uname ===
+          uname -a
+          echo === /etc/os-release ===
+          cat /etc/os-release
+          echo === df -hl ===
+          df -hl
+          echo === free -h ===
+          free -h
+          echo === top ===
+          top -b -n1 -1 -Eg || timeout 1 top -b -n1
+          echo === env ===
+          env
+          echo === gcc -v ===
+          gcc -v
+      - name: "checkout sources"
+        uses: actions/checkout@v2
+      - name: "build CacheLib using build script"
+        run: ./contrib/build.sh -j -v -T
+      - name: "run tests"
+        timeout-minutes: 60
+        run: cd opt/cachelib/tests && ../../../run_tests.sh long
diff --git a/.github/workflows/build-cachelib-debian.yml b/.github/workflows/build-cachelib-debian.yml
new file mode 100644
index 0000000000..5bc3ad3c70
--- /dev/null
+++ b/.github/workflows/build-cachelib-debian.yml
@@ -0,0 +1,43 @@
+name: build-cachelib-debian-10
+on:
+  schedule:
+    - cron:  '30 5 * * 0,3'
+
+jobs:
+  build-cachelib-debian-10:
+    name: "Debian/Buster - Build CacheLib with all dependencies"
+    runs-on: ubuntu-latest
+    # Docker container image name
+    container: "debian:buster-slim"
+    steps:
+      - name: "update packages"
+        run: apt-get update
+      - name: "upgrade packages"
+        run: apt-get -y upgrade
+      - name: "install sudo,git"
+        run: apt-get install -y sudo git procps
+      - name: "System Information"
+        run: |
+          echo === uname ===
+          uname -a
+          echo === /etc/os-release ===
+          cat /etc/os-release
+          echo === df -hl ===
+          df -hl
+          echo === free -h ===
+          free -h
+          echo === top ===
+          top -b -n1 -1 -Eg || timeout 1 top -b -n1 ; true
+          echo === env ===
+          env
+          echo === cc -v ===
+          cc -v || true
+          echo === g++ -v ===
+          g++ -v || true
+      - name: "checkout sources"
+        uses: actions/checkout@v2
+      - name: "build CacheLib using build script"
+        run: ./contrib/build.sh -j -v -T
+      - name: "run tests"
+        timeout-minutes: 60
+        run: cd opt/cachelib/tests && ../../../run_tests.sh
diff --git a/.github/workflows/build-cachelib-docker.yml b/.github/workflows/build-cachelib-docker.yml
new file mode 100644
index 0000000000..f00c028708
--- /dev/null
+++ b/.github/workflows/build-cachelib-docker.yml
@@ -0,0 +1,50 @@
+name: build-cachelib-docker
+on:
+  push:
+  pull_request:
+
+jobs:
+  build-cachelib-docker:
+    name: "CentOS/latest - Build CacheLib with all dependencies"
+    runs-on: ubuntu-latest
+    env:
+      REPO:           cachelib
+      GITHUB_REPO:    intel/CacheLib
+      CONTAINER_REG:  ghcr.io/pmem/cachelib
+      CONTAINER_REG_USER:   ${{ secrets.GH_CR_USER }}
+      CONTAINER_REG_PASS:   ${{ secrets.GH_CR_PAT }}
+      FORCE_IMAGE_ACTION:   ${{ secrets.FORCE_IMAGE_ACTION }}
+      HOST_WORKDIR:         ${{ github.workspace }}
+      WORKDIR:              docker
+      IMG_VER:              devel
+    strategy:
+      matrix:
+        CONFIG: ["OS=centos OS_VER=8streams PUSH_IMAGE=1"]
+    steps:
+      - name: "System Information"
+        run: |
+          echo === uname ===
+          uname -a
+          echo === /etc/os-release ===
+          cat /etc/os-release
+          echo === df -hl ===
+          df -hl
+          echo === free -h ===
+          free -h
+          echo === top ===
+          top -b -n1 -1 -Eg || timeout 1 top -b -n1
+          echo === env ===
+          env
+          echo === gcc -v ===
+          gcc -v
+      - name: "checkout sources"
+        uses: actions/checkout@v2
+        with:
+          submodules: recursive
+          fetch-depth: 0
+
+      - name: Pull the image or rebuild and push it
+        run: cd $WORKDIR && ${{ matrix.CONFIG }} ./pull-or-rebuild-image.sh $FORCE_IMAGE_ACTION
+
+      - name: Run the build
+        run: cd $WORKDIR && ${{ matrix.CONFIG }} ./build.sh
diff --git a/MultiTierDataMovement.md b/MultiTierDataMovement.md
new file mode 100644
index 0000000000..cccc14b947
--- /dev/null
+++ b/MultiTierDataMovement.md
@@ -0,0 +1,90 @@
+# Background Data Movement
+
+In order to reduce the number of online evictions and support asynchronous
+promotion - we have added two periodic workers to handle eviction and promotion.
+
+The diagram below shows a simplified version of how the background evictor
+thread (green) is integrated to the CacheLib architecture. 
+
+
+(diagram omitted)
+
+
+## Background Evictors
+
+The background evictors scan each class to see if there are objects to move to the next (lower)
+tier using a given strategy. Here we document the parameters for the different
+strategies and general parameters. 
+
+- `backgroundEvictorIntervalMilSec`: The interval that this thread runs for - by default
+the background evictor threads will wake up every 10 ms to scan the AllocationClasses. Also,
+the background evictor thread will be woken up every time there is a failed allocation (from
+a request handling thread) and the current percentage of free memory for the 
+AllocationClass is lower than `lowEvictionAcWatermark`. This may render the interval parameter
+not as important when there are many allocations occurring from request handling threads.
+
+- `evictorThreads`: The number of background evictors to run - each thread is assigned
+a set of AllocationClasses to scan and evict objects from. Currently, each thread gets
+an equal number of classes to scan - but as object size distribution may be unequal - future
+versions will attempt to balance the classes among threads. The range is 1 to number of AllocationClasses.
+The default is 1. 
+
+- `maxEvictionBatch`: The number of objects to remove in a given eviction call. The
+default is 40. Lower range is 10 and the upper range is 1000. Too low and we might not
+remove objects at a reasonable rate, too high and it might increase contention with user threads.
+
+- `minEvictionBatch`: Minimum number of items to evict at any time (if there are any
+candidates)
+
+- `maxEvictionPromotionHotness`: Maximum candidates to consider for eviction. This is similar to `maxEvictionBatch`
+but it specifies how many candidates will be taken into consideration, not the actual number of items to evict.
+This option can be used to configure duration of critical section on LRU lock.
+
+
+### FreeThresholdStrategy (default)
+
+- `lowEvictionAcWatermark`: Triggers background eviction thread to run
+when this percentage of the AllocationClass is free. 
+The default is `2.0`, to avoid wasting capacity we don't set this above `10.0`.
+
+- `highEvictionAcWatermark`: Stop the evictions from an AllocationClass when this 
+percentage of the AllocationClass is free. The default is `5.0`, to avoid wasting capacity we
+don't set this above `10`.
+
+
+## Background Promoters
+
+The background promoters scan each class to see if there are objects to move to an upper (faster)
+tier using a given strategy. Here we document the parameters for the different
+strategies and general parameters.
+
+- `backgroundPromoterIntervalMilSec`: The interval that this thread runs for - by default
+the background promoter threads will wake up every 10 ms to scan the AllocationClasses for
+objects to promote.
+
+- `promoterThreads`: The number of background promoters to run - each thread is assigned
+a set of AllocationClasses to scan and promote objects from. Currently, each thread gets
+an equal number of classes to scan - but as object size distribution may be unequal - future
+versions will attempt to balance the classes among threads. The range is `1` to number of AllocationClasses. The default is `1`.
+
+- `maxPromotionBatch`: The number of objects to promote in a given promotion call. The
+default is 40. Lower range is 10 and the upper range is 1000. Too low and we might not
+remove objects at a reasonable rate, too high and it might increase contention with user threads. 
+
+- `minPromotionBatch`: Minimum number of items to promote at any time (if there are any
+candidates)
+
+- `numDuplicateElements`: This allows us to promote items that have existing handles (read-only) since
+we won't need to modify the data when a user is done with the data. Therefore, for a short time
+the data could reside in both tiers until it is evicted from its current tier. The default is to
+not allow this (0). Setting the value to 100 will enable duplicate elements in tiers.
+
+### Background Promotion Strategy (only one currently)
+
+- `promotionAcWatermark`: Promote items if there is at least this
+percent of free AllocationClasses. Promotion thread will attempt to move `maxPromotionBatch` number of objects
+to that tier. The objects are chosen from the head of the LRU. The default is `4.0`.
+This value should correlate with `lowEvictionAcWatermark`, `highEvictionAcWatermark`, `minAcAllocationWatermark`, `maxAcAllocationWatermark`.
+- `maxPromotionBatch`: The number of objects to promote in batch during BG promotion. Analogous to
+`maxEvictionBatch`. It's value should be lower to decrease contention on hot items.
+
diff --git a/cachelib/CMakeLists.txt b/cachelib/CMakeLists.txt
index 506ba66bcf..bb77d54dc6 100644
--- a/cachelib/CMakeLists.txt
+++ b/cachelib/CMakeLists.txt
@@ -43,6 +43,7 @@ set(PACKAGE_BUGREPORT "https://github.com/facebook/TBD")
 set(CMAKE_POSITION_INDEPENDENT_CODE ON)
 
 option(BUILD_TESTS "If enabled, compile the tests." ON)
+option(BUILD_WITH_DTO "If enabled, build with DSA transparent offloading." OFF)
 
 
 set(BIN_INSTALL_DIR bin CACHE STRING
@@ -85,6 +86,11 @@ set(CMAKE_MODULE_PATH
 set(CMAKE_CXX_STANDARD 17)
 set(CMAKE_CXX_STANDARD_REQUIRED True)
 
+if(COVERAGE_ENABLED)
+  # Add code coverage
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --coverage -fprofile-arcs -ftest-coverage")
+endif()
+
 # include(fb_cxx_flags)
 message(STATUS "Update CXXFLAGS: ${CMAKE_CXX_FLAGS}")
 
diff --git a/cachelib/allocator/BackgroundMover.h b/cachelib/allocator/BackgroundMover.h
index aee86a4e32..e8c1242283 100644
--- a/cachelib/allocator/BackgroundMover.h
+++ b/cachelib/allocator/BackgroundMover.h
@@ -18,7 +18,6 @@
 
 #include "cachelib/allocator/BackgroundMoverStrategy.h"
 #include "cachelib/allocator/CacheStats.h"
-#include "cachelib/common/AtomicCounter.h"
 #include "cachelib/common/PeriodicWorker.h"
 
 namespace facebook::cachelib {
@@ -27,17 +26,19 @@ namespace facebook::cachelib {
 template 
 struct BackgroundMoverAPIWrapper {
   static size_t traverseAndEvictItems(C& cache,
+                                      unsigned int tid,
                                       unsigned int pid,
                                       unsigned int cid,
                                       size_t batch) {
-    return cache.traverseAndEvictItems(pid, cid, batch);
+    return cache.traverseAndEvictItems(tid, pid, cid, batch);
   }
 
   static size_t traverseAndPromoteItems(C& cache,
+                                        unsigned int tid,
                                         unsigned int pid,
                                         unsigned int cid,
                                         size_t batch) {
-    return cache.traverseAndPromoteItems(pid, cid, batch);
+    return cache.traverseAndPromoteItems(tid, pid, cid, batch);
   }
 };
 
@@ -49,6 +50,7 @@ enum class MoverDir { Evict = 0, Promote };
 template 
 class BackgroundMover : public PeriodicWorker {
  public:
+  using ClassBgStatsType = std::map;
   using Cache = CacheT;
   // @param cache               the cache interface
   // @param strategy            the stragey class that defines how objects are
@@ -60,16 +62,38 @@ class BackgroundMover : public PeriodicWorker {
   ~BackgroundMover() override;
 
   BackgroundMoverStats getStats() const noexcept;
-  std::map> getClassStats() const noexcept;
+  ClassBgStatsType getClassStats() const noexcept {
+    return movesPerClass_;
+  }
 
   void setAssignedMemory(std::vector&& assignedMemory);
 
   // return id of the worker responsible for promoting/evicting from particlar
   // pool and allocation calss (id is in range [0, numWorkers))
-  static size_t workerId(PoolId pid, ClassId cid, size_t numWorkers);
+  static size_t workerId(TierId tid, PoolId pid, ClassId cid, size_t numWorkers);
 
  private:
-  std::map> movesPerClass_;
+  ClassBgStatsType movesPerClass_;
+
+  struct TraversalStats {
+    // record a traversal and its time taken
+    void recordTraversalTime(uint64_t nsTaken);
+
+    uint64_t getAvgTraversalTimeNs(uint64_t numTraversals) const;
+    uint64_t getMinTraversalTimeNs() const { return minTraversalTimeNs_; }
+    uint64_t getMaxTraversalTimeNs() const { return maxTraversalTimeNs_; }
+    uint64_t getLastTraversalTimeNs() const { return lastTraversalTimeNs_; }
+
+   private:
+    // time it took us the last time to traverse the cache.
+    uint64_t lastTraversalTimeNs_{0};
+    uint64_t minTraversalTimeNs_{
+        std::numeric_limits::max()};
+    uint64_t maxTraversalTimeNs_{0};
+    uint64_t totalTraversalTimeNs_{0};
+  };
+
+  TraversalStats traversalStats_;
   // cache allocator's interface for evicting
   using Item = typename Cache::Item;
 
@@ -77,15 +101,18 @@ class BackgroundMover : public PeriodicWorker {
   std::shared_ptr strategy_;
   MoverDir direction_;
 
-  std::function moverFunc;
+  std::function
+      moverFunc;
 
   // implements the actual logic of running the background evictor
   void work() override final;
   void checkAndRun();
 
-  AtomicCounter numMovedItems_{0};
-  AtomicCounter numTraversals_{0};
-  AtomicCounter totalBytesMoved_{0};
+  uint64_t numMovedItems{0};
+  uint64_t numTraversals{0};
+  uint64_t totalClasses{0};
+  uint64_t totalBytesMoved{0};
 
   std::vector assignedMemory_;
   folly::DistributedMutex mutex_;
@@ -105,6 +132,20 @@ BackgroundMover::BackgroundMover(
   }
 }
 
+template 
+void BackgroundMover::TraversalStats::recordTraversalTime(uint64_t nsTaken) {
+  lastTraversalTimeNs_ = nsTaken;
+  minTraversalTimeNs_ = std::min(minTraversalTimeNs_, nsTaken);
+  maxTraversalTimeNs_ = std::max(maxTraversalTimeNs_, nsTaken);
+  totalTraversalTimeNs_ += nsTaken;
+}
+
+template 
+uint64_t BackgroundMover::TraversalStats::getAvgTraversalTimeNs(
+    uint64_t numTraversals) const {
+  return numTraversals ? totalTraversalTimeNs_ / numTraversals : 0;
+}
+
 template 
 BackgroundMover::~BackgroundMover() {
   stop(std::chrono::seconds(0));
@@ -123,8 +164,8 @@ template 
 void BackgroundMover::setAssignedMemory(
     std::vector&& assignedMemory) {
   XLOG(INFO, "Class assigned to background worker:");
-  for (auto [pid, cid] : assignedMemory) {
-    XLOGF(INFO, "Pid: {}, Cid: {}", pid, cid);
+  for (auto [tid, pid, cid] : assignedMemory) {
+    XLOGF(INFO, "Tid: {}, Pid: {}, Cid: {}", tid, pid, cid);
   }
 
   mutex_.lock_combine([this, &assignedMemory] {
@@ -138,51 +179,64 @@ template 
 void BackgroundMover::checkAndRun() {
   auto assignedMemory = mutex_.lock_combine([this] { return assignedMemory_; });
 
-  unsigned int moves = 0;
-  auto batches = strategy_->calculateBatchSizes(cache_, assignedMemory);
-
-  for (size_t i = 0; i < batches.size(); i++) {
-    const auto [pid, cid] = assignedMemory[i];
-    const auto batch = batches[i];
-
-    if (batch == 0) {
-      continue;
+  while (true) {
+    unsigned int moves = 0;
+    std::set classes{};
+    auto batches = strategy_->calculateBatchSizes(cache_, assignedMemory);
+
+    const auto begin = util::getCurrentTimeNs();
+    for (size_t i = 0; i < batches.size(); i++) {
+      const auto [tid, pid, cid] = assignedMemory[i];
+      const auto batch = batches[i];
+      if (!batch) {
+        continue;
+      }
+
+      // try moving BATCH items from the class in order to reach free target
+      auto moved = moverFunc(cache_, tid, pid, cid, batch);
+      moves += moved;
+      movesPerClass_[assignedMemory[i]] += moved;
+    }
+    auto end = util::getCurrentTimeNs();
+    if (moves > 0) {
+      traversalStats_.recordTraversalTime(end > begin ? end - begin : 0);
+      numMovedItems += moves;
+      numTraversals++;
     }
 
-    // try moving BATCH items from the class in order to reach free target
-    auto moved = moverFunc(cache_, pid, cid, batch);
-    moves += moved;
-    movesPerClass_[pid][cid] += moved;
-    totalBytesMoved_.add(moved * cache_.getPool(pid).getAllocSizes()[cid]);
+    //we didn't move any objects done with this run
+    if (moves == 0 || shouldStopWork()) {
+        break;
+    }
   }
-
-  numTraversals_.inc();
-  numMovedItems_.add(moves);
 }
 
 template 
 BackgroundMoverStats BackgroundMover::getStats() const noexcept {
   BackgroundMoverStats stats;
-  stats.numMovedItems = numMovedItems_.get();
-  stats.runCount = numTraversals_.get();
-  stats.totalBytesMoved = totalBytesMoved_.get();
+  stats.numMovedItems = numMovedItems;
+  stats.totalBytesMoved = totalBytesMoved;
+  stats.totalClasses = totalClasses;
+  auto runCount = getRunCount();
+  stats.runCount = runCount;
+  stats.numTraversals = numTraversals;
+  stats.avgItemsMoved = (double) stats.numMovedItems / (double)runCount;
+  stats.lastTraversalTimeNs = traversalStats_.getLastTraversalTimeNs();
+  stats.avgTraversalTimeNs = traversalStats_.getAvgTraversalTimeNs(numTraversals);
+  stats.minTraversalTimeNs = traversalStats_.getMinTraversalTimeNs();
+  stats.maxTraversalTimeNs = traversalStats_.getMaxTraversalTimeNs();
 
   return stats;
 }
 
 template 
-std::map>
-BackgroundMover::getClassStats() const noexcept {
-  return movesPerClass_;
-}
-
-template 
-size_t BackgroundMover::workerId(PoolId pid,
+size_t BackgroundMover::workerId(TierId tid,
+                                         PoolId pid,
                                          ClassId cid,
                                          size_t numWorkers) {
   XDCHECK(numWorkers);
 
   // TODO: came up with some better sharding (use hashing?)
-  return (pid + cid) % numWorkers;
+  return (tid + pid + cid) % numWorkers;
 }
 } // namespace facebook::cachelib
diff --git a/cachelib/allocator/BackgroundMoverStrategy.h b/cachelib/allocator/BackgroundMoverStrategy.h
index abf37edd13..2f187636c6 100644
--- a/cachelib/allocator/BackgroundMoverStrategy.h
+++ b/cachelib/allocator/BackgroundMoverStrategy.h
@@ -21,12 +21,6 @@
 namespace facebook {
 namespace cachelib {
 
-struct MemoryDescriptorType {
-  MemoryDescriptorType(PoolId pid, ClassId cid) : pid_(pid), cid_(cid) {}
-  PoolId pid_;
-  ClassId cid_;
-};
-
 // Base class for background eviction strategy.
 class BackgroundMoverStrategy {
  public:
@@ -44,5 +38,34 @@ class BackgroundMoverStrategy {
   virtual ~BackgroundMoverStrategy() = default;
 };
 
+class DefaultBackgroundMoverStrategy : public BackgroundMoverStrategy {
+  public:
+    DefaultBackgroundMoverStrategy(uint64_t batchSize, double targetFree)
+      : batchSize_(batchSize), targetFree_((double)targetFree/100.0) {}
+    ~DefaultBackgroundMoverStrategy() {}
+
+  std::vector calculateBatchSizes(
+      const CacheBase& cache,
+      std::vector acVec) {
+    std::vector batches{};
+    for (auto [tid, pid, cid] : acVec) {
+        double usage = cache.getPoolByTid(pid, tid).getApproxUsage(cid);
+        uint32_t perSlab = cache.getPoolByTid(pid, tid).getPerSlab(cid);
+        if (usage >= (1.0-targetFree_)) {
+          uint32_t batch = batchSize_ > perSlab ? perSlab : batchSize_;
+          batches.push_back(batch);
+        } else {
+          //no work to be done since there is already
+          //at least targetFree remaining in the class
+          batches.push_back(0);
+        }
+    }
+    return batches;
+  }
+  private:
+    uint64_t batchSize_{100};
+    double targetFree_{0.05};
+};
+
 } // namespace cachelib
 } // namespace facebook
diff --git a/cachelib/allocator/CMakeLists.txt b/cachelib/allocator/CMakeLists.txt
index 6103cdc823..0f96a0cd7f 100644
--- a/cachelib/allocator/CMakeLists.txt
+++ b/cachelib/allocator/CMakeLists.txt
@@ -55,6 +55,7 @@ add_library (cachelib_allocator
     PoolOptimizeStrategy.cpp
     PoolRebalancer.cpp
     PoolResizer.cpp
+    PrivateMemoryManager.cpp
     RebalanceStrategy.cpp
     SlabReleaseStats.cpp
     TempShmMapping.cpp
diff --git a/cachelib/allocator/Cache.cpp b/cachelib/allocator/Cache.cpp
index 37bba99a67..8d958b3510 100644
--- a/cachelib/allocator/Cache.cpp
+++ b/cachelib/allocator/Cache.cpp
@@ -244,6 +244,7 @@ void CacheBase::updateGlobalCacheStats(const std::string& statPrefix) const {
       statPrefix + "cache.size.configured",
       memStats.configuredRamCacheSize + memStats.nvmCacheSize);
 
+  //TODO: add specific per-tier counters
   const auto stats = getGlobalCacheStats();
 
   // Eviction Stats
@@ -253,7 +254,8 @@ void CacheBase::updateGlobalCacheStats(const std::string& statPrefix) const {
   //   from both ram and nvm, this is counted as a single eviction from cache.
   // Ram Evictions: item evicted from ram but it can be inserted into nvm
   const std::string ramEvictionKey = statPrefix + "ram.evictions";
-  counters_.updateDelta(ramEvictionKey, stats.numEvictions);
+  counters_.updateDelta(ramEvictionKey,
+                        std::accumulate(stats.numEvictions.begin(), stats.numEvictions.end(), 0));
   // Nvm Evictions: item evicted from nvm but it can be still in ram
   const std::string nvmEvictionKey = statPrefix + "nvm.evictions";
   counters_.updateDelta(nvmEvictionKey, stats.numNvmEvictions);
@@ -295,11 +297,11 @@ void CacheBase::updateGlobalCacheStats(const std::string& statPrefix) const {
   }
 
   counters_.updateDelta(statPrefix + "cache.alloc_attempts",
-                        stats.allocAttempts);
+                        std::accumulate(stats.allocAttempts.begin(), stats.allocAttempts.end(),0));
   counters_.updateDelta(statPrefix + "cache.eviction_attempts",
-                        stats.evictionAttempts);
+                        std::accumulate(stats.evictionAttempts.begin(),stats.evictionAttempts.end(),0));
   counters_.updateDelta(statPrefix + "cache.alloc_failures",
-                        stats.allocFailures);
+                        std::accumulate(stats.allocFailures.begin(),stats.allocFailures.end(),0));
   counters_.updateDelta(statPrefix + "cache.invalid_allocs",
                         stats.invalidAllocs);
 
@@ -475,6 +477,10 @@ void CacheBase::updateGlobalCacheStats(const std::string& statPrefix) const {
 
   visitEstimates(uploadStatsNanoToMicro, stats.allocateLatencyNs,
                  statPrefix + "allocate.latency_us");
+  visitEstimates(uploadStatsNanoToMicro, stats.bgEvictLatencyNs,
+                 statPrefix + "background.eviction.latency_us");
+  visitEstimates(uploadStatsNanoToMicro, stats.bgPromoteLatencyNs,
+                 statPrefix + "background.promotion.latency_us");
   visitEstimates(uploadStatsNanoToMicro, stats.moveChainedLatencyNs,
                  statPrefix + "move.chained.latency_us");
   visitEstimates(uploadStatsNanoToMicro, stats.moveRegularLatencyNs,
diff --git a/cachelib/allocator/Cache.h b/cachelib/allocator/Cache.h
index e225ba8a01..6f7ae20bc5 100644
--- a/cachelib/allocator/Cache.h
+++ b/cachelib/allocator/Cache.h
@@ -73,6 +73,22 @@ enum class DestructorContext {
   kRemovedFromNVM
 };
 
+struct MemoryDescriptorType {
+    MemoryDescriptorType(TierId tid, PoolId pid, ClassId cid) :
+        tid_(tid), pid_(pid), cid_(cid) {}
+    TierId tid_;
+    PoolId pid_;
+    ClassId cid_;
+
+    bool operator<(const MemoryDescriptorType& rhs) const {
+      return std::make_tuple(tid_, pid_, cid_) < std::make_tuple(rhs.tid_, rhs.pid_, rhs.cid_);
+    }
+
+    bool operator==(const MemoryDescriptorType& rhs) const {
+      return std::make_tuple(tid_, pid_, cid_) == std::make_tuple(rhs.tid_, rhs.pid_, rhs.cid_);
+    }
+};
+
 // A base class of cache exposing members and status agnostic of template type.
 class CacheBase {
  public:
@@ -85,6 +101,9 @@ class CacheBase {
   CacheBase(CacheBase&&) = default;
   CacheBase& operator=(CacheBase&&) = default;
 
+  // TODO: come up with some reasonable number
+  static constexpr unsigned kMaxTiers = 2;
+
   // Get a string referring to the cache name for this cache
   virtual const std::string getCacheName() const = 0;
 
@@ -96,12 +115,24 @@ class CacheBase {
   // @param poolId    The pool id to query
   virtual const MemoryPool& getPool(PoolId poolId) const = 0;
 
+  // Get the reference to a memory pool using a tier id, for stats purposes
+  //
+  // @param poolId    The pool id to query
+  // @param tierId    The tier of the pool id
+  virtual const MemoryPool& getPoolByTid(PoolId poolId, TierId tid) const = 0;
+
   // Get Pool specific stats (regular pools). This includes stats from the
   // Memory Pool and also the cache.
   //
   // @param poolId   the pool id
   virtual PoolStats getPoolStats(PoolId poolId) const = 0;
 
+  // Get Allocation Class specific stats.
+  //
+  // @param poolId   the pool id
+  // @param classId   the class id
+  virtual ACStats getACStats(TierId tid, PoolId poolId, ClassId classId) const = 0;
+
   // @param poolId   the pool id
   virtual AllSlabReleaseEvents getAllSlabReleaseEvents(PoolId poolId) const = 0;
 
diff --git a/cachelib/allocator/CacheAllocator.h b/cachelib/allocator/CacheAllocator.h
index 3b0d9eeaef..7422c1c61c 100644
--- a/cachelib/allocator/CacheAllocator.h
+++ b/cachelib/allocator/CacheAllocator.h
@@ -23,6 +23,8 @@
 #include 
 #include 
 #include 
+#include 
+#include 
 #include 
 
 #include 
@@ -59,6 +61,7 @@
 #include "cachelib/allocator/PoolOptimizer.h"
 #include "cachelib/allocator/PoolRebalancer.h"
 #include "cachelib/allocator/PoolResizer.h"
+#include "cachelib/allocator/PrivateMemoryManager.h"
 #include "cachelib/allocator/ReadOnlySharedCacheView.h"
 #include "cachelib/allocator/Reaper.h"
 #include "cachelib/allocator/RebalanceStrategy.h"
@@ -219,7 +222,7 @@ class CacheAllocator : public CacheBase {
   using PoolIds = std::set;
 
   using EventTracker = EventInterface;
-
+  using ClassBgStatsType = std::map;
   // SampleItem is a wrapper for the CacheItem which is provided as the sample
   // for uploading to Scuba (see ItemStatsExporter). It is guaranteed that the
   // CacheItem is accessible as long as the SampleItem is around since the
@@ -350,6 +353,38 @@ class CacheAllocator : public CacheBase {
     virtual bool isValid() const { return true; }
   };
   using ChainedItemMovingSync = std::function(Key)>;
+  
+  // Eviction related data returned from
+  // function executed under mmContainer lock
+  struct EvictionData {
+    EvictionData() = delete;
+    EvictionData(Item *candidate_, 
+                 Item *toRecycle_,
+                 Item *toRecycleParent_,
+                 bool chainedItem_,
+                 bool expired_,
+                 typename NvmCacheT::PutToken token_,
+                 WriteHandle candidateHandle_) :
+                 candidate(candidate_),
+                 toRecycle(toRecycle_),
+                 toRecycleParent(toRecycleParent_),
+                 expired(expired_),
+                 chainedItem(chainedItem_),
+                 token(std::move(token_)),
+                 candidateHandle(std::move(candidateHandle_)) {}
+
+    // item that is candidate for eviction
+    Item *candidate;
+    // acutal alloc that will be recycled
+    // back up to allocator
+    Item *toRecycle;
+    // possible parent ref
+    Item *toRecycleParent;
+    bool expired; //is item expired
+    bool chainedItem; //is it a chained item
+    typename NvmCacheT::PutToken token; //put token for NVM cache
+    WriteHandle candidateHandle; //hande in case we don't use moving bit
+  };
 
   using AccessContainer = typename Item::AccessContainer;
   using MMContainer = typename Item::MMContainer;
@@ -709,10 +744,10 @@ class CacheAllocator : public CacheBase {
   uint32_t getUsableSize(const Item& item) const;
 
   // create memory assignment to bg workers
-  auto createBgWorkerMemoryAssignments(size_t numWorkers);
+  auto createBgWorkerMemoryAssignments(size_t numWorkers, TierId tid);
 
   // whether bg worker should be woken
-  bool shouldWakeupBgEvictor(PoolId pid, ClassId cid);
+  bool shouldWakeupBgEvictor(TierId tid, PoolId pid, ClassId cid);
 
   // Get a random item from memory
   // This is useful for profiling and sampling cachelib managed memory
@@ -810,7 +845,7 @@ class CacheAllocator : public CacheBase {
   // @param config    new config for the pool
   //
   // @throw std::invalid_argument if the poolId is invalid
-  void overridePoolConfig(PoolId pid, const MMConfig& config);
+  void overridePoolConfig(TierId tid, PoolId pid, const MMConfig& config);
 
   // update an existing pool's rebalance strategy
   //
@@ -851,8 +886,9 @@ class CacheAllocator : public CacheBase {
   // @return  true if the operation succeeded. false if the size of the pool is
   //          smaller than _bytes_
   // @throw   std::invalid_argument if the poolId is invalid.
+  // TODO: should call shrinkPool for specific tier?
   bool shrinkPool(PoolId pid, size_t bytes) {
-    return allocator_->shrinkPool(pid, bytes);
+    return allocator_[currentTier()]->shrinkPool(pid, bytes);
   }
 
   // grow an existing pool by _bytes_. This will fail if there is no
@@ -861,8 +897,9 @@ class CacheAllocator : public CacheBase {
   // @return    true if the pool was grown. false if the necessary number of
   //            bytes were not available.
   // @throw     std::invalid_argument if the poolId is invalid.
+  // TODO: should call growPool for specific tier?
   bool growPool(PoolId pid, size_t bytes) {
-    return allocator_->growPool(pid, bytes);
+    return allocator_[currentTier()]->growPool(pid, bytes);
   }
 
   // move bytes from one pool to another. The source pool should be at least
@@ -875,7 +912,7 @@ class CacheAllocator : public CacheBase {
   //          correct size to do the transfer.
   // @throw   std::invalid_argument if src or dest is invalid pool
   bool resizePools(PoolId src, PoolId dest, size_t bytes) override {
-    return allocator_->resizePools(src, dest, bytes);
+    return allocator_[currentTier()]->resizePools(src, dest, bytes);
   }
 
   // Add a new compact cache with given name and size
@@ -1104,12 +1141,13 @@ class CacheAllocator : public CacheBase {
   // @throw std::invalid_argument if the memory does not belong to this
   //        cache allocator
   AllocInfo getAllocInfo(const void* memory) const {
-    return allocator_->getAllocInfo(memory);
+    return allocator_[getTierId(memory)]->getAllocInfo(memory);
   }
 
   // return the ids for the set of existing pools in this cache.
   std::set getPoolIds() const override final {
-    return allocator_->getPoolIds();
+    // all tiers have the same pool ids. TODO: deduplicate
+    return allocator_[0]->getPoolIds();
   }
 
   // return a list of pool ids that are backing compact caches. This includes
@@ -1121,18 +1159,22 @@ class CacheAllocator : public CacheBase {
 
   // return the pool with speicified id.
   const MemoryPool& getPool(PoolId pid) const override final {
-    return allocator_->getPool(pid);
+    return allocator_[currentTier()]->getPool(pid);
+  }
+
+  const MemoryPool& getPoolByTid(PoolId pid, TierId tid) const override final {
+    return allocator_[tid]->getPool(pid);
   }
 
   // calculate the number of slabs to be advised/reclaimed in each pool
   PoolAdviseReclaimData calcNumSlabsToAdviseReclaim() override final {
     auto regularPoolIds = getRegularPoolIds();
-    return allocator_->calcNumSlabsToAdviseReclaim(regularPoolIds);
+    return allocator_[currentTier()]->calcNumSlabsToAdviseReclaim(regularPoolIds);
   }
 
   // update number of slabs to advise in the cache
   void updateNumSlabsToAdvise(int32_t numSlabsToAdvise) override final {
-    allocator_->updateNumSlabsToAdvise(numSlabsToAdvise);
+    allocator_[currentTier()]->updateNumSlabsToAdvise(numSlabsToAdvise);
   }
 
   // returns a valid PoolId corresponding to the name or kInvalidPoolId if the
@@ -1140,8 +1182,9 @@ class CacheAllocator : public CacheBase {
   PoolId getPoolId(folly::StringPiece name) const noexcept;
 
   // returns the pool's name by its poolId.
-  std::string getPoolName(PoolId poolId) const override {
-    return allocator_->getPoolName(poolId);
+  std::string getPoolName(PoolId poolId) const {
+    // all tiers have the same pool names.
+    return allocator_[0]->getPoolName(poolId);
   }
 
   // get stats related to all kinds of slab release events.
@@ -1174,6 +1217,43 @@ class CacheAllocator : public CacheBase {
     return stats;
   }
 
+  // returns the background mover stats per thread
+  std::vector getBackgroundMoverStats(MoverDir direction) const {
+    auto stats = std::vector();
+    if (direction == MoverDir::Evict) {
+      for (auto& bg : backgroundEvictor_)
+        stats.push_back(bg->getStats());
+    } else if (direction == MoverDir::Promote) {
+      for (auto& bg : backgroundPromoter_)
+        stats.push_back(bg->getStats());
+    }
+    return stats;
+  }
+
+  ClassBgStatsType
+  getBackgroundMoverClassStats(MoverDir direction) const {
+    ClassBgStatsType stats;
+    auto record = [&](auto &bg) {
+      //gives a unique descriptor
+      auto classStats = bg->getClassStats();
+      for (const auto& [key,value] : classStats) {
+          stats[key] = value;
+      }
+    };
+
+    if (direction == MoverDir::Evict) {
+      for (auto& bg : backgroundEvictor_) {
+          record(bg);
+      }
+    } else if (direction == MoverDir::Promote) {
+      for (auto& bg : backgroundPromoter_) {
+          record(bg);
+      }
+    }
+
+    return stats;
+  }
+
   // returns the pool rebalancer stats
   RebalancerStats getRebalancerStats() const {
     auto stats =
@@ -1199,6 +1279,8 @@ class CacheAllocator : public CacheBase {
 
   // pool stats by pool id
   PoolStats getPoolStats(PoolId pid) const override final;
+  // pool stats by tier id and pool id
+  PoolStats getPoolStats(TierId tid, PoolId pid) const;
 
   // This can be expensive so it is not part of PoolStats
   PoolEvictionAgeStats getPoolEvictionAgeStats(
@@ -1213,6 +1295,9 @@ class CacheAllocator : public CacheBase {
   // return cache's memory usage stats
   CacheMemoryStats getCacheMemoryStats() const override final;
 
+  // return stats for Allocation Class
+  ACStats getACStats(TierId tid, PoolId pid, ClassId cid) const override final;
+
   // return the nvm cache stats map
   util::StatsMap getNvmCacheStatsMap() const override final;
 
@@ -1322,6 +1407,7 @@ class CacheAllocator : public CacheBase {
                  sizeof(typename RefcountWithFlags::Value) + sizeof(uint32_t) +
                  sizeof(uint32_t) + sizeof(KAllocation)) == sizeof(Item),
                 "vtable overhead");
+  // Check for CompressedPtr single/multi tier support
   static_assert(32 == sizeof(Item), "item overhead is 32 bytes");
 
   // make sure there is no overhead in ChainedItem on top of a regular Item
@@ -1416,11 +1502,14 @@ class CacheAllocator : public CacheBase {
 
   using MMContainerPtr = std::unique_ptr;
   using MMContainers =
-      std::array,
-                 MemoryPoolManager::kMaxPools>;
+      std::vector,
+                 MemoryPoolManager::kMaxPools>>;
 
   void createMMContainers(const PoolId pid, MMConfig config);
 
+  TierId getTierId(const Item& item) const;
+  TierId getTierId(const void* ptr) const;
+
   // acquire the MMContainer corresponding to the the Item's class and pool.
   //
   // @return pointer to the MMContainer.
@@ -1428,7 +1517,12 @@ class CacheAllocator : public CacheBase {
   // allocation from the memory allocator.
   MMContainer& getMMContainer(const Item& item) const noexcept;
 
-  MMContainer& getMMContainer(PoolId pid, ClassId cid) const noexcept;
+  MMContainer& getMMContainer(TierId tid, PoolId pid, ClassId cid) const noexcept;
+
+  // Get stats of the specified pid and cid.
+  // If such mmcontainer is not valid (pool id or cid out of bound)
+  // or the mmcontainer is not initialized, return an empty stat.
+  MMContainerStat getMMContainerStat(TierId tid, PoolId pid, ClassId cid) const noexcept;
 
   // create a new cache allocation. The allocation can be initialized
   // appropriately and made accessible through insert or insertOrReplace.
@@ -1459,8 +1553,49 @@ class CacheAllocator : public CacheBase {
                                Key key,
                                uint32_t size,
                                uint32_t creationTime,
-                               uint32_t expiryTime,
-                               bool fromBgThread = false);
+                               uint32_t expiryTime);
+
+  // create a new cache allocation on specific memory tier.
+  // For description see allocateInternal.
+  //
+  // @param tid the id of a memory tier
+  // @param evict whether to evict an item from tier tid in case there
+  //        is not enough memory
+  WriteHandle allocateInternalTier(TierId tid,
+                                   PoolId id,
+                                   Key key,
+                                   uint32_t size,
+                                   uint32_t creationTime,
+                                   uint32_t expiryTime,
+                                   bool evict);
+  
+  // create a new cache allocation on specific memory tier,
+  // for a given class id. used in moving between tiers since
+  // class id's are the same among the tiers.
+  // For description see allocateInternal.
+  //
+  // @param tid the id of a memory tier
+  // @param pid a pool id
+  // @param cid a class id
+  //
+  void* allocateInternalTierByCid(TierId tid,
+                                   PoolId pid,
+                                   ClassId cid);
+
+  // create a new cache allocation on specific memory tier,
+  // for a given class id. used in moving between tiers since
+  // class id's are the same among the tiers.
+  // For description see allocateInternal.
+  //
+  // @param tid the id of a memory tier
+  // @param pid a pool id
+  // @param cid a class id
+  // @param batch the number of allocations to make
+  //
+  std::vector allocateInternalTierByCidBatch(TierId tid,
+                                   PoolId pid,
+                                   ClassId cid,
+                                   uint64_t batch);
 
   // Allocate a chained item
   //
@@ -1478,14 +1613,34 @@ class CacheAllocator : public CacheBase {
   //            if the item is invalid
   WriteHandle allocateChainedItemInternal(const Item& parent, uint32_t size);
 
+  // Allocate a chained item to a specific tier
+  //
+  // The resulting chained item does not have a parent item yet
+  // and if we fail to link to the chain for any reason
+  // the chained item will be freed once the handle is dropped.
+  //
+  // The parent item parameter here is mainly used to find the
+  // correct pool to allocate memory for this chained item
+  //
+  // @param parent    parent item
+  // @param size      the size for the chained allocation
+  // @param tid       the tier to allocate on
+  //
+  // @return    handle to the chained allocation
+  // @throw     std::invalid_argument if the size requested is invalid or
+  //            if the item is invalid
+  WriteHandle allocateChainedItemInternalTier(const Item& parent,
+                                              uint32_t size,
+                                              TierId tid);
+
   // Given an existing item, allocate a new one for the
   // existing one to later be moved into.
   //
-  // @param oldItem    the item we want to allocate a new item for
+  // @param item   reference to the item we want to allocate a new item for
   //
   // @return  handle to the newly allocated item
   //
-  WriteHandle allocateNewItemForOldItem(const Item& oldItem);
+  WriteHandle allocateNewItemForOldItem(const Item& item);
 
   // internal helper that grabs a refcounted handle to the item. This does
   // not record the access to reflect in the mmContainer.
@@ -1544,12 +1699,14 @@ class CacheAllocator : public CacheBase {
   // callback is responsible for copying the contents and fixing the semantics
   // of chained item.
   //
-  // @param oldItem     Reference to the item being moved
+  // @param oldItem     item being moved
   // @param newItemHdl  Reference to the handle of the new item being moved into
-  //
+  // @param skipAddInMMContainer whether to skip adding the new item to the
+  //                     mmContainer now and defer it to a later batch operation
+  // @param fromBgThread use memmove instead of memcpy (for DTO testing)
   // @return true  If the move was completed, and the containers were updated
   //               successfully.
-  bool moveRegularItem(Item& oldItem, WriteHandle& newItemHdl);
+  bool moveRegularItem(Item& oldItem, WriteHandle& newItemHdl, bool skipAddInMMContainer, bool fromBgThread);
 
   // template class for viewAsChainedAllocs that takes either ReadHandle or
   // WriteHandle
@@ -1582,9 +1739,8 @@ class CacheAllocator : public CacheBase {
   // will be unmarked as having chained allocations. Parent will not be null
   // after calling this API.
   //
-  // Parent and NewParent must be valid handles to items with same key and
-  // parent must have chained items and parent handle must be the only
-  // outstanding handle for parent. New parent must be without any chained item
+  // NewParent must be a valid handle to an item with the same key as Parent,
+  // and Parent must have chained items. New parent must be without any chained item
   // handles.
   //
   // Chained item lock for the parent's key needs to be held in exclusive mode.
@@ -1711,15 +1867,18 @@ class CacheAllocator : public CacheBase {
   // Implementation to find a suitable eviction from the container. The
   // two parameters together identify a single container.
   //
+  // @param  tid  the id of the tier to look for evictions inside
   // @param  pid  the id of the pool to look for evictions inside
   // @param  cid  the id of the class to look for evictions inside
   // @return An evicted item or nullptr  if there is no suitable candidate found
   // within the configured number of attempts.
-  Item* findEviction(PoolId pid, ClassId cid);
+  Item* findEviction(TierId tid, PoolId pid, ClassId cid);
+  std::vector-  findEvictionBatch(TierId tid, PoolId pid, ClassId cid, unsigned int batch);
 
   // Get next eviction candidate from MMContainer, remove from AccessContainer,
   // MMContainer and insert into NVMCache if enabled.
   //
+  // @param tid  the id of the tier to look for evictions inside
   // @param pid  the id of the pool to look for evictions inside
   // @param cid  the id of the class to look for evictions inside
   // @param searchTries number of search attempts so far.
@@ -1727,11 +1886,68 @@ class CacheAllocator : public CacheBase {
   // @return pair of [candidate, toRecycle]. Pair of null if reached the end of
   // the eviction queue or no suitable candidate found
   // within the configured number of attempts
-  std::pair-  getNextCandidate(PoolId pid,
+  std::pair-  getNextCandidate(TierId tid,
+                                           PoolId pid,
                                            ClassId cid,
                                            unsigned int& searchTries);
 
   using EvictionIterator = typename MMContainer::LockedIterator;
+  // similar to the above method but returns a batch of evicted items
+  // as a pair of vectors
+  std::vector getNextCandidates(TierId tid,
+                                              PoolId pid,
+                                              ClassId cid,
+                                              unsigned int batch,
+                                              bool markMoving,
+                                              bool fromBgThread);
+  
+  std::vector-  getNextCandidatesPromotion(TierId tid,
+                                       PoolId pid,
+                                       ClassId cid,
+                                       unsigned int batch,
+                                       bool markMoving,
+                                       bool fromBgThread);
+
+  // 
+  // Common function in case move among tiers fails during eviction
+  // @param candidate the item that failed to move
+  // @param token the corresponding put token
+  // @param isExpired whether the candidate is expired
+  // @param markMoving whether the moving bit is used
+  //
+  //
+  // if insertOrReplace was called during move
+  // then candidate will not be accessible (failed replace during tryEvict)
+  //  - therefore this was why we failed to
+  //    evict to the next tier and insertOrReplace
+  //    will remove from NVM cache
+  // however, if candidate is accessible
+  // that means the allocation in the next
+  // tier failed - so we will continue to
+  // evict the item to NVM cache
+  bool handleFailedMove(Item* candidate, 
+                        typename NvmCacheT::PutToken& token, 
+                        bool isExpired,
+                        bool markMoving);
+
+  // Try to move the item down to the next memory tier
+  //
+  // @param tid current tier ID of the item
+  // @param pid the pool ID the item belong to.
+  // @param item the item to evict
+  //
+  // @return valid handle to the item. This will be the last
+  //         handle to the item. On failure an empty handle.
+  WriteHandle tryEvictToNextMemoryTier(TierId tid, PoolId pid, Item& item);
+
+  // Try to move the item down to the next memory tier
+  //
+  // @param item the item to evict
+  //
+  // @return valid handle to the item. This will be the last
+  //         handle to the item. On failure an empty handle. 
+  WriteHandle tryEvictToNextMemoryTier(Item& item);
+
 
   // Wakes up waiters if there are any
   //
@@ -1758,7 +1974,7 @@ class CacheAllocator : public CacheBase {
       const typename Item::PtrCompressor& compressor);
 
   unsigned int reclaimSlabs(PoolId id, size_t numSlabs) final {
-    return allocator_->reclaimSlabsAndGrow(id, numSlabs);
+    return allocator_[currentTier()]->reclaimSlabsAndGrow(id, numSlabs);
   }
 
   FOLLY_ALWAYS_INLINE EventTracker* getEventTracker() const {
@@ -1817,7 +2033,7 @@ class CacheAllocator : public CacheBase {
                    const void* hint = nullptr) final;
 
   // @param releaseContext  slab release context
-  void releaseSlabImpl(const SlabReleaseContext& releaseContext);
+  void releaseSlabImpl(TierId tid, const SlabReleaseContext& releaseContext);
 
   // @return  true when successfully marked as moving,
   //          fasle when this item has already been freed
@@ -1860,24 +2076,37 @@ class CacheAllocator : public CacheBase {
     // primitives. So we consciously exempt ourselves here from TSAN data race
     // detection.
     folly::annotate_ignore_thread_sanitizer_guard g(__FILE__, __LINE__);
-    auto slabsSkipped = allocator_->forEachAllocation(std::forward(f));
+    auto slabsSkipped = allocator_[currentTier()]->forEachAllocation(std::forward(f));
     stats().numReaperSkippedSlabs.add(slabsSkipped);
   }
 
   // exposed for the background evictor to iterate through the memory and evict
   // in batch. This should improve insertion path for tiered memory config
-  size_t traverseAndEvictItems(unsigned int /* pid */,
-                               unsigned int /* cid */,
-                               size_t /* batch */) {
-    throw std::runtime_error("Not supported yet!");
+  size_t traverseAndEvictItems(unsigned int tid,
+                               unsigned int pid,
+                               unsigned int cid,
+                               size_t batch) {
+    util::LatencyTracker tracker{stats().bgEvictLatency_, batch};
+    auto& mmContainer = getMMContainer(tid, pid, cid);
+    uint32_t currItems = mmContainer.size();
+    if (currItems < batch) {
+      batch = currItems;
+      if (batch == 0) {
+        return 0;
+      }
+    }
+    auto evictionData = getNextCandidates(tid,pid,cid,batch,
+                                     true,true);
+    size_t evictions = evictionData.size();
+    (*stats_.regularItemEvictions)[tid][pid][cid].add(evictions);
+    return evictions;
   }
-
-  // exposed for the background promoter to iterate through the memory and
-  // promote in batch. This should improve find latency
-  size_t traverseAndPromoteItems(unsigned int /* pid */,
-                                 unsigned int /* cid */,
-                                 size_t /* batch */) {
-    throw std::runtime_error("Not supported yet!");
+  
+  size_t traverseAndPromoteItems(unsigned int tid, unsigned int pid, unsigned int cid, size_t batch) {
+    util::LatencyTracker tracker{stats().bgPromoteLatency_, batch};
+    auto candidates = getNextCandidatesPromotion(tid,pid,cid,batch,
+                                     true,true);
+    return candidates.size();
   }
 
   // returns true if nvmcache is enabled and we should write this item to
@@ -1920,10 +2149,12 @@ class CacheAllocator : public CacheBase {
                   std::unique_ptr& worker,
                   std::chrono::seconds timeout = std::chrono::seconds{0});
 
-  ShmSegmentOpts createShmCacheOpts();
-  std::unique_ptr createNewMemoryAllocator();
-  std::unique_ptr restoreMemoryAllocator();
-  std::unique_ptr restoreCCacheManager();
+  ShmSegmentOpts createShmCacheOpts(TierId tid);
+  PrivateSegmentOpts createPrivateSegmentOpts(TierId tid);
+  std::unique_ptr createPrivateAllocator(TierId tid);
+  std::unique_ptr createNewMemoryAllocator(TierId tid);
+  std::unique_ptr restoreMemoryAllocator(TierId tid);
+  std::unique_ptr restoreCCacheManager(TierId tid);
 
   PoolIds filterCompactCachePools(const PoolIds& poolIds) const;
 
@@ -1943,7 +2174,7 @@ class CacheAllocator : public CacheBase {
   }
 
   typename Item::PtrCompressor createPtrCompressor() const {
-    return allocator_->createPtrCompressor- ();
+    return typename Item::PtrCompressor(allocator_);
   }
 
   // helper utility to throttle and optionally log.
@@ -1966,9 +2197,14 @@ class CacheAllocator : public CacheBase {
 
   // @param type        the type of initialization
   // @return nullptr if the type is invalid
-  // @return pointer to memory allocator
+  // @return vector of pointers to memory allocator
   // @throw std::runtime_error if type is invalid
-  std::unique_ptr initAllocator(InitMemType type);
+  std::vector> initAllocator(InitMemType type);
+
+  std::vector> createPrivateAllocators();
+  std::vector> createAllocators();
+  std::vector> restoreAllocators();
+
   // @param type        the type of initialization
   // @return nullptr if the type is invalid
   // @return pointer to access container
@@ -1980,18 +2216,14 @@ class CacheAllocator : public CacheBase {
   std::optional saveNvmCache();
   void saveRamCache();
 
-  static bool itemExclusivePredicate(const Item& item) {
-    return item.getRefCount() == 0;
+  static bool itemSlabMovePredicate(const Item& item) {
+    return item.isMoving() && item.getRefCount() == 0;
   }
 
   static bool itemExpiryPredicate(const Item& item) {
     return item.getRefCount() == 1 && item.isExpired();
   }
 
-  static bool parentEvictForSlabReleasePredicate(const Item& item) {
-    return item.getRefCount() == 1 && !item.isMoving();
-  }
-
   std::unique_ptr createDeserializer();
 
   // Execute func on each item. `func` can throw exception but must ensure
@@ -2028,44 +2260,6 @@ class CacheAllocator : public CacheBase {
                      : false;
   }
 
-  // returns the background mover stats
-  BackgroundMoverStats getBackgroundMoverStats(MoverDir direction) const {
-    auto stats = BackgroundMoverStats{};
-    if (direction == MoverDir::Evict) {
-      for (auto& bg : backgroundEvictor_)
-        stats += bg->getStats();
-    } else if (direction == MoverDir::Promote) {
-      for (auto& bg : backgroundPromoter_)
-        stats += bg->getStats();
-    }
-    return stats;
-  }
-
-  std::map> getBackgroundMoverClassStats(
-      MoverDir direction) const {
-    std::map> stats;
-
-    if (direction == MoverDir::Evict) {
-      for (auto& bg : backgroundEvictor_) {
-        for (auto& pid : bg->getClassStats()) {
-          for (auto& cid : pid.second) {
-            stats[pid.first][cid.first] += cid.second;
-          }
-        }
-      }
-    } else if (direction == MoverDir::Promote) {
-      for (auto& bg : backgroundPromoter_) {
-        for (auto& pid : bg->getClassStats()) {
-          for (auto& cid : pid.second) {
-            stats[pid.first][cid.first] += cid.second;
-          }
-        }
-      }
-    }
-
-    return stats;
-  }
-
   bool tryGetHandleWithWaitContextForMovingItem(Item& item,
                                                 WriteHandle& handle);
 
@@ -2148,6 +2342,19 @@ class CacheAllocator : public CacheBase {
 
   // BEGIN private members
 
+  TierId currentTier() const {
+    // TODO: every function which calls this method should be refactored.
+    // We should go case by case and either make such function work on
+    // all tiers or expose separate parameter to describe the tier ID.
+    return 0;
+  }
+
+  unsigned getNumTiers() const {
+    return config_.memoryTierConfigs.size();
+  }
+
+  size_t memoryTierSize(TierId tid) const;
+
   // Whether the memory allocator for this cache allocator was created on shared
   // memory. The hash table, chained item hash table etc is also created on
   // shared memory except for temporary shared memory mode when they're created
@@ -2160,6 +2367,8 @@ class CacheAllocator : public CacheBase {
   // is not persisted when cache process exits.
   std::unique_ptr tempShm_;
 
+  std::unique_ptr privMemManager_;
+
   std::unique_ptr shmManager_;
 
   // Deserialize data to restore cache allocator. Used only while attaching to
@@ -2173,9 +2382,10 @@ class CacheAllocator : public CacheBase {
   const MMConfig mmConfig_{};
 
   // the memory allocator for allocating out of the available memory.
-  std::unique_ptr allocator_;
+  std::vector> allocator_;
 
   // compact cache allocator manager
+  // TODO: per tier?
   std::unique_ptr compactCacheManager_;
 
   // compact cache instances reside here when user "add" or "attach" compact
@@ -2226,7 +2436,7 @@ class CacheAllocator : public CacheBase {
   // free memory monitor
   std::unique_ptr memMonitor_;
 
-  // background evictor
+  // background data movement
   std::vector>> backgroundEvictor_;
   std::vector>> backgroundPromoter_;
 
@@ -2371,6 +2581,9 @@ CacheAllocator::CacheAllocator(
       tempShm_(type == InitMemType::kNone && isOnShm_
                    ? std::make_unique(config_.getCacheSize())
                    : nullptr),
+      privMemManager_(type == InitMemType::kNone && !isOnShm_
+                          ? std::make_unique()
+                          : nullptr),
       shmManager_(type != InitMemType::kNone
                       ? std::make_unique(config_.cacheDir,
                                                      config_.isUsingPosixShm())
@@ -2382,12 +2595,12 @@ CacheAllocator::CacheAllocator(
                     : serialization::CacheAllocatorMetadata{}},
       allocator_(initAllocator(type)),
       compactCacheManager_(type != InitMemType::kMemAttach
-                               ? std::make_unique(*allocator_)
-                               : restoreCCacheManager()),
+                               ? std::make_unique(*allocator_[0] /* TODO: per tier */)
+                               : restoreCCacheManager(0/* TODO: per tier */)),
       compressor_(createPtrCompressor()),
       mmContainers_(type == InitMemType::kMemAttach
                         ? deserializeMMContainers(*deserializer_, compressor_)
-                        : MMContainers{}),
+                        : MMContainers{getNumTiers()}),
       accessContainer_(initAccessContainer(
           type, detail::kShmHashTableName, config.accessConfig)),
       chainedItemAccessContainer_(
@@ -2422,48 +2635,115 @@ CacheAllocator::~CacheAllocator() {
 }
 
 template 
-ShmSegmentOpts CacheAllocator::createShmCacheOpts() {
+ShmSegmentOpts CacheAllocator::createShmCacheOpts(TierId tid) {
   ShmSegmentOpts opts;
   opts.alignment = sizeof(Slab);
   // TODO: we support single tier so far
-  if (config_.memoryTierConfigs.size() > 1) {
-    throw std::invalid_argument("CacheLib only supports a single memory tier");
+  if (config_.memoryTierConfigs.size() > 2) {
+    throw std::invalid_argument("CacheLib only supports two memory tiers");
   }
-  opts.memBindNumaNodes = config_.memoryTierConfigs[0].getMemBind();
+  opts.memBindNumaNodes = config_.memoryTierConfigs[tid].getMemBind();
   return opts;
 }
 
+template 
+PrivateSegmentOpts CacheAllocator::createPrivateSegmentOpts(TierId tid) {
+  PrivateSegmentOpts opts;
+  opts.alignment = sizeof(Slab);
+  auto memoryTierConfigs = config_.getMemoryTierConfigs();
+  opts.memBindNumaNodes = memoryTierConfigs[tid].getMemBind();
+
+  return opts;
+}
+
+template 
+size_t CacheAllocator::memoryTierSize(TierId tid) const {
+  auto partitions = std::accumulate(config_.memoryTierConfigs.begin(), config_.memoryTierConfigs.end(), 0UL,
+  [](const size_t i, const MemoryTierCacheConfig& config){
+    return i + config.getRatio();
+  });
+
+  return config_.memoryTierConfigs[tid].calculateTierSize(config_.getCacheSize(), partitions);
+}
+
 template 
 std::unique_ptr
-CacheAllocator::createNewMemoryAllocator() {
+CacheAllocator::createPrivateAllocator(TierId tid) {
+  if (isOnShm_) {
+    return std::make_unique(
+                            getAllocatorConfig(config_),
+                            tempShm_->getAddr(),
+                            memoryTierSize(tid));
+  } else {
+    return std::make_unique(
+                            getAllocatorConfig(config_),
+                            privMemManager_->createMapping(config_.size, createPrivateSegmentOpts(tid)),
+                            memoryTierSize(tid));
+  }
+}
+
+template 
+std::unique_ptr
+CacheAllocator::createNewMemoryAllocator(TierId tid) {
+  size_t tierSize = memoryTierSize(tid);
   return std::make_unique(
       getAllocatorConfig(config_),
       shmManager_
-          ->createShm(detail::kShmCacheName, config_.getCacheSize(),
-                      config_.slabMemoryBaseAddr, createShmCacheOpts())
+          ->createShm(detail::kShmCacheName + std::to_string(tid),
+                      tierSize, config_.slabMemoryBaseAddr,
+                      createShmCacheOpts(tid))
           .addr,
-      config_.getCacheSize());
+      tierSize);
 }
 
 template 
 std::unique_ptr
-CacheAllocator::restoreMemoryAllocator() {
+CacheAllocator::restoreMemoryAllocator(TierId tid) {
   return std::make_unique(
       deserializer_->deserialize(),
       shmManager_
-          ->attachShm(detail::kShmCacheName, config_.slabMemoryBaseAddr,
-                      createShmCacheOpts())
-          .addr,
-      config_.getCacheSize(),
+          ->attachShm(detail::kShmCacheName + std::to_string(tid),
+            config_.slabMemoryBaseAddr, createShmCacheOpts(tid)).addr,
+      memoryTierSize(tid),
       config_.disableFullCoredump);
 }
 
+template 
+std::vector>
+CacheAllocator::createPrivateAllocators() {
+  std::vector> allocators;
+  for (int tid = 0; tid < getNumTiers(); tid++) {
+    allocators.emplace_back(createPrivateAllocator(tid));
+  }
+  return allocators;
+}
+
+template 
+std::vector>
+CacheAllocator::createAllocators() {
+  std::vector> allocators;
+  for (int tid = 0; tid < getNumTiers(); tid++) {
+    allocators.emplace_back(createNewMemoryAllocator(tid));
+  }
+  return allocators;
+}
+
+template 
+std::vector>
+CacheAllocator::restoreAllocators() {
+  std::vector> allocators;
+  for (int tid = 0; tid < getNumTiers(); tid++) {
+    allocators.emplace_back(restoreMemoryAllocator(tid));
+  }
+  return allocators;
+}
+
 template 
 std::unique_ptr
-CacheAllocator::restoreCCacheManager() {
+CacheAllocator::restoreCCacheManager(TierId tid) {
   return std::make_unique(
       deserializer_->deserialize(),
-      *allocator_);
+      *allocator_[tid]);
 }
 
 template 
@@ -2567,21 +2847,15 @@ void CacheAllocator::initWorkers() {
 }
 
 template 
-std::unique_ptr CacheAllocator::initAllocator(
+std::vector>
+CacheAllocator::initAllocator(
     InitMemType type) {
   if (type == InitMemType::kNone) {
-    if (isOnShm_ == true) {
-      return std::make_unique(getAllocatorConfig(config_),
-                                               tempShm_->getAddr(),
-                                               config_.getCacheSize());
-    } else {
-      return std::make_unique(getAllocatorConfig(config_),
-                                               config_.getCacheSize());
-    }
+    return createPrivateAllocators();
   } else if (type == InitMemType::kMemNew) {
-    return createNewMemoryAllocator();
+    return createAllocators();
   } else if (type == InitMemType::kMemAttach) {
-    return restoreMemoryAllocator();
+    return restoreAllocators();
   }
 
   // Invalid type
@@ -2649,42 +2923,83 @@ CacheAllocator::allocate(PoolId poolId,
 }
 
 template 
-bool CacheAllocator::shouldWakeupBgEvictor(PoolId /* pid */,
-                                                       ClassId /* cid */) {
+bool CacheAllocator::shouldWakeupBgEvictor(TierId tid, PoolId pid, ClassId cid) {
+  // TODO: should we also work on lower tiers? should we have separate set of params?
+  if (tid == 1) return false;
+  double usage = getPoolByTid(pid, tid).getApproxUsage(cid);
+  if (((1-usage)*100) <= config_.lowEvictionAcWatermark) {
+    return true;
+  }
   return false;
 }
 
 template 
-typename CacheAllocator::WriteHandle
-CacheAllocator::allocateInternal(PoolId pid,
-                                             typename Item::Key key,
-                                             uint32_t size,
-                                             uint32_t creationTime,
-                                             uint32_t expiryTime,
-                                             bool fromBgThread) {
+std::vector CacheAllocator::allocateInternalTierByCidBatch(TierId tid,
+                                                 PoolId pid,
+                                                 ClassId cid, uint64_t batch) {
   util::LatencyTracker tracker{stats().allocateLatency_};
 
+  SCOPE_FAIL { stats_.invalidAllocs.add(batch); };
+
+  util::RollingLatencyTracker rollTracker{
+      (*stats_.classAllocLatency)[tid][pid][cid]};
+
+  (*stats_.allocAttempts)[tid][pid][cid].add(batch);
+  
+  auto memory = allocator_[tid]->allocateByCidBatch(pid, cid, batch);
+  
+  if (memory.size() < batch) {
+    uint64_t toEvict = batch - memory.size();
+    auto evicted = findEvictionBatch(tid, pid, cid, toEvict);
+    if (evicted.size() < toEvict) {
+      (*stats_.allocFailures)[tid][pid][cid].add(toEvict - evicted.size());
+    }
+    if (evicted.size() > 0) {
+      //case where we got some allocations from eviction - add them to
+      //the new allocations
+      memory.insert(memory.end(),evicted.begin(),evicted.end());
+      return memory;
+    }
+  }
+  return memory;
+}
+
+template 
+typename CacheAllocator::WriteHandle
+CacheAllocator::allocateInternalTier(TierId tid,
+                                                 PoolId pid,
+                                                 typename Item::Key key,
+                                                 uint32_t size,
+                                                 uint32_t creationTime,
+                                                 uint32_t expiryTime,
+                                                 bool evict) {
+  util::LatencyTracker tracker{stats().allocateLatency_};
   SCOPE_FAIL { stats_.invalidAllocs.inc(); };
 
   // number of bytes required for this item
   const auto requiredSize = Item::getRequiredSize(key, size);
 
   // the allocation class in our memory allocator.
-  const auto cid = allocator_->getAllocationClassId(pid, requiredSize);
+  const auto cid = allocator_[tid]->getAllocationClassId(pid, requiredSize);
+  util::RollingLatencyTracker rollTracker{
+      (*stats_.classAllocLatency)[tid][pid][cid]};
 
-  (*stats_.allocAttempts)[pid][cid].inc();
+  (*stats_.allocAttempts)[tid][pid][cid].inc();
 
-  void* memory = allocator_->allocate(pid, requiredSize);
+  void* memory = allocator_[tid]->allocate(pid, requiredSize);
 
-  if (backgroundEvictor_.size() && !fromBgThread &&
-      (memory == nullptr || shouldWakeupBgEvictor(pid, cid))) {
+  if (backgroundEvictor_.size() &&
+      (memory == nullptr || shouldWakeupBgEvictor(tid, pid, cid))) {
     backgroundEvictor_[BackgroundMover::workerId(
-                           pid, cid, backgroundEvictor_.size())]
+                         tid, pid, cid, backgroundEvictor_.size())]
         ->wakeUp();
   }
 
   if (memory == nullptr) {
-    memory = findEviction(pid, cid);
+    if (!evict || config_.noOnlineEviction) {
+      return {};
+    }
+    memory = findEviction(tid, pid, cid);
   }
 
   WriteHandle handle;
@@ -2695,18 +3010,18 @@ CacheAllocator::allocateInternal(PoolId pid,
     // for example.
     SCOPE_FAIL {
       // free back the memory to the allocator since we failed.
-      allocator_->free(memory);
+      allocator_[tid]->free(memory);
     };
 
     handle = acquire(new (memory) Item(key, size, creationTime, expiryTime));
     if (handle) {
       handle.markNascent();
-      (*stats_.fragmentationSize)[pid][cid].add(
+      (*stats_.fragmentationSize)[tid][pid][cid].add(
           util::getFragmentation(*this, *handle));
     }
 
   } else { // failed to allocate memory.
-    (*stats_.allocFailures)[pid][cid].inc();
+    (*stats_.allocFailures)[tid][pid][cid].inc();
     // wake up rebalancer
     if (!config_.poolRebalancerDisableForcedWakeUp && poolRebalancer_) {
       poolRebalancer_->wakeUp();
@@ -2723,6 +3038,23 @@ CacheAllocator::allocateInternal(PoolId pid,
   return handle;
 }
 
+template 
+typename CacheAllocator::WriteHandle
+CacheAllocator::allocateInternal(PoolId pid,
+                                             typename Item::Key key,
+                                             uint32_t size,
+                                             uint32_t creationTime,
+                                             uint32_t expiryTime) {
+  auto tid = 0; /* TODO: consult admission policy */
+  for(TierId tid = 0; tid < getNumTiers(); ++tid) {
+    bool evict = (!config_.insertToFirstFreeTier || tid == getNumTiers() - 1);
+    auto handle = allocateInternalTier(tid, pid, key, size, creationTime,
+                                       expiryTime, evict);
+    if (handle) return handle;
+  }
+  return {};
+}
+
 template 
 typename CacheAllocator::WriteHandle
 CacheAllocator::allocateChainedItem(const ReadHandle& parent,
@@ -2746,35 +3078,55 @@ template 
 typename CacheAllocator::WriteHandle
 CacheAllocator::allocateChainedItemInternal(const Item& parent,
                                                         uint32_t size) {
+  auto tid = 0; /* TODO: consult admission policy */
+  for(TierId tid = 0; tid < getNumTiers(); ++tid) {
+    auto handle = allocateChainedItemInternalTier(parent, size, tid);
+    if (handle) return handle;
+  }
+  return {};
+}
+
+template 
+typename CacheAllocator::WriteHandle
+CacheAllocator::allocateChainedItemInternalTier(const Item& parent,
+                                                            uint32_t size,
+                                                            TierId tid) {
   util::LatencyTracker tracker{stats().allocateLatency_};
 
   SCOPE_FAIL { stats_.invalidAllocs.inc(); };
 
   // number of bytes required for this item
   const auto requiredSize = ChainedItem::getRequiredSize(size);
+  
+  //this is okay because pools/classes are duplicated among the tiers
+  auto ptid = getTierId(parent);
+  const auto pid = allocator_[ptid]->getAllocInfo(parent.getMemory()).poolId;
+  const auto cid = allocator_[ptid]->getAllocationClassId(pid, requiredSize);
 
-  const auto pid = allocator_->getAllocInfo(parent.getMemory()).poolId;
-  const auto cid = allocator_->getAllocationClassId(pid, requiredSize);
+  // TODO: per-tier? Right now stats_ are not used in any public periodic
+  // worker
+  util::RollingLatencyTracker rollTracker{
+      (*stats_.classAllocLatency)[tid][pid][cid]};
 
-  (*stats_.allocAttempts)[pid][cid].inc();
+  (*stats_.allocAttempts)[tid][pid][cid].inc();
 
-  void* memory = allocator_->allocate(pid, requiredSize);
+  void* memory = allocator_[tid]->allocate(pid, requiredSize);
   if (memory == nullptr) {
-    memory = findEviction(pid, cid);
+    memory = findEviction(tid, pid, cid);
   }
   if (memory == nullptr) {
-    (*stats_.allocFailures)[pid][cid].inc();
+    (*stats_.allocFailures)[tid][pid][cid].inc();
     return WriteHandle{};
   }
 
-  SCOPE_FAIL { allocator_->free(memory); };
+  SCOPE_FAIL { allocator_[tid]->free(memory); };
 
   auto child = acquire(new (memory) ChainedItem(
       compressor_.compress(&parent), size, util::getCurrentTimeSec()));
 
   if (child) {
     child.markNascent();
-    (*stats_.fragmentationSize)[pid][cid].add(
+    (*stats_.fragmentationSize)[tid][pid][cid].add(
         util::getFragmentation(*this, *child));
   }
 
@@ -3101,8 +3453,8 @@ CacheAllocator::releaseBackToAllocator(Item& it,
     throw std::runtime_error(
         folly::sformat("cannot release this item: {}", it.toString()));
   }
-
-  const auto allocInfo = allocator_->getAllocInfo(it.getMemory());
+  const auto tid = getTierId(it);
+  const auto allocInfo = allocator_[tid]->getAllocInfo(it.getMemory());
 
   if (ctx == RemoveContext::kEviction) {
     const auto timeNow = util::getCurrentTimeSec();
@@ -3113,21 +3465,23 @@ CacheAllocator::releaseBackToAllocator(Item& it,
     stats_.perPoolEvictionAgeSecs_[allocInfo.poolId].trackValue(refreshTime);
   }
 
-  (*stats_.fragmentationSize)[allocInfo.poolId][allocInfo.classId].sub(
+  (*stats_.fragmentationSize)[tid][allocInfo.poolId][allocInfo.classId].sub(
       util::getFragmentation(*this, it));
 
   // Chained items can only end up in this place if the user has allocated
   // memory for a chained item but has decided not to insert the chained item
   // to a parent item and instead drop the chained item handle. In this case,
   // we free the chained item directly without calling remove callback.
-  if (it.isChainedItem()) {
+  //
+  // Except if we are moving a chained item between tiers -
+  // then it == toRecycle and we will want the normal recycle path
+  if (it.isChainedItem() && &it != toRecycle) {
     if (toRecycle) {
       throw std::runtime_error(
           folly::sformat("Can not recycle a chained item {}, toRecyle",
                          it.toString(), toRecycle->toString()));
     }
-
-    allocator_->free(&it);
+    allocator_[tid]->free(&it);
     return ReleaseRes::kReleased;
   }
 
@@ -3194,10 +3548,10 @@ CacheAllocator::releaseBackToAllocator(Item& it,
 
     while (head) {
       auto next = head->getNext(compressor_);
-
+      const auto tid = getTierId(head);
       const auto childInfo =
-          allocator_->getAllocInfo(static_cast(head));
-      (*stats_.fragmentationSize)[childInfo.poolId][childInfo.classId].sub(
+          allocator_[tid]->getAllocInfo(static_cast(head));
+      (*stats_.fragmentationSize)[tid][childInfo.poolId][childInfo.classId].sub(
           util::getFragmentation(*this, *head));
 
       removeFromMMContainer(*head);
@@ -3212,7 +3566,7 @@ CacheAllocator::releaseBackToAllocator(Item& it,
         XDCHECK(ReleaseRes::kReleased != res);
         res = ReleaseRes::kRecycled;
       } else {
-        allocator_->free(head);
+        allocator_[tid]->free(head);
       }
 
       stats_.numChainedChildItems.dec();
@@ -3226,7 +3580,7 @@ CacheAllocator::releaseBackToAllocator(Item& it,
     res = ReleaseRes::kRecycled;
   } else {
     XDCHECK(it.isDrained());
-    allocator_->free(&it);
+    allocator_[tid]->free(&it);
   }
 
   return res;
@@ -3515,14 +3869,9 @@ void CacheAllocator::wakeUpWaiters(folly::StringPiece key,
 }
 
 template 
-bool CacheAllocator::moveRegularItem(Item& oldItem,
-                                                 WriteHandle& newItemHdl) {
-  XDCHECK(oldItem.isMoving());
-  // If an item is expired, proceed to eviction.
-  if (oldItem.isExpired()) {
-    return false;
-  }
-
+bool CacheAllocator::moveRegularItem(
+    Item& oldItem, WriteHandle& newItemHdl, bool skipAddInMMContainer, bool fromBgThread) {
+  XDCHECK(!oldItem.isExpired());
   util::LatencyTracker tracker{stats_.moveRegularLatency_};
 
   XDCHECK_EQ(newItemHdl->getSize(), oldItem.getSize());
@@ -3534,20 +3883,32 @@ bool CacheAllocator::moveRegularItem(Item& oldItem,
     newItemHdl->markNvmClean();
   }
 
-  // Execute the move callback. We cannot make any guarantees about the
-  // consistency of the old item beyond this point, because the callback can
-  // do more than a simple memcpy() e.g. update external references. If there
-  // are any remaining handles to the old item, it is the caller's
-  // responsibility to invalidate them. The move can only fail after this
-  // statement if the old item has been removed or replaced, in which case it
-  // should be fine for it to be left in an inconsistent state.
-  config_.moveCb(oldItem, *newItemHdl, nullptr);
+  if (config_.moveCb) {
+    // Execute the move callback. We cannot make any guarantees about the
+    // consistency of the old item beyond this point, because the callback can
+    // do more than a simple memcpy() e.g. update external references. If there
+    // are any remaining handles to the old item, it is the caller's
+    // responsibility to invalidate them. The move can only fail after this
+    // statement if the old item has been removed or replaced, in which case it
+    // should be fine for it to be left in an inconsistent state.
+    config_.moveCb(oldItem, *newItemHdl, nullptr);
+  } else {
+    if (fromBgThread) {
+      std::memmove(newItemHdl->getMemory(), oldItem.getMemory(),
+                oldItem.getSize());
+    } else {
+      std::memcpy(newItemHdl->getMemory(), oldItem.getMemory(),
+                oldItem.getSize());
+    }
+  }
 
-  // Adding the item to mmContainer has to succeed since no one can remove the
-  // item
   auto& newContainer = getMMContainer(*newItemHdl);
-  auto mmContainerAdded = newContainer.add(*newItemHdl);
-  XDCHECK(mmContainerAdded);
+  if (!skipAddInMMContainer) {
+    // Adding the item to mmContainer has to succeed since no one can remove the
+    // item
+    auto mmContainerAdded = newContainer.add(*newItemHdl);
+    XDCHECK(mmContainerAdded);
+  }
 
   if (oldItem.hasChainedItem()) {
     XDCHECK(!newItemHdl->hasChainedItem()) << newItemHdl->toString();
@@ -3589,14 +3950,19 @@ bool CacheAllocator::moveChainedItem(ChainedItem& oldItem,
 
   auto parentPtr = &parentItem;
 
-  // Execute the move callback. We cannot make any guarantees about the
-  // consistency of the old item beyond this point, because the callback can
-  // do more than a simple memcpy() e.g. update external references. If there
-  // are any remaining handles to the old item, it is the caller's
-  // responsibility to invalidate them. The move can only fail after this
-  // statement if the old item has been removed or replaced, in which case it
-  // should be fine for it to be left in an inconsistent state.
-  config_.moveCb(oldItem, *newItemHdl, parentPtr);
+  if (config_.moveCb) {
+    // Execute the move callback. We cannot make any guarantees about the
+    // consistency of the old item beyond this point, because the callback can
+    // do more than a simple memcpy() e.g. update external references. If there
+    // are any remaining handles to the old item, it is the caller's
+    // responsibility to invalidate them. The move can only fail after this
+    // statement if the old item has been removed or replaced, in which case it
+    // should be fine for it to be left in an inconsistent state.
+    config_.moveCb(oldItem, *newItemHdl, parentPtr);
+  } else {
+    std::memcpy(newItemHdl->getMemory(), oldItem.getMemory(),
+                oldItem.getSize());
+  }
 
   // Replace the new item in the position of the old one before both in the
   // parent's chain and the MMContainer.
@@ -3631,23 +3997,496 @@ void CacheAllocator::unlinkItemForEviction(Item& it) {
   XDCHECK_EQ(0u, ref);
 }
 
+template 
+std::vector::Item*>
+CacheAllocator::findEvictionBatch(TierId tid,
+                                             PoolId pid,
+                                             ClassId cid,
+                                             unsigned int batch) {
+
+  std::vector-  toRecycles;
+  toRecycles.reserve(batch);
+  auto evictionData = getNextCandidates(tid,pid,cid,batch,true,false);
+  for (int i = 0; i < evictionData.size(); i++) {
+    Item *candidate = evictionData[i].candidate;
+    Item *toRecycle = evictionData[i].toRecycle;
+    toRecycles.push_back(toRecycle);
+    // recycle the item. it's safe to do so, even if toReleaseHandle was
+    // NULL. If `ref` == 0 then it means that we are the last holder of
+    // that item.
+    if (candidate->hasChainedItem()) {
+      (*stats_.chainedItemEvictions)[tid][pid][cid].inc();
+    } else {
+      (*stats_.regularItemEvictions)[tid][pid][cid].inc();
+    }
+
+    if (auto eventTracker = getEventTracker()) {
+      eventTracker->record(AllocatorApiEvent::DRAM_EVICT, candidate->getKey(),
+                           AllocatorApiResult::EVICTED, candidate->getSize(),
+                           candidate->getConfiguredTTL().count());
+    }
+
+    XDCHECK(!candidate->isChainedItem());
+    // check if by releasing the item we intend to, we actually
+    // recycle the candidate.
+    auto ret = releaseBackToAllocator(*candidate, RemoveContext::kEviction,
+                                      /* isNascent */ false, toRecycle);
+    XDCHECK_EQ(ret,ReleaseRes::kRecycled);
+  }
+  return toRecycles;
+}
+
+template 
+std::vector::Item*>
+CacheAllocator::getNextCandidatesPromotion(TierId tid,
+                                             PoolId pid,
+                                             ClassId cid,
+                                             unsigned int batch,
+                                             bool markMoving,
+                                             bool fromBgThread) {
+  std::vector-  newAllocs;
+  std::vector blankAllocs;
+  std::vector newHandles;
+  std::vector candidateHandles;
+  std::vector-  candidates;
+  candidates.reserve(batch);
+  candidateHandles.reserve(batch);
+  newAllocs.reserve(batch);
+  newHandles.reserve(batch);
+
+  auto& mmContainer = getMMContainer(tid, pid, cid);
+  unsigned int maxSearchTries = std::max(config_.evictionSearchTries,
+                                            batch*4);
+
+  // first try and get allocations in the next tier
+  blankAllocs = allocateInternalTierByCidBatch(tid-1,pid,cid,batch);
+  if (blankAllocs.empty()) {
+    return candidates;  
+  } else if (blankAllocs.size() < batch) {
+    batch = blankAllocs.size();
+  }
+  XDCHECK_EQ(blankAllocs.size(),batch);
+
+  auto iterateAndMark = [this, tid, pid, cid, batch,
+                         markMoving, maxSearchTries,
+                         &candidates, &candidateHandles,
+                         &mmContainer](auto&& itr) {
+
+    unsigned int searchTries = 0;
+    if (!itr) {
+      ++searchTries;
+      return;
+    }
+
+    while ((config_.evictionSearchTries == 0 ||
+            maxSearchTries > searchTries) &&
+           itr && candidates.size() < batch) {
+      ++searchTries;
+      auto* toRecycle_ = itr.get();
+      bool chainedItem_ = toRecycle_->isChainedItem();
+
+      if (chainedItem_) {
+          ++itr;
+          continue;
+      }
+      Item* candidate_;
+      WriteHandle candidateHandle_;
+      Item* syncItem_;
+      //chained items were skipped above, so sync directly on the item itself
+      candidate_ = toRecycle_;
+      syncItem_ = toRecycle_;
+      
+      bool marked = false;
+      if (markMoving) {
+        marked = syncItem_->markMoving();
+      } else if (!markMoving) {
+        //we use item handle as sync point - for background eviction
+        auto hdl = acquire(candidate_);
+        if (hdl && hdl->getRefCount() == 1) {
+          marked = true;
+          candidateHandle_ = std::move(hdl);
+        }
+      }
+      if (!marked) {
+        ++itr;
+        continue;
+      }
+      XDCHECK(!chainedItem_); 
+      mmContainer.remove(itr);
+      candidates.push_back(candidate_);
+      candidateHandles.push_back(std::move(candidateHandle_));
+    }
+  };
+  
+  mmContainer.withPromotionIterator(iterateAndMark);
+
+  if (candidates.size() < batch) {
+    unsigned int toErase = batch - candidates.size();
+    for (int i = 0; i < toErase; i++) {
+      allocator_[tid-1]->free(blankAllocs.back());
+      blankAllocs.pop_back();
+    }
+    if (candidates.size() == 0) {
+      return candidates;  
+    }
+  }
+  
+  //1. get an item handle from a new allocation
+  for (int i = 0; i < candidates.size(); i++) {
+    Item *candidate = candidates[i];
+    WriteHandle newItemHdl = acquire(new (blankAllocs[i]) 
+            Item(candidate->getKey(), candidate->getSize(),
+                 candidate->getCreationTime(), candidate->getExpiryTime()));
+    XDCHECK(newItemHdl);
+    if (newItemHdl) {
+      newItemHdl.markNascent();
+      (*stats_.fragmentationSize)[tid][pid][cid].add(
+          util::getFragmentation(*this, *newItemHdl));
+      newAllocs.push_back(newItemHdl.getInternal());
+      newHandles.push_back(std::move(newItemHdl));
+    } else {
+      //failed to get item handle
+      throw std::runtime_error(
+         folly::sformat("Was not to acquire new alloc, failed alloc {}", blankAllocs[i]));
+    }
+  }
+  //2. add in batch to mmContainer
+  auto& newMMContainer = getMMContainer(tid-1, pid, cid);
+  uint32_t added = newMMContainer.addBatch(newAllocs.begin(), newAllocs.end());
+  XDCHECK_EQ(added,newAllocs.size());
+  if (added != newAllocs.size()) {
+    throw std::runtime_error(
+      folly::sformat("Was not able to add all new items, failed item {} and handle {}", 
+                      newAllocs[added]->toString(),newHandles[added]->toString()));
+  }
+  //3. copy item data - don't need to add in mmContainer
+  for (int i = 0; i < candidates.size(); i++) {
+    Item *candidate = candidates[i];
+    WriteHandle newHandle = std::move(newHandles[i]);
+    bool moved = moveRegularItem(*candidate,newHandle, true, true);
+    if (moved) {
+      XDCHECK(candidate->getKey() == newHandle->getKey());
+      if (markMoving) {
+        auto ref = candidate->unmarkMoving();
+        XDCHECK_EQ(ref,0);
+        wakeUpWaiters(candidate->getKey(), std::move(newHandle));
+        const auto res =
+            releaseBackToAllocator(*candidate, RemoveContext::kNormal, false);
+        XDCHECK(res == ReleaseRes::kReleased);
+      }
+    } else {
+      typename NvmCacheT::PutToken token{};
+      
+      removeFromMMContainer(*newAllocs[i]);
+      auto ret = handleFailedMove(candidate,token,false,markMoving);
+      XDCHECK(ret);
+      if (markMoving && candidate->getRefCountAndFlagsRaw() == 0) {
+        const auto res =
+            releaseBackToAllocator(*candidate, RemoveContext::kNormal, false);
+        XDCHECK(res == ReleaseRes::kReleased);
+      }
+
+    }
+  }
+  return candidates;
+}
+
+template 
+std::vector::EvictionData>
+CacheAllocator::getNextCandidates(TierId tid,
+                                             PoolId pid,
+                                             ClassId cid,
+                                             unsigned int batch,
+                                             bool markMoving,
+                                             bool fromBgThread) {
+
+  std::vector blankAllocs;
+  std::vector-  newAllocs;
+  std::vector newHandles;
+  std::vector evictionData;
+  evictionData.reserve(batch);
+  newAllocs.reserve(batch);
+  newHandles.reserve(batch);
+  
+  auto& mmContainer = getMMContainer(tid, pid, cid);
+  bool lastTier = tid+1 >= getNumTiers();
+  unsigned int maxSearchTries = std::max(config_.evictionSearchTries,
+                                            batch*4);
+  if (!lastTier) {
+    blankAllocs = allocateInternalTierByCidBatch(tid+1,pid,cid,batch);
+    if (blankAllocs.empty()) {
+      return evictionData;  
+    } else if (blankAllocs.size() != batch) {
+      batch = blankAllocs.size(); 
+    }
+    XDCHECK_EQ(blankAllocs.size(),batch);
+  }
+
+  auto iterateAndMark = [this, tid, pid, cid, batch,
+                         markMoving, lastTier, maxSearchTries,
+                         &evictionData, &mmContainer](auto&& itr) {
+    unsigned int searchTries = 0;
+    if (!itr) {
+      ++searchTries;
+      (*stats_.evictionAttempts)[tid][pid][cid].inc();
+      return;
+    }
+
+    while ((config_.evictionSearchTries == 0 ||
+            maxSearchTries > searchTries) &&
+           itr && evictionData.size() < batch) {
+      ++searchTries;
+      (*stats_.evictionAttempts)[tid][pid][cid].inc();
+
+      auto* toRecycle_ = itr.get();
+      bool chainedItem_ = toRecycle_->isChainedItem();
+      Item* toRecycleParent_ = chainedItem_
+              ? &toRecycle_->asChainedItem().getParentItem(compressor_)
+              : nullptr;
+      if (toRecycle_->isExpired()) {
+          ++itr;
+          continue;
+      }
+      // in order to safely check if the expected parent (toRecycleParent_) matches
+      // the current parent on the chained item, we need to take the chained
+      // item lock so we are sure that nobody else will be editing the chain
+      auto l_ = chainedItem_
+                ? chainedItemLocks_.tryLockExclusive(toRecycleParent_->getKey())
+                : decltype(chainedItemLocks_.tryLockExclusive(toRecycle_->getKey()))();
+
+      if (chainedItem_ &&
+          ( !l_ || &toRecycle_->asChainedItem().getParentItem(compressor_)
+                    != toRecycleParent_) ) {
+          ++itr;
+          continue;
+      }
+      Item* candidate_;
+      WriteHandle candidateHandle_;
+      Item* syncItem_;
+      //sync on the parent item for chained items to move to next tier
+      if (!lastTier && chainedItem_) {
+          syncItem_ = toRecycleParent_;
+          candidate_ = toRecycle_;
+      } else if (lastTier && chainedItem_) {
+          candidate_ = toRecycleParent_;
+          syncItem_ = toRecycleParent_;
+      } else {
+          candidate_ = toRecycle_;
+          syncItem_ = toRecycle_;
+      }
+      // if it's last tier, the item will be evicted
+      // need to create put token before marking it exclusive
+      const bool evictToNvmCache = lastTier && shouldWriteToNvmCache(*candidate_);
+
+      auto token_ = evictToNvmCache
+                        ? nvmCache_->createPutToken(candidate_->getKey())
+                        : typename NvmCacheT::PutToken{};
+      
+      if (evictToNvmCache && !token_.isValid()) {
+        stats_.evictFailConcurrentFill.inc();
+        ++itr;
+        continue;
+      }
+      bool marked = false;
+      //case 1: mark the item for eviction
+      if ((lastTier || candidate_->isExpired()) && markMoving) {
+        marked = syncItem_->markForEviction();
+      } else if (markMoving) {
+        marked = syncItem_->markMoving();
+      } else if (!markMoving) {
+        //we use item handle as sync point - for background eviction
+        auto hdl = acquire(candidate_);
+        if (hdl && hdl->getRefCount() == 1) {
+          marked = true;
+          candidateHandle_ = std::move(hdl);
+        }
+      }
+      if (!marked) {
+        if (candidate_->hasChainedItem()) {
+          stats_.evictFailParentAC.inc();
+        } else {
+          stats_.evictFailAC.inc();
+        }
+        ++itr;
+        continue;
+      }
+      
+      if (chainedItem_) {
+          XDCHECK(l_);
+          XDCHECK_EQ(toRecycleParent_,&toRecycle_->asChainedItem().getParentItem(compressor_));
+      }
+      mmContainer.remove(itr);
+      EvictionData ed(candidate_,toRecycle_,toRecycleParent_,chainedItem_,
+                      candidate_->isExpired(), std::move(token_), std::move(candidateHandle_));
+      evictionData.push_back(std::move(ed));
+    }
+  };
+  
+  mmContainer.withEvictionIterator(iterateAndMark);
+
+  if (evictionData.size() < batch) {
+    if (!lastTier) {
+      unsigned int toErase = batch - evictionData.size();
+      for (int i = 0; i < toErase; i++) {
+        allocator_[tid+1]->free(blankAllocs.back());
+        blankAllocs.pop_back();
+      }
+    }
+    if (evictionData.size() == 0) {
+      return evictionData;  
+    }
+  }
+  
+  if (!lastTier) {
+    //1. get an item handle from a new allocation
+    for (int i = 0; i < evictionData.size(); i++) {
+      Item *candidate = evictionData[i].candidate;
+      WriteHandle newItemHdl = acquire(new (blankAllocs[i]) 
+              Item(candidate->getKey(), candidate->getSize(),
+                   candidate->getCreationTime(), candidate->getExpiryTime()));
+      XDCHECK(newItemHdl);
+      if (newItemHdl) {
+        newItemHdl.markNascent();
+        (*stats_.fragmentationSize)[tid][pid][cid].add(
+            util::getFragmentation(*this, *newItemHdl));
+        newAllocs.push_back(newItemHdl.getInternal());
+        newHandles.push_back(std::move(newItemHdl));
+      } else {
+        //failed to get item handle
+        throw std::runtime_error(
+           folly::sformat("Was not to acquire new alloc, failed alloc {}", blankAllocs[i]));
+      }
+    }
+    //2. add in batch to mmContainer
+    auto& newMMContainer = getMMContainer(tid+1, pid, cid);
+    uint32_t added = newMMContainer.addBatch(newAllocs.begin(), newAllocs.end());
+    XDCHECK_EQ(added,newAllocs.size());
+    if (added != newAllocs.size()) {
+      throw std::runtime_error(
+        folly::sformat("Was not able to add all new items, failed item {} and handle {}", 
+                        newAllocs[added]->toString(),newHandles[added]->toString()));
+    }
+    //3. copy item data - don't need to add in mmContainer
+    for (int i = 0; i < evictionData.size(); i++) {
+      Item *candidate = evictionData[i].candidate;
+      WriteHandle newHandle = std::move(newHandles[i]);
+      bool moved = moveRegularItem(*candidate,newHandle, true, true);
+      if (moved) {
+        (*stats_.numWritebacks)[tid][pid][cid].inc();
+        XDCHECK(candidate->getKey() == newHandle->getKey());
+        if (markMoving) {
+          auto ref = candidate->unmarkMoving();
+          XDCHECK_EQ(ref,0);
+          wakeUpWaiters(candidate->getKey(), std::move(newHandle));
+          if (fromBgThread) {
+            const auto res =
+                releaseBackToAllocator(*candidate, RemoveContext::kNormal, false);
+            XDCHECK(res == ReleaseRes::kReleased);
+          }
+        }
+      } else {
+        typename NvmCacheT::PutToken token = std::move(evictionData[i].token);
+        removeFromMMContainer(*newAllocs[i]);
+        auto ret = handleFailedMove(candidate,token,evictionData[i].expired,markMoving);
+        XDCHECK(ret);
+        if (fromBgThread && markMoving) {
+          const auto res =
+              releaseBackToAllocator(*candidate, RemoveContext::kNormal, false);
+          XDCHECK(res == ReleaseRes::kReleased);
+        }
+
+      }
+    }
+  } else {
+    //we are the last tier - just remove
+    for (int i = 0; i < evictionData.size(); i++) {
+      Item *candidate = evictionData[i].candidate;
+      typename NvmCacheT::PutToken token = std::move(evictionData[i].token);
+      auto ret = handleFailedMove(candidate,token,evictionData[i].expired,markMoving);
+      if (fromBgThread && markMoving) {
+        const auto res =
+            releaseBackToAllocator(*candidate, RemoveContext::kNormal, false);
+        XDCHECK(res == ReleaseRes::kReleased);
+      }
+    }
+  }
+
+  return evictionData;
+}
+
+// 
+// Common function in case move among tiers fails during eviction
+//
+// if insertOrReplace was called during move
+// then candidate will not be accessible (failed replace during tryEvict)
+//  - therefore this was why we failed to
+//    evict to the next tier and insertOrReplace
+//    will remove from NVM cache
+// however, if candidate is accessible
+// that means the allocation in the next
+// tier failed - so we will continue to
+// evict the item to NVM cache
+template 
+bool CacheAllocator::handleFailedMove(Item* candidate, 
+                                                  typename NvmCacheT::PutToken& token, 
+                                                  bool isExpired,
+                                                  bool markMoving) {
+  bool failedToReplace = !candidate->isAccessible();
+  if (!token.isValid() && !failedToReplace) {
+    token = createPutToken(*candidate);
+  }
+  // in case that we are on the last tier, we should have already marked
+  // as exclusive since we will not be moving the item to the next tier
+  // but rather just evicting all together, no need to
+  // markForEvictionWhenMoving
+  if (markMoving) {
+    if (!candidate->isMarkedForEviction() &&
+        candidate->isMoving()) {
+      auto ret = (isExpired) ? true : candidate->markForEvictionWhenMoving();
+      XDCHECK(ret);
+    }
+    unlinkItemForEviction(*candidate);
+  } else if (candidate->isAccessible()) {
+    accessContainer_->remove(*candidate);
+  }
+ 
+  if (token.isValid() && shouldWriteToNvmCacheExclusive(*candidate)
+          && !failedToReplace) {
+    nvmCache_->put(*candidate, std::move(token));
+  }
+  // wake up any readers that wait for the move to complete
+  // it's safe to do now, as we have the item marked exclusive and
+  // no other reader can be added to the waiters list
+  if (markMoving) {
+    wakeUpWaiters(candidate->getKey(), {});
+  }
+  return true;
+}
+
 template 
 std::pair::Item*,
           typename CacheAllocator::Item*>
-CacheAllocator::getNextCandidate(PoolId pid,
+CacheAllocator::getNextCandidate(TierId tid,
+                                             PoolId pid,
                                              ClassId cid,
                                              unsigned int& searchTries) {
   typename NvmCacheT::PutToken token;
   Item* toRecycle = nullptr;
+  Item* toRecycleParent = nullptr;
   Item* candidate = nullptr;
-  auto& mmContainer = getMMContainer(pid, cid);
-
-  mmContainer.withEvictionIterator([this, pid, cid, &candidate, &toRecycle,
-                                    &searchTries, &mmContainer,
-                                    &token](auto&& itr) {
+  bool isExpired = false;
+  bool chainedItem = false;
+  auto& mmContainer = getMMContainer(tid, pid, cid);
+  bool lastTier = tid+1 >= getNumTiers() || config_.noOnlineEviction;
+
+  mmContainer.withEvictionIterator([this, tid, pid, cid, &candidate,
+                                    &toRecycle, &toRecycleParent,
+                                    &chainedItem,
+                                    &searchTries, &mmContainer, &lastTier,
+                                    &isExpired, &token](auto&& itr) {
     if (!itr) {
       ++searchTries;
-      (*stats_.evictionAttempts)[pid][cid].inc();
+      (*stats_.evictionAttempts)[tid][pid][cid].inc();
       return;
     }
 
@@ -3655,26 +4494,57 @@ CacheAllocator::getNextCandidate(PoolId pid,
             config_.evictionSearchTries > searchTries) &&
            itr) {
       ++searchTries;
-      (*stats_.evictionAttempts)[pid][cid].inc();
+      (*stats_.evictionAttempts)[tid][pid][cid].inc();
 
       auto* toRecycle_ = itr.get();
-      auto* candidate_ =
-          toRecycle_->isChainedItem()
+      bool chainedItem_ = toRecycle_->isChainedItem();
+      Item* toRecycleParent_ = chainedItem_
               ? &toRecycle_->asChainedItem().getParentItem(compressor_)
-              : toRecycle_;
-
-      const bool evictToNvmCache = shouldWriteToNvmCache(*candidate_);
-      auto putToken = evictToNvmCache
-                          ? nvmCache_->createPutToken(candidate_->getKey())
-                          : typename NvmCacheT::PutToken{};
-
-      if (evictToNvmCache && !putToken.isValid()) {
+              : nullptr;
+      // in order to safely check if the expected parent (toRecycleParent_) matches
+      // the current parent on the chained item, we need to take the chained
+      // item lock so we are sure that nobody else will be editing the chain
+      auto l_ = chainedItem_
+                ? chainedItemLocks_.tryLockExclusive(toRecycleParent_->getKey())
+                : decltype(chainedItemLocks_.tryLockExclusive(toRecycle_->getKey()))();
+
+      if (chainedItem_ &&
+          ( !l_ || &toRecycle_->asChainedItem().getParentItem(compressor_)
+                    != toRecycleParent_) ) {
+          // Fail moving if we either couldn't acquire the chained item lock,
+          // or if the parent had already been replaced in the meanwhile.
+          ++itr;
+          continue;
+      }
+      Item* candidate_;
+      Item* syncItem_;
+      //sync on the parent item for chained items to move to next tier
+      if (!lastTier && chainedItem_) {
+          syncItem_ = toRecycleParent_;
+          candidate_ = toRecycle_;
+      } else if (lastTier && chainedItem_) {
+          candidate_ = toRecycleParent_;
+          syncItem_ = toRecycleParent_;
+      } else {
+          candidate_ = toRecycle_;
+          syncItem_ = toRecycle_;
+      }
+      // if it's last tier, the item will be evicted
+      // need to create put token before marking it exclusive
+      const bool evictToNvmCache = lastTier && shouldWriteToNvmCache(*candidate_);
+      auto token_ = evictToNvmCache
+                        ? nvmCache_->createPutToken(candidate_->getKey())
+                        : typename NvmCacheT::PutToken{};
+
+      if (evictToNvmCache && !token_.isValid()) {
         stats_.evictFailConcurrentFill.inc();
         ++itr;
         continue;
       }
 
-      auto markedForEviction = candidate_->markForEviction();
+      auto markedForEviction = (lastTier || candidate_->isExpired()) ?
+                                   syncItem_->markForEviction() :
+                                   syncItem_->markMoving();
       if (!markedForEviction) {
         if (candidate_->hasChainedItem()) {
           stats_.evictFailParentAC.inc();
@@ -3685,20 +4555,21 @@ CacheAllocator::getNextCandidate(PoolId pid,
         continue;
       }
 
+      XDCHECK(syncItem_->isMoving() || syncItem_->isMarkedForEviction());
+      toRecycleParent = toRecycleParent_;
+      chainedItem = chainedItem_;
       // markForEviction to make sure no other thead is evicting the item
-      // nor holding a handle to that item
+      // nor holding a handle to that item if this is last tier
+      // since we won't be moving the item to the next tier
       toRecycle = toRecycle_;
       candidate = candidate_;
-      token = std::move(putToken);
-
-      // Check if parent changed for chained items - if yes, we cannot
-      // remove the child from the mmContainer as we will not be evicting
-      // it. We could abort right here, but we need to cleanup in case
-      // unmarkForEviction() returns 0 - so just go through normal path.
-      if (!toRecycle_->isChainedItem() ||
-          &toRecycle->asChainedItem().getParentItem(compressor_) == candidate) {
-        mmContainer.remove(itr);
+      isExpired = candidate_->isExpired();
+      token = std::move(token_);
+      if (chainedItem) {
+          XDCHECK(l_);
+          XDCHECK_EQ(toRecycleParent,&toRecycle_->asChainedItem().getParentItem(compressor_));
       }
+      mmContainer.remove(itr);
       return;
     }
   });
@@ -3709,25 +4580,72 @@ CacheAllocator::getNextCandidate(PoolId pid,
 
   XDCHECK(toRecycle);
   XDCHECK(candidate);
-  XDCHECK(candidate->isMarkedForEviction());
 
-  unlinkItemForEviction(*candidate);
-
-  if (token.isValid() && shouldWriteToNvmCacheExclusive(*candidate)) {
-    nvmCache_->put(*candidate, std::move(token));
+  auto evictedToNext = (lastTier || isExpired) ? nullptr
+      : tryEvictToNextMemoryTier(*candidate);
+  if (!evictedToNext) {
+    //failed to move a chained item - so evict the entire chain
+    if (candidate->isChainedItem()) {
+      //candidate should be parent now
+      XDCHECK(toRecycleParent->isMoving());
+      XDCHECK_EQ(candidate,toRecycle);
+      candidate = toRecycleParent; //but now we evict the chain and in
+                                    //doing so recycle the child
+    }
+    //clean up and evict the candidate since we failed
+    auto ret = handleFailedMove(candidate,token,isExpired,true);
+    XDCHECK(ret);
+  } else {
+    XDCHECK(!evictedToNext->isMarkedForEviction() && !evictedToNext->isMoving());
+    XDCHECK(!candidate->isMarkedForEviction() && !candidate->isMoving());
+    XDCHECK(!candidate->isAccessible());
+    XDCHECK(candidate->getKey() == evictedToNext->getKey());
+
+    (*stats_.numWritebacks)[tid][pid][cid].inc();
+    if (chainedItem) {
+      XDCHECK(toRecycleParent->isMoving());
+      XDCHECK_EQ(evictedToNext->getRefCount(),2u);
+      (*stats_.chainedItemEvictions)[tid][pid][cid].inc();
+      // check if by releasing the item we intend to, we actually
+      // recycle the candidate.
+      auto ret = releaseBackToAllocator(*candidate, RemoveContext::kEviction,
+                                        /* isNascent */ false, toRecycle);
+      XDCHECK_EQ(ret,ReleaseRes::kRecycled);
+      evictedToNext.reset(); //once we unmark moving threads will try and alloc, drop
+                              //the handle now - and refcount will drop to 1
+      auto ref = toRecycleParent->unmarkMoving();
+      if (UNLIKELY(ref == 0)) {
+        wakeUpWaiters(toRecycleParent->getKey(),{});
+        const auto res =
+            releaseBackToAllocator(*toRecycleParent, RemoveContext::kNormal, false);
+        XDCHECK(res == ReleaseRes::kReleased);
+      } else {
+        auto parentHandle = acquire(toRecycleParent);
+        if (parentHandle) {
+          wakeUpWaiters(toRecycleParent->getKey(),std::move(parentHandle));
+        } //in case where parent handle is null that means some other thread
+          // would have called wakeUpWaiters with null handle and released
+          // parent back to allocator
+      }
+    } else {
+      wakeUpWaiters(candidate->getKey(), std::move(evictedToNext));
+    }
   }
+
+  XDCHECK(!candidate->isMarkedForEviction() && !candidate->isMoving());
+
   return {candidate, toRecycle};
 }
 
 template 
 typename CacheAllocator::Item*
-CacheAllocator::findEviction(PoolId pid, ClassId cid) {
+CacheAllocator::findEviction(TierId tid, PoolId pid, ClassId cid) {
   // Keep searching for a candidate until we were able to evict it
   // or until the search limit has been exhausted
   unsigned int searchTries = 0;
   while (config_.evictionSearchTries == 0 ||
          config_.evictionSearchTries > searchTries) {
-    auto [candidate, toRecycle] = getNextCandidate(pid, cid, searchTries);
+    auto [candidate, toRecycle] = getNextCandidate(tid, pid, cid, searchTries);
 
     // Reached the end of the eviction queue but doulen't find a candidate,
     // start again.
@@ -3738,9 +4656,9 @@ CacheAllocator::findEviction(PoolId pid, ClassId cid) {
     // NULL. If `ref` == 0 then it means that we are the last holder of
     // that item.
     if (candidate->hasChainedItem()) {
-      (*stats_.chainedItemEvictions)[pid][cid].inc();
+      (*stats_.chainedItemEvictions)[tid][pid][cid].inc();
     } else {
-      (*stats_.regularItemEvictions)[pid][cid].inc();
+      (*stats_.regularItemEvictions)[tid][pid][cid].inc();
     }
 
     if (auto eventTracker = getEventTracker()) {
@@ -3808,6 +4726,70 @@ bool CacheAllocator::shouldWriteToNvmCacheExclusive(
   return true;
 }
 
+template 
+typename CacheAllocator::WriteHandle
+CacheAllocator::tryEvictToNextMemoryTier(
+    TierId tid, PoolId pid, Item& item) {
+
+  TierId nextTier = tid; // TODO - calculate this based on some admission policy
+  while (++nextTier < getNumTiers()) { // try to evict down to the next memory tiers
+    // always evict item from the nextTier to make room for new item
+    bool evict = true;
+    // allocateInternal might trigger another eviction
+    WriteHandle newItemHdl{};
+    Item* parentItem;
+    bool chainedItem = false;
+    if(item.isChainedItem()) {
+        chainedItem = true;
+        parentItem = &item.asChainedItem().getParentItem(compressor_);
+        XDCHECK(parentItem->isMoving());
+        XDCHECK(item.isChainedItem() && item.getRefCount() == 1);
+        XDCHECK_EQ(0, parentItem->getRefCount());
+        newItemHdl = allocateChainedItemInternalTier(*parentItem,
+                                                     item.getSize(),
+                                                     nextTier);
+    } else {
+      // this assert can fail if parent changed
+      XDCHECK(item.isMoving());
+      XDCHECK(item.getRefCount() == 0);
+      newItemHdl = allocateInternalTier(nextTier, pid,
+                     item.getKey(),
+                     item.getSize(),
+                     item.getCreationTime(),
+                     item.getExpiryTime(),
+                     evict);
+    }
+
+    if (newItemHdl) {
+      bool moveSuccess = chainedItem
+                      ? moveChainedItem(item.asChainedItem(), newItemHdl)
+                      : moveRegularItem(item, newItemHdl,
+                      /* skipAddInMMContainer */ false, /* fromBgThread*/ false);
+      if (!moveSuccess) {
+        return WriteHandle{};
+      }
+      XDCHECK_EQ(newItemHdl->getSize(), item.getSize());
+      if (!chainedItem) { // TODO: do we need it?
+        XDCHECK_EQ(newItemHdl->getKey(),item.getKey());
+        item.unmarkMoving();
+      }
+      return newItemHdl;
+    } else {
+      return WriteHandle{};
+    }
+  }
+
+  return {};
+}
+
+template 
+typename CacheAllocator::WriteHandle
+CacheAllocator::tryEvictToNextMemoryTier(Item& item) {
+  auto tid = getTierId(item);
+  auto pid = allocator_[tid]->getAllocInfo(item.getMemory()).poolId;
+  return tryEvictToNextMemoryTier(tid, pid, item);
+}
+
 template 
 typename CacheAllocator::RemoveRes
 CacheAllocator::remove(typename Item::Key key) {
@@ -4008,21 +4990,57 @@ void CacheAllocator::invalidateNvm(Item& item) {
   }
 }
 
+template 
+TierId
+CacheAllocator::getTierId(const Item& item) const {
+  return getTierId(item.getMemory());
+}
+
+template 
+TierId
+CacheAllocator::getTierId(const void* ptr) const {
+  for (TierId tid = 0; tid < getNumTiers(); tid++) {
+    if (allocator_[tid]->isMemoryInAllocator(ptr))
+      return tid;
+  }
+
+  throw std::invalid_argument("Item does not belong to any tier!");
+}
+
 template 
 typename CacheAllocator::MMContainer&
 CacheAllocator::getMMContainer(const Item& item) const noexcept {
+  const auto tid = getTierId(item);
   const auto allocInfo =
-      allocator_->getAllocInfo(static_cast(&item));
-  return getMMContainer(allocInfo.poolId, allocInfo.classId);
+      allocator_[tid]->getAllocInfo(static_cast(&item));
+  return getMMContainer(tid, allocInfo.poolId, allocInfo.classId);
 }
 
 template 
 typename CacheAllocator::MMContainer&
-CacheAllocator::getMMContainer(PoolId pid,
+CacheAllocator::getMMContainer(TierId tid,
+                                           PoolId pid,
                                            ClassId cid) const noexcept {
-  XDCHECK_LT(static_cast(pid), mmContainers_.size());
-  XDCHECK_LT(static_cast(cid), mmContainers_[pid].size());
-  return *mmContainers_[pid][cid];
+  XDCHECK_LT(static_cast(tid), mmContainers_.size());
+  XDCHECK_LT(static_cast(pid), mmContainers_[tid].size());
+  XDCHECK_LT(static_cast(cid), mmContainers_[tid][pid].size());
+  return *mmContainers_[tid][pid][cid];
+}
+
+template 
+MMContainerStat CacheAllocator::getMMContainerStat(
+    TierId tid, PoolId pid, ClassId cid) const noexcept {
+  if(static_cast(tid) >= mmContainers_.size()) {
+    return MMContainerStat{};
+  }
+  if (static_cast(pid) >= mmContainers_[tid].size()) {
+    return MMContainerStat{};
+  }
+  if (static_cast