Skip to content

Commit b3d469a

Browse files
Jiayue Baofacebook-github-bot
authored andcommitted
Make periodic worker start and stop util functions
Summary: This change is to avoid code duplication inside CacheAllocator and ObjectCache. Reviewed By: jaesoo-fb Differential Revision: D47372992 fbshipit-source-id: 0e365700a38ded6202ec42b7e5c621c77814a021
1 parent 5e3869e commit b3d469a

File tree

4 files changed

+81
-101
lines changed

4 files changed

+81
-101
lines changed

cachelib/allocator/CacheAllocator-inl.h

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3419,17 +3419,7 @@ bool CacheAllocator<CacheTrait>::stopWorker(folly::StringPiece name,
34193419
std::unique_ptr<T>& worker,
34203420
std::chrono::seconds timeout) {
34213421
std::lock_guard<std::mutex> l(workersMutex_);
3422-
if (!worker) {
3423-
return true;
3424-
}
3425-
3426-
bool ret = worker->stop(timeout);
3427-
if (ret) {
3428-
XLOGF(DBG1, "Stopped worker '{}'", name);
3429-
} else {
3430-
XLOGF(ERR, "Couldn't stop worker '{}', timeout: {} seconds", name,
3431-
timeout.count());
3432-
}
3422+
auto ret = util::stopPeriodicWorker(name, worker, timeout);
34333423
worker.reset();
34343424
return ret;
34353425
}
@@ -3441,29 +3431,22 @@ bool CacheAllocator<CacheTrait>::startNewWorker(
34413431
std::unique_ptr<T>& worker,
34423432
std::chrono::milliseconds interval,
34433433
Args&&... args) {
3444-
if (!stopWorker(name, worker)) {
3434+
if (worker && !stopWorker(name, worker)) {
34453435
return false;
34463436
}
34473437

34483438
std::lock_guard<std::mutex> l(workersMutex_);
3449-
worker = std::make_unique<T>(*this, std::forward<Args>(args)...);
3450-
bool ret = worker->start(interval, name);
3451-
if (ret) {
3452-
XLOGF(DBG1, "Started worker '{}'", name);
3453-
} else {
3454-
XLOGF(ERR, "Couldn't start worker '{}', interval: {} milliseconds", name,
3455-
interval.count());
3456-
}
3457-
return ret;
3439+
return util::startPeriodicWorker(name, worker, interval,
3440+
std::forward<Args>(args)...);
34583441
}
34593442

34603443
template <typename CacheTrait>
34613444
bool CacheAllocator<CacheTrait>::startNewPoolRebalancer(
34623445
std::chrono::milliseconds interval,
34633446
std::shared_ptr<RebalanceStrategy> strategy,
34643447
unsigned int freeAllocThreshold) {
3465-
if (!startNewWorker("PoolRebalancer", poolRebalancer_, interval, strategy,
3466-
freeAllocThreshold)) {
3448+
if (!startNewWorker("PoolRebalancer", poolRebalancer_, interval, *this,
3449+
strategy, freeAllocThreshold)) {
34673450
return false;
34683451
}
34693452

@@ -3479,7 +3462,7 @@ bool CacheAllocator<CacheTrait>::startNewPoolResizer(
34793462
std::chrono::milliseconds interval,
34803463
unsigned int poolResizeSlabsPerIter,
34813464
std::shared_ptr<RebalanceStrategy> strategy) {
3482-
if (!startNewWorker("PoolResizer", poolResizer_, interval,
3465+
if (!startNewWorker("PoolResizer", poolResizer_, interval, *this,
34833466
poolResizeSlabsPerIter, strategy)) {
34843467
return false;
34853468
}
@@ -3500,8 +3483,8 @@ bool CacheAllocator<CacheTrait>::startNewPoolOptimizer(
35003483
// it should do actual size optimization. Probably need to move to using
35013484
// the same interval for both, with confirmation of further experiments.
35023485
const auto workerInterval = std::chrono::seconds(1);
3503-
if (!startNewWorker("PoolOptimizer", poolOptimizer_, workerInterval, strategy,
3504-
regularInterval.count(), ccacheInterval.count(),
3486+
if (!startNewWorker("PoolOptimizer", poolOptimizer_, workerInterval, *this,
3487+
strategy, regularInterval.count(), ccacheInterval.count(),
35053488
ccacheStepSizePercent)) {
35063489
return false;
35073490
}
@@ -3519,7 +3502,7 @@ bool CacheAllocator<CacheTrait>::startNewMemMonitor(
35193502
std::chrono::milliseconds interval,
35203503
MemoryMonitor::Config config,
35213504
std::shared_ptr<RebalanceStrategy> strategy) {
3522-
if (!startNewWorker("MemoryMonitor", memMonitor_, interval, config,
3505+
if (!startNewWorker("MemoryMonitor", memMonitor_, interval, *this, config,
35233506
strategy)) {
35243507
return false;
35253508
}
@@ -3534,7 +3517,8 @@ template <typename CacheTrait>
35343517
bool CacheAllocator<CacheTrait>::startNewReaper(
35353518
std::chrono::milliseconds interval,
35363519
util::Throttler::Config reaperThrottleConfig) {
3537-
if (!startNewWorker("Reaper", reaper_, interval, reaperThrottleConfig)) {
3520+
if (!startNewWorker("Reaper", reaper_, interval, *this,
3521+
reaperThrottleConfig)) {
35383522
return false;
35393523
}
35403524

cachelib/common/PeriodicWorker.h

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#pragma once
1818

1919
#include <folly/Range.h>
20+
#include <folly/logging/xlog.h>
2021

2122
#include <atomic>
2223
#include <condition_variable>
@@ -140,5 +141,62 @@ class PeriodicWorker {
140141
/* The main worker loop that handles the work periodically */
141142
void loop(void);
142143
};
144+
145+
namespace util {
146+
// Stop a periodic worker
147+
//
148+
// @param name name of the worker
149+
// @param worker unique pointer of the worker to stop
150+
// @param timeout timeout for the worker stopping
151+
// @return true if size controller has been successfully stopped
152+
template <typename WorkerT>
153+
bool stopPeriodicWorker(folly::StringPiece name,
154+
std::unique_ptr<WorkerT>& worker,
155+
std::chrono::seconds timeout = std::chrono::seconds{
156+
0}) {
157+
if (!worker) {
158+
XLOGF(INFO, "Worker '{}' has not been started. No need to stop.", name);
159+
return true;
160+
}
161+
162+
bool ret = worker->stop(timeout);
163+
if (ret) {
164+
XLOGF(INFO, "Stopped worker '{}'", name);
165+
} else {
166+
XLOGF(ERR, "Couldn't stop worker '{}', timeout: {} seconds", name,
167+
timeout.count());
168+
}
169+
return ret;
170+
}
171+
172+
// Start a periodic worker
173+
//
174+
// @param name name of the worker
175+
// @param worker unique pointer of the worker to start
176+
// @param interval the period this worker fires
177+
// @param args... the rest of the arguments to initialize the worker
178+
// @return true if the worker has been successfully started
179+
template <typename WorkerT, typename... Args>
180+
bool startPeriodicWorker(folly::StringPiece name,
181+
std::unique_ptr<WorkerT>& worker,
182+
std::chrono::milliseconds interval,
183+
Args&&... args) {
184+
if (worker && !stopPeriodicWorker(name, worker)) {
185+
XLOGF(ERR, "Couldn't restart worker '{}' because it couldn't be stopped",
186+
name);
187+
return false;
188+
}
189+
worker = std::make_unique<WorkerT>(std::forward<Args>(args)...);
190+
bool ret = worker->start(interval, name);
191+
if (ret) {
192+
XLOGF(INFO, "Started worker '{}'", name);
193+
} else {
194+
XLOGF(ERR, "Couldn't start worker '{}', interval: {} milliseconds", name,
195+
interval.count());
196+
}
197+
return ret;
198+
}
199+
200+
} // namespace util
143201
} // namespace cachelib
144202
} // namespace facebook

cachelib/experimental/objcache2/ObjectCache-inl.h

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,17 @@ template <typename AllocatorT>
110110
void ObjectCache<AllocatorT>::initWorkers() {
111111
if (config_.objectSizeTrackingEnabled &&
112112
config_.sizeControllerIntervalMs != 0) {
113-
startWorker(kSizeControllerName, sizeController_,
114-
std::chrono::milliseconds{config_.sizeControllerIntervalMs},
115-
config_.sizeControllerThrottlerConfig);
113+
util::startPeriodicWorker(
114+
kSizeControllerName, sizeController_,
115+
std::chrono::milliseconds{config_.sizeControllerIntervalMs}, *this,
116+
config_.sizeControllerThrottlerConfig);
116117
}
117118

118119
if (config_.objectSizeTrackingEnabled &&
119120
config_.objectSizeDistributionTrackingEnabled) {
120-
startWorker(kSizeDistTrackerName, sizeDistTracker_,
121-
std::chrono::seconds{60} /*default interval to be 60s*/);
121+
util::startPeriodicWorker(
122+
kSizeDistTrackerName, sizeDistTracker_,
123+
std::chrono::seconds{60} /*default interval to be 60s*/, *this);
122124
}
123125
}
124126

@@ -371,48 +373,6 @@ ObjectCache<AllocatorT>::serializeConfigParams() const {
371373
return config;
372374
}
373375

374-
template <typename AllocatorT>
375-
template <typename WorkerT, typename... Args>
376-
bool ObjectCache<AllocatorT>::startWorker(folly::StringPiece name,
377-
std::unique_ptr<WorkerT>& worker,
378-
std::chrono::milliseconds interval,
379-
Args&&... args) {
380-
if (!stopWorker(name, worker)) {
381-
XLOGF(ERR, "Worker '{}' is already running. Cannot start it again.", name);
382-
return false;
383-
}
384-
385-
worker = std::make_unique<WorkerT>(*this, std::forward<Args>(args)...);
386-
bool ret = worker->start(interval, name);
387-
if (ret) {
388-
XLOGF(INFO, "Started worker '{}'", name);
389-
} else {
390-
XLOGF(ERR, "Couldn't start worker '{}', interval: {} milliseconds", name,
391-
interval.count());
392-
}
393-
return ret;
394-
}
395-
396-
template <typename AllocatorT>
397-
template <typename WorkerT>
398-
bool ObjectCache<AllocatorT>::stopWorker(folly::StringPiece name,
399-
std::unique_ptr<WorkerT>& worker,
400-
std::chrono::seconds timeout) {
401-
if (!worker) {
402-
XLOGF(INFO, "Worker '{}' has not been started. No need to stop.", name);
403-
return true;
404-
}
405-
406-
bool ret = worker->stop(timeout);
407-
if (ret) {
408-
XLOGF(INFO, "Stopped worker '{}'", name);
409-
} else {
410-
XLOGF(ERR, "Couldn't stop worker '{}', timeout: {} seconds", name,
411-
timeout.count());
412-
}
413-
return ret;
414-
}
415-
416376
template <typename AllocatorT>
417377
bool ObjectCache<AllocatorT>::persist() {
418378
if (config_.persistBaseFilePath.empty() || !config_.serializeCb) {

cachelib/experimental/objcache2/ObjectCache.h

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -406,8 +406,10 @@ class ObjectCache : public ObjectCacheBase<AllocatorT> {
406406
// @return true if all workers have been successfully stopped
407407
bool stopAllWorkers(std::chrono::seconds timeout = std::chrono::seconds{0}) {
408408
bool success = true;
409-
success &= stopWorker(kSizeControllerName, sizeController_, timeout);
410-
success &= stopWorker(kSizeDistTrackerName, sizeDistTracker_, timeout);
409+
success &=
410+
util::stopPeriodicWorker(kSizeControllerName, sizeController_, timeout);
411+
success &= util::stopPeriodicWorker(
412+
kSizeDistTrackerName, sizeDistTracker_, timeout);
411413
success &= this->l1Cache_->stopWorkers(timeout);
412414
return success;
413415
}
@@ -443,30 +445,6 @@ class ObjectCache : public ObjectCacheBase<AllocatorT> {
443445
// Returns the total number of placeholders
444446
size_t getNumPlaceholders() const { return placeholders_.size(); }
445447

446-
// Start a periodic worker
447-
//
448-
// @param name name of the worker
449-
// @param worker unique pointer of the worker to start
450-
// @param interval the period this worker fires
451-
// @param args... the rest of the arguments to initialize the worker
452-
// @return true if the worker has been successfully started
453-
template <typename WorkerT, typename... Args>
454-
bool startWorker(folly::StringPiece name,
455-
std::unique_ptr<WorkerT>& worker,
456-
std::chrono::milliseconds interval,
457-
Args&&... args);
458-
459-
// Stop a periodic worker
460-
//
461-
// @param name name of the worker
462-
// @param worker unique pointer of the worker to stop
463-
// @param timeout timeout for the worker stopping
464-
// @return true if size controller has been successfully stopped
465-
template <typename WorkerT>
466-
bool stopWorker(folly::StringPiece name,
467-
std::unique_ptr<WorkerT>& worker,
468-
std::chrono::seconds timeout = std::chrono::seconds{0});
469-
470448
// Get a ReadHandle reference from the object shared_ptr
471449
template <typename T>
472450
typename AllocatorT::ReadHandle& getReadHandleRefInternal(

0 commit comments

Comments
 (0)