Skip to content

Commit f205398

Browse files
committed
Implemented async Item movement between tiers
1 parent 341a94b commit f205398

File tree

7 files changed

+245
-21
lines changed

7 files changed

+245
-21
lines changed

cachelib/allocator/CacheAllocator-inl.h

Lines changed: 90 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,12 +1101,42 @@ CacheAllocator<CacheTrait>::insertOrReplace(const ItemHandle& handle) {
11011101
return replaced;
11021102
}
11031103

1104+
/* Next two methods are used to asynchronously move Item between memory tiers.
1105+
*
1106+
* The thread, which moves Item, allocates new Item in the tier we are moving to
1107+
* and calls the moveRegularItemOnEviction() method. This method does the following:
1108+
* 1. Create MoveCtx and put it to the movesMap.
1109+
* 2. Update the access container with the new item from the tier we are
1110+
* moving to. This Item has kIncomplete flag set.
1111+
* 3. Copy data from the old Item to the new one.
1112+
* 4. Unset the kIncomplete flag and notify the MoveCtx
1113+
*
1114+
* Concurrent threads which are getting handle to the same key:
1115+
* 1. When a handle is created it checks if the kIncomplete flag is set
1116+
* 2. If so, Handle implementation creates waitContext and adds it to the
1117+
* MoveCtx by calling addWaitContextForMovingItem() method.
1118+
* 3. Wait until the moving thread completes its job.
1119+
*/
1120+
template <typename CacheTrait>
1121+
bool CacheAllocator<CacheTrait>::addWaitContextForMovingItem(
1122+
folly::StringPiece key, std::shared_ptr<WaitContext<ItemHandle>> waiter) {
1123+
auto shard = getShardForKey(key);
1124+
auto& movesMap = getMoveMapForShard(shard);
1125+
auto lock = getMoveLockForShard(shard);
1126+
auto it = movesMap.find(key);
1127+
if (it == movesMap.end()) {
1128+
return false;
1129+
}
1130+
auto ctx = it->second.get();
1131+
ctx->addWaiter(std::move(waiter));
1132+
return true;
1133+
}
1134+
11041135
template <typename CacheTrait>
11051136
bool CacheAllocator<CacheTrait>::moveRegularItemOnEviction(
1106-
Item& oldItem,
1107-
ItemHandle& newItemHdl) {
1108-
// TODO: should we introduce new latency tracker. E.g. evictRegularLatency_ ???
1109-
// util::LatencyTracker tracker{stats_.evictRegularLatency_};
1137+
Item& oldItem, ItemHandle& newItemHdl) {
1138+
// TODO: should we introduce new latency tracker. E.g. evictRegularLatency_
1139+
// ??? util::LatencyTracker tracker{stats_.evictRegularLatency_};
11101140

11111141
if (!oldItem.isAccessible() || oldItem.isExpired()) {
11121142
return false;
@@ -1122,22 +1152,36 @@ bool CacheAllocator<CacheTrait>::moveRegularItemOnEviction(
11221152
newItemHdl->markNvmClean();
11231153
}
11241154

1125-
if(config_.moveCb) {
1126-
// Execute the move callback. We cannot make any guarantees about the
1127-
// consistency of the old item beyond this point, because the callback can
1128-
// do more than a simple memcpy() e.g. update external references. If there
1129-
// are any remaining handles to the old item, it is the caller's
1130-
// responsibility to invalidate them. The move can only fail after this
1131-
// statement if the old item has been removed or replaced, in which case it
1132-
// should be fine for it to be left in an inconsistent state.
1133-
config_.moveCb(oldItem, *newItemHdl, nullptr);
1134-
} else {
1135-
std::memcpy(newItemHdl->getWritableMemory(), oldItem.getMemory(), oldItem.getSize());
1155+
folly::StringPiece key(oldItem.getKey());
1156+
auto shard = getShardForKey(key);
1157+
auto& movesMap = getMoveMapForShard(shard);
1158+
MoveCtx* ctx(nullptr);
1159+
{
1160+
auto lock = getMoveLockForShard(shard);
1161+
auto res = movesMap.try_emplace(key, std::make_unique<MoveCtx>());
1162+
if (!res.second) {
1163+
return false;
1164+
}
1165+
ctx = res.first->second.get();
11361166
}
11371167

1138-
// TODO: Possible data race. We copied Item's memory to the newItemHdl
1139-
// but have not updated accessContainer yet. Concurrent threads might get handle
1140-
// to the old Item.
1168+
auto resHdl = ItemHandle{};
1169+
auto guard = folly::makeGuard([key, this, ctx, shard, &resHdl]() {
1170+
auto& movesMap = getMoveMapForShard(shard);
1171+
resHdl->unmarkIncomplete();
1172+
auto lock = getMoveLockForShard(shard);
1173+
ctx->setItemHandle(std::move(resHdl));
1174+
movesMap.erase(key);
1175+
});
1176+
1177+
// TODO: Possibly we can use markMoving() instead. But today
1178+
// moveOnSlabRelease logic assumes that we mark the old Item as moving
1179+
// and then do copy and replace the old Item with the new one in access
1180+
// container. Furthermore, Item can be marked as Moving only
1181+
// if it is linked to MM container. In our case we mark the new Item
1182+
// and update access container before the new Item is ready (content is
1183+
// copied).
1184+
newItemHdl->markIncomplete();
11411185

11421186
// Inside the access container's lock, this checks if the old item is
11431187
// accessible and its refcount is zero. If the item is not accessible,
@@ -1147,10 +1191,25 @@ bool CacheAllocator<CacheTrait>::moveRegularItemOnEviction(
11471191
// this item through an API such as remove(ItemHandle). In this case,
11481192
// it is unsafe to replace the old item with a new one, so we should
11491193
// also abort.
1150-
if (!accessContainer_->replaceIf(oldItem, *newItemHdl, itemEvictionPredicate)) {
1194+
if (!accessContainer_->replaceIf(oldItem, *newItemHdl,
1195+
itemEvictionPredicate)) {
11511196
return false;
11521197
}
11531198

1199+
if (config_.moveCb) {
1200+
// Execute the move callback. We cannot make any guarantees about the
1201+
// consistency of the old item beyond this point, because the callback can
1202+
// do more than a simple memcpy() e.g. update external references. If there
1203+
// are any remaining handles to the old item, it is the caller's
1204+
// responsibility to invalidate them. The move can only fail after this
1205+
// statement if the old item has been removed or replaced, in which case it
1206+
// should be fine for it to be left in an inconsistent state.
1207+
config_.moveCb(oldItem, *newItemHdl, nullptr);
1208+
} else {
1209+
std::memcpy(newItemHdl->getWritableMemory(), oldItem.getMemory(),
1210+
oldItem.getSize());
1211+
}
1212+
11541213
// Inside the MM container's lock, this checks if the old item exists to
11551214
// make sure that no other thread removed it, and only then replaces it.
11561215
if (!replaceInMMContainer(oldItem, *newItemHdl)) {
@@ -1185,6 +1244,7 @@ bool CacheAllocator<CacheTrait>::moveRegularItemOnEviction(
11851244
XDCHECK(newItemHdl->hasChainedItem());
11861245
}
11871246
newItemHdl.unmarkNascent();
1247+
resHdl = newItemHdl.clone(); // guard will assign it to ctx under lock
11881248
return true;
11891249
}
11901250

@@ -1463,13 +1523,20 @@ CacheAllocator<CacheTrait>::tryEvictToNextMemoryTier(
14631523
if (moveRegularItemOnEviction(item, newItemHdl)) {
14641524
return acquire(&item);
14651525
}
1466-
// TODO: should we free the newItemHdl if moveRegularItemOnEviction returns false???
14671526
}
14681527
}
14691528

14701529
return {};
14711530
}
14721531

1532+
template <typename CacheTrait>
1533+
typename CacheAllocator<CacheTrait>::ItemHandle
1534+
CacheAllocator<CacheTrait>::tryEvictToNextMemoryTier(Item& item) {
1535+
auto tid = getTierId(item);
1536+
auto pid = allocator_[tid]->getAllocInfo(item.getMemory()).poolId;
1537+
return tryEvictToNextMemoryTier(tid, pid, item);
1538+
}
1539+
14731540
template <typename CacheTrait>
14741541
typename CacheAllocator<CacheTrait>::ItemHandle
14751542
CacheAllocator<CacheTrait>::advanceIteratorAndTryEvictRegularItem(
@@ -2830,6 +2897,9 @@ CacheAllocator<CacheTrait>::evictNormalItemForSlabRelease(Item& item) {
28302897
return ItemHandle{};
28312898
}
28322899

2900+
auto evictHandle = tryEvictToNextMemoryTier(item);
2901+
if(evictHandle) return evictHandle;
2902+
28332903
auto predicate = [](const Item& it) { return it.getRefCount() == 0; };
28342904

28352905
const bool evictToNvmCache = shouldWriteToNvmCache(item);

cachelib/allocator/CacheAllocator.h

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include <folly/ScopeGuard.h>
2222
#include <folly/logging/xlog.h>
2323
#include <folly/synchronization/SanitizeThread.h>
24+
#include <folly/hash/Hash.h>
25+
#include <folly/container/F14Map.h>
2426

2527
#include <functional>
2628
#include <memory>
@@ -1449,12 +1451,22 @@ class CacheAllocator : public CacheBase {
14491451

14501452
// Try to move the item down to the next memory tier
14511453
//
1454+
// @param tid current tier ID of the item
1455+
// @param pid the pool ID the item belong to.
14521456
// @param item the item to evict
14531457
//
14541458
// @return valid handle to the item. This will be the last
14551459
// handle to the item. On failure an empty handle.
14561460
ItemHandle tryEvictToNextMemoryTier(TierId tid, PoolId pid, Item& item);
14571461

1462+
// Try to move the item down to the next memory tier
1463+
//
1464+
// @param item the item to evict
1465+
//
1466+
// @return valid handle to the item. This will be the last
1467+
// handle to the item. On failure an empty handle.
1468+
ItemHandle tryEvictToNextMemoryTier(Item& item);
1469+
14581470
// Deserializer CacheAllocatorMetadata and verify the version
14591471
//
14601472
// @param deserializer Deserializer object
@@ -1754,6 +1766,84 @@ class CacheAllocator : public CacheBase {
17541766
return 0; // TODO
17551767
}
17561768

1769+
bool addWaitContextForMovingItem(
1770+
folly::StringPiece key, std::shared_ptr<WaitContext<ItemHandle>> waiter);
1771+
1772+
class MoveCtx {
1773+
public:
1774+
MoveCtx() {}
1775+
1776+
~MoveCtx() {
1777+
// prevent any further enqueue to waiters
1778+
// Note: we don't need to hold locks since no one can enqueue
1779+
// after this point.
1780+
wakeUpWaiters();
1781+
}
1782+
1783+
// record the item handle. Upon destruction we will wake up the waiters
1784+
// and pass a clone of the handle to the callBack. By default we pass
1785+
// a null handle
1786+
void setItemHandle(ItemHandle _it) { it = std::move(_it); }
1787+
1788+
// enqueue a waiter into the waiter list
1789+
// @param waiter WaitContext
1790+
void addWaiter(std::shared_ptr<WaitContext<ItemHandle>> waiter) {
1791+
XDCHECK(waiter);
1792+
waiters.push_back(std::move(waiter));
1793+
}
1794+
1795+
private:
1796+
// notify all pending waiters that are waiting for the fetch.
1797+
void wakeUpWaiters() {
1798+
bool refcountOverflowed = false;
1799+
for (auto& w : waiters) {
1800+
// If refcount overflowed earlier, then we will return miss to
1801+
// all subsequent waiters.
1802+
if (refcountOverflowed) {
1803+
w->set(ItemHandle{});
1804+
continue;
1805+
}
1806+
1807+
try {
1808+
w->set(it.clone());
1809+
} catch (const exception::RefcountOverflow&) {
1810+
// We'll return a miss to the user's pending read,
1811+
// so we should enqueue a delete via NvmCache.
1812+
// TODO: cache.remove(it);
1813+
refcountOverflowed = true;
1814+
}
1815+
}
1816+
}
1817+
1818+
ItemHandle it; // will be set when Context is being filled
1819+
std::vector<std::shared_ptr<WaitContext<ItemHandle>>> waiters; // list of
1820+
// waiters
1821+
};
1822+
using MoveMap =
1823+
folly::F14ValueMap<folly::StringPiece,
1824+
std::unique_ptr<MoveCtx>,
1825+
folly::HeterogeneousAccessHash<folly::StringPiece>>;
1826+
1827+
static size_t getShardForKey(folly::StringPiece key) {
1828+
return folly::Hash()(key) % kShards;
1829+
}
1830+
1831+
MoveMap& getMoveMapForShard(size_t shard) {
1832+
return movesMap_[shard].movesMap_;
1833+
}
1834+
1835+
MoveMap& getMoveMap(folly::StringPiece key) {
1836+
return getMoveMapForShard(getShardForKey(key));
1837+
}
1838+
1839+
std::unique_lock<std::mutex> getMoveLockForShard(size_t shard) {
1840+
return std::unique_lock<std::mutex>(moveLock_[shard].moveLock_);
1841+
}
1842+
1843+
std::unique_lock<std::mutex> getMoveLock(folly::StringPiece key) {
1844+
return getMoveLockForShard(getShardForKey(key));
1845+
}
1846+
17571847
// Whether the memory allocator for this cache allocator was created on shared
17581848
// memory. The hash table, chained item hash table etc is also created on
17591849
// shared memory except for temporary shared memory mode when they're created
@@ -1871,6 +1961,18 @@ class CacheAllocator : public CacheBase {
18711961
// indicates if the shutdown of cache is in progress or not
18721962
std::atomic<bool> shutDownInProgress_{false};
18731963

1964+
static constexpr size_t kShards = 8192; // TODO: need to define right value
1965+
1966+
// a map of all pending moves
1967+
struct {
1968+
alignas(folly::hardware_destructive_interference_size) MoveMap movesMap_;
1969+
} movesMap_[kShards];
1970+
1971+
// a map of move locks for each shard
1972+
struct {
1973+
alignas(folly::hardware_destructive_interference_size) std::mutex moveLock_;
1974+
} moveLock_[kShards];
1975+
18741976
// END private members
18751977

18761978
// Make this friend to give access to acquire and release

cachelib/allocator/CacheItem-inl.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,21 @@ bool CacheItem<CacheTrait>::isNvmEvicted() const noexcept {
264264
return ref_.isNvmEvicted();
265265
}
266266

267+
template <typename CacheTrait>
268+
void CacheItem<CacheTrait>::markIncomplete() noexcept {
269+
ref_.markIncomplete();
270+
}
271+
272+
template <typename CacheTrait>
273+
void CacheItem<CacheTrait>::unmarkIncomplete() noexcept {
274+
ref_.unmarkIncomplete();
275+
}
276+
277+
template <typename CacheTrait>
278+
bool CacheItem<CacheTrait>::isIncomplete() const noexcept {
279+
return ref_.isIncomplete();
280+
}
281+
267282
template <typename CacheTrait>
268283
void CacheItem<CacheTrait>::markIsChainedItem() noexcept {
269284
XDCHECK(!hasChainedItem());

cachelib/allocator/CacheItem.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,14 @@ class CACHELIB_PACKED_ATTR CacheItem {
240240
void unmarkNvmEvicted() noexcept;
241241
bool isNvmEvicted() const noexcept;
242242

243+
/**
244+
* Marks that the item is migrating between memory tiers and
245+
* not ready for access now. Accessing thread should wait.
246+
*/
247+
void markIncomplete() noexcept;
248+
void unmarkIncomplete() noexcept;
249+
bool isIncomplete() const noexcept;
250+
243251
/**
244252
* Function to set the timestamp for when to expire an item
245253
* Employs a best-effort approach to update the expiryTime. Item's expiry

cachelib/allocator/Handle.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,14 @@ struct HandleImpl {
464464

465465
// Handle which has the item already
466466
FOLLY_ALWAYS_INLINE HandleImpl(Item* it, CacheT& alloc) noexcept
467-
: alloc_(&alloc), it_(it) {}
467+
: alloc_(&alloc), it_(it) {
468+
if (it_ && it_->isIncomplete()) {
469+
waitContext_ = std::make_shared<ItemWaitContext>(alloc);
470+
if (!alloc_->addWaitContextForMovingItem(it->getKey(), waitContext_)) {
471+
waitContext_.reset();
472+
}
473+
}
474+
}
468475

469476
// handle that has a wait context allocated. Used for async handles
470477
// In this case, the it_ will be filled in asynchronously and multiple

cachelib/allocator/Refcount.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ class FOLLY_PACK_ATTR RefcountWithFlags {
116116
// unevictable in the past.
117117
kUnevictable_NOOP,
118118

119+
// Item is accessible but content is not ready yet. Used by eviction
120+
// when Item is moved between memory tiers.
121+
kIncomplete,
122+
119123
// Unused. This is just to indicate the maximum number of flags
120124
kFlagMax,
121125
};
@@ -329,6 +333,14 @@ class FOLLY_PACK_ATTR RefcountWithFlags {
329333
void unmarkNvmEvicted() noexcept { return unSetFlag<kNvmEvicted>(); }
330334
bool isNvmEvicted() const noexcept { return isFlagSet<kNvmEvicted>(); }
331335

336+
/**
337+
* Marks that the item is migrating between memory tiers and
338+
* not ready for access now. Accessing thread should wait.
339+
*/
340+
void markIncomplete() noexcept { return setFlag<kIncomplete>(); }
341+
void unmarkIncomplete() noexcept { return unSetFlag<kIncomplete>(); }
342+
bool isIncomplete() const noexcept { return isFlagSet<kIncomplete>(); }
343+
332344
// Whether or not an item is completely drained of access
333345
// Refcount is 0 and the item is not linked, accessible, nor moving
334346
bool isDrained() const noexcept { return getRefWithAccessAndAdmin() == 0; }

0 commit comments

Comments
 (0)