From 664bfa3625b8be20f8cbe548dad248eca23e61f0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 20 Jul 2021 16:03:18 +0800 Subject: [PATCH 1/7] [td-5418]: change the refresh thread generate mechanism to reduce the number of refresh thread. --- src/util/inc/tcache.h | 1 + src/util/src/tcache.c | 156 ++++++++++++++++++++++++++---------------- 2 files changed, 99 insertions(+), 58 deletions(-) diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index efd51f90ce..d381b8b199 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -83,6 +83,7 @@ typedef struct { uint8_t deleting; // set the deleting flag to stop refreshing ASAP. pthread_t refreshWorker; bool extendLifespan; // auto extend life span when one item is accessed. + int64_t checkTick; // tick used to record the check times of the refresh threads #if defined(LINUX) pthread_rwlock_t lock; #else diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 569f9b01bd..c1d3dad61d 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -54,6 +54,44 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) { #endif } +/** + * do cleanup the taos cache + * @param pCacheObj + */ +static void doCleanupDataCache(SCacheObj *pCacheObj); + +/** + * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime + * @param handle Cache object handle + */ +static void* taosCacheTimedRefresh(void *handle); + +static pthread_t cacheRefreshWorker = 0; +static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT; +static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER; +static SArray* pCacheArrayList = NULL; + +static void doInitRefreshThread() { + pCacheArrayList = taosArrayInit(4, POINTER_BYTES); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + pthread_create(&cacheRefreshWorker, &thattr, taosCacheTimedRefresh, NULL); + pthread_attr_destroy(&thattr); +} + +pthread_t doRegisterCacheObj(SCacheObj* pCacheObj) { + pthread_once(&cacheThreadInit, doInitRefreshThread); + + pthread_mutex_lock(&guard); + taosArrayPush(pCacheArrayList, &pCacheObj); + pthread_mutex_unlock(&guard); + + return cacheRefreshWorker; +} + /** * @param key key of object for hash, usually a null-terminated string * @param keyLen length of key @@ -142,19 +180,9 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem free(pElem); } -/** - * do cleanup the taos cache - * @param pCacheObj - */ -static void doCleanupDataCache(SCacheObj *pCacheObj); - -/** - * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime - * @param handle Cache object handle - */ -static void* taosCacheTimedRefresh(void *handle); - SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) { + const int32_t SLEEP_DURATION = 500; //500 ms + if (refreshTimeInSeconds <= 0) { return NULL; } @@ -174,9 +202,10 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext } // set free cache node callback function - pCacheObj->freeFp = fn; + pCacheObj->freeFp = fn; pCacheObj->refreshTime = refreshTimeInSeconds * 1000; - pCacheObj->extendLifespan = extendLifespan; + pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION; + pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access if (__cache_lock_init(pCacheObj) != 0) { taosHashCleanup(pCacheObj->pHashTable); @@ -186,13 +215,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext return NULL; } - pthread_attr_t thattr; - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - - pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheTimedRefresh, pCacheObj); - - pthread_attr_destroy(&thattr); + doRegisterCacheObj(pCacheObj); return pCacheObj; } @@ -364,7 +387,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) { atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); - uDebug("cache:%s data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime); + uDebug("cache:%s, data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime); } if (_remove) { @@ -510,8 +533,10 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { } pCacheObj->deleting = 1; - if (taosCheckPthreadValid(pCacheObj->refreshWorker)) { - pthread_join(pCacheObj->refreshWorker, NULL); + + // wait for the refresh thread quit before destroying the cache object. + while(atomic_load_64(&cacheRefreshWorker) != 0) { + taosMsleep(50); } uInfo("cache:%s will be cleaned up", pCacheObj->name); @@ -650,48 +675,63 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t } void* taosCacheTimedRefresh(void *handle) { - SCacheObj* pCacheObj = handle; - if (pCacheObj == NULL) { - uDebug("object is destroyed. no refresh retry"); - return NULL; - } + assert(pCacheArrayList != NULL); + uDebug("cache refresh thread starts"); const int32_t SLEEP_DURATION = 500; //500 ms - int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION; - int64_t count = 0; + while(1) { - taosMsleep(500); + taosMsleep(SLEEP_DURATION); - // check if current cache object will be deleted every 500ms. - if (pCacheObj->deleting) { - uDebug("%s refresh threads quit", pCacheObj->name); - break; + pthread_mutex_lock(&guard); + size_t size = taosArrayGetSize(pCacheArrayList); + pthread_mutex_unlock(&guard); + + count += 1; + + for(int32_t i = 0; i < size; ++i) { + pthread_mutex_lock(&guard); + SCacheObj* pCacheObj = taosArrayGetP(pCacheArrayList, i); + pthread_mutex_unlock(&guard); + + if (pCacheObj == NULL) { + uDebug("object is destroyed. no refresh retry"); + break; + } + + // check if current cache object will be deleted every 500ms. + if (pCacheObj->deleting) { + uDebug("%s is destroying, cache refresh thread quit", pCacheObj->name); + goto _end; + } + + if ((count % pCacheObj->checkTick) != 0) { + continue; + } + + size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable); + if (elemInHash + pCacheObj->numOfElemsInTrash == 0) { + continue; + } + + uDebug("%s refresh thread scan", pCacheObj->name); + pCacheObj->statistics.refreshCount++; + + // refresh data in hash table + if (elemInHash > 0) { + int64_t now = taosGetTimestampMs(); + doCacheRefresh(pCacheObj, now, NULL); + } + + taosTrashcanEmpty(pCacheObj, false); } - - if (++count < totalTick) { - continue; - } - - // reset the count value - count = 0; - size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable); - if (elemInHash + pCacheObj->numOfElemsInTrash == 0) { - continue; - } - - uDebug("%s refresh thread timed scan", pCacheObj->name); - pCacheObj->statistics.refreshCount++; - - // refresh data in hash table - if (elemInHash > 0) { - int64_t now = taosGetTimestampMs(); - doCacheRefresh(pCacheObj, now, NULL); - } - - taosTrashcanEmpty(pCacheObj, false); } + _end: + cacheRefreshWorker = 0; + taosArrayDestroy(pCacheArrayList); + uDebug("cache refresh thread quits"); return NULL; } From 8125b8083525762f178d09ea879b5f448b21941b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 20 Jul 2021 17:21:44 +0800 Subject: [PATCH 2/7] [td-225]fix compiler error. --- src/util/src/tcache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index c1d3dad61d..6c20980e18 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -66,7 +66,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj); */ static void* taosCacheTimedRefresh(void *handle); -static pthread_t cacheRefreshWorker = 0; +static pthread_t cacheRefreshWorker = {0}; static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT; static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER; static SArray* pCacheArrayList = NULL; From c01f04a2095cdd102b8caba51770605a67480758 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 20 Jul 2021 18:31:11 +0800 Subject: [PATCH 3/7] [td-255]fix compiler error. --- src/util/src/tcache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 6c20980e18..434d88894b 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -71,7 +71,7 @@ static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT; static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER; static SArray* pCacheArrayList = NULL; -static void doInitRefreshThread() { +static void doInitRefreshThread(void) { pCacheArrayList = taosArrayInit(4, POINTER_BYTES); pthread_attr_t thattr; From 2e4229717eb4ac24e92657224a7170e1c4714fcd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 20 Jul 2021 23:04:30 +0800 Subject: [PATCH 4/7] [td-225]fix compiler error. --- src/util/src/tcache.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 434d88894b..e847debb89 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -535,7 +535,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { pCacheObj->deleting = 1; // wait for the refresh thread quit before destroying the cache object. - while(atomic_load_64(&cacheRefreshWorker) != 0) { + while(atomic_load_ptr((void**)&pCacheArrayList) != 0) { taosMsleep(50); } @@ -729,8 +729,11 @@ void* taosCacheTimedRefresh(void *handle) { } _end: - cacheRefreshWorker = 0; taosArrayDestroy(pCacheArrayList); + + pCacheArrayList = NULL; + pthread_mutex_destroy(&guard); + uDebug("cache refresh thread quits"); return NULL; } From 97159f54da5217ce08bd98bb6acffdcbde00bc95 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Jul 2021 17:20:33 +0800 Subject: [PATCH 5/7] [td-225]fix bug found by regression test. --- src/util/src/tcache.c | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index ee7921d4c7..9c156658aa 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -535,7 +535,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { pCacheObj->deleting = 1; // wait for the refresh thread quit before destroying the cache object. - while(atomic_load_ptr((void**)&pCacheArrayList) != 0) { + while(atomic_load_8(&pCacheObj->deleting) != 0) { taosMsleep(50); } @@ -695,19 +695,30 @@ void* taosCacheTimedRefresh(void *handle) { for(int32_t i = 0; i < size; ++i) { pthread_mutex_lock(&guard); SCacheObj* pCacheObj = taosArrayGetP(pCacheArrayList, i); - pthread_mutex_unlock(&guard); if (pCacheObj == NULL) { - uDebug("object is destroyed. no refresh retry"); - break; + uError("object is destroyed. ignore and try next"); + pthread_mutex_unlock(&guard); + continue; } // check if current cache object will be deleted every 500ms. if (pCacheObj->deleting) { - uDebug("%s is destroying, cache refresh thread quit", pCacheObj->name); - goto _end; + taosArrayRemove(pCacheArrayList, i); + size = taosArrayGetSize(pCacheArrayList); + + uDebug("%s is destroying, remove it from refresh list, remain cache obj:%"PRId64, pCacheObj->name, size); + pCacheObj->deleting = 0; //reset the deleting flag to enable pCacheObj does self destroy process + + // all contained caches has been marked to be removed, destroy the scanner it self. + if (size == 0) { + pthread_mutex_unlock(&guard); + goto _end; + } } + pthread_mutex_unlock(&guard); + if ((count % pCacheObj->checkTick) != 0) { continue; } From 339fe7358b4b2131bd7ddd4a575a4fb04fa11878 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Jul 2021 18:51:43 +0800 Subject: [PATCH 6/7] [td-225]fix compiler error. --- src/util/src/tcache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 9c156658aa..fba62fc067 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -707,7 +707,7 @@ void* taosCacheTimedRefresh(void *handle) { taosArrayRemove(pCacheArrayList, i); size = taosArrayGetSize(pCacheArrayList); - uDebug("%s is destroying, remove it from refresh list, remain cache obj:%"PRId64, pCacheObj->name, size); + uDebug("%s is destroying, remove it from refresh list, remain cache obj:%"PRIzu, pCacheObj->name, size); pCacheObj->deleting = 0; //reset the deleting flag to enable pCacheObj does self destroy process // all contained caches has been marked to be removed, destroy the scanner it self. From 5cc26ce991868d7ad726ee35cd935d3cd56401c2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 23 Jul 2021 11:27:46 +0800 Subject: [PATCH 7/7] [td-225]fix bug found by regression test. --- src/dnode/src/dnodeMain.c | 4 +++- src/util/inc/tcache.h | 5 +++++ src/util/src/tcache.c | 17 +++++++++++------ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 7723b6221b..af63b10bc5 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -40,8 +40,9 @@ #include "dnodeShell.h" #include "dnodeTelemetry.h" #include "module.h" -#include "qScript.h" #include "mnode.h" +#include "qScript.h" +#include "tcache.h" #if !defined(_MODULE) || !defined(_TD_LINUX) int32_t moduleStart() { return 0; } @@ -207,6 +208,7 @@ void dnodeCleanUpSystem() { dnodeCleanupComponents(); taos_cleanup(); taosCloseLog(); + taosStopCacheRefreshWorker(); } } diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index d381b8b199..e41b544d00 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -178,6 +178,11 @@ void taosCacheCleanup(SCacheObj *pCacheObj); */ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp); +/** + * stop background refresh worker thread + */ +void taosStopCacheRefreshWorker(); + #ifdef __cplusplus } #endif diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index fba62fc067..4aa5b4378f 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -70,6 +70,7 @@ static pthread_t cacheRefreshWorker = {0}; static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT; static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER; static SArray* pCacheArrayList = NULL; +static bool stopRefreshWorker = false; static void doInitRefreshThread(void) { pCacheArrayList = taosArrayInit(4, POINTER_BYTES); @@ -685,6 +686,9 @@ void* taosCacheTimedRefresh(void *handle) { while(1) { taosMsleep(SLEEP_DURATION); + if (stopRefreshWorker) { + goto _end; + } pthread_mutex_lock(&guard); size_t size = taosArrayGetSize(pCacheArrayList); @@ -708,13 +712,10 @@ void* taosCacheTimedRefresh(void *handle) { size = taosArrayGetSize(pCacheArrayList); uDebug("%s is destroying, remove it from refresh list, remain cache obj:%"PRIzu, pCacheObj->name, size); - pCacheObj->deleting = 0; //reset the deleting flag to enable pCacheObj does self destroy process + pCacheObj->deleting = 0; //reset the deleting flag to enable pCacheObj to continue releasing resources. - // all contained caches has been marked to be removed, destroy the scanner it self. - if (size == 0) { - pthread_mutex_unlock(&guard); - goto _end; - } + pthread_mutex_unlock(&guard); + continue; } pthread_mutex_unlock(&guard); @@ -759,3 +760,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) { int64_t now = taosGetTimestampMs(); doCacheRefresh(pCacheObj, now, fp); } + +void taosStopCacheRefreshWorker() { + stopRefreshWorker = false; +} \ No newline at end of file