diff --git a/include/util/tcache.h b/include/util/tcache.h index 11f9044d28..2156579932 100644 --- a/include/util/tcache.h +++ b/include/util/tcache.h @@ -13,27 +13,26 @@ * along with this program. If not, see . */ -#ifndef _TD_UTIL_CACHE_H -#define _TD_UTIL_CACHE_H +#ifndef _TD_UTIL_CACHE_H_ +#define _TD_UTIL_CACHE_H_ + +#include "thash.h" +#include "tlockfree.h" #ifdef __cplusplus extern "C" { #endif -#include "os.h" -#include "tlockfree.h" -#include "thash.h" - -#if defined(_TD_ARM_32) - #define TSDB_CACHE_PTR_KEY TSDB_DATA_TYPE_INT - #define TSDB_CACHE_PTR_TYPE int32_t +#if defined(_TD_ARM_32) +#define TSDB_CACHE_PTR_KEY TSDB_DATA_TYPE_INT +#define TSDB_CACHE_PTR_TYPE int32_t #else - #define TSDB_CACHE_PTR_KEY TSDB_DATA_TYPE_BIGINT - #define TSDB_CACHE_PTR_TYPE int64_t +#define TSDB_CACHE_PTR_KEY TSDB_DATA_TYPE_BIGINT +#define TSDB_CACHE_PTR_TYPE int64_t #endif -typedef void (*__cache_free_fn_t)(void*); -typedef void (*__cache_trav_fn_t)(void*, void*); +typedef void (*__cache_free_fn_t)(void *); +typedef void (*__cache_trav_fn_t)(void *, void *); typedef struct SCacheStatis { int64_t missCount; @@ -45,17 +44,17 @@ typedef struct SCacheStatis { struct STrashElem; typedef struct SCacheDataNode { - uint64_t addedTime; // the added time when this element is added or updated into cache - uint64_t lifespan; // life duration when this element should be remove from cache - uint64_t expireTime; // expire time + uint64_t addedTime; // the added time when this element is added or updated into cache + uint64_t lifespan; // life duration when this element should be remove from cache + uint64_t expireTime; // expire time uint64_t signature; - struct STrashElem *pTNodeHeader; // point to trash node head - uint16_t keySize: 15; // max key size: 32kb - bool inTrashcan: 1;// denote if it is in trash or not - uint32_t size; // allocated size for current SCacheDataNode + struct STrashElem *pTNodeHeader; // point to trash node head + uint16_t keySize : 15; // max key size: 32kb + bool inTrashcan : 1; // denote if it is in trash or not + uint32_t size; // allocated size for current SCacheDataNode T_REF_DECLARE() - char *key; - char data[]; + char *key; + char data[]; } SCacheDataNode; typedef struct STrashElem { @@ -73,22 +72,22 @@ typedef struct STrashElem { * when the node in pTrash does not be referenced, it will be release at the expired expiredTime */ typedef struct { - int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. - int64_t refreshTime; - STrashElem * pTrash; - char* name; - SCacheStatis statistics; - SHashObj * pHashTable; + int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. + int64_t refreshTime; + STrashElem *pTrash; + char *name; + SCacheStatis statistics; + SHashObj *pHashTable; __cache_free_fn_t freeFp; - uint32_t numOfElemsInTrash; // number of element in trash - uint8_t deleting; // set the deleting flag to stop refreshing ASAP. - pthread_t refreshWorker; - bool extendLifespan; // auto extend life span when one item is accessed. - int64_t checkTick; // tick used to record the check times of the refresh threads + uint32_t numOfElemsInTrash; // number of element in trash + uint8_t deleting; // set the deleting flag to stop refreshing ASAP. + pthread_t refreshWorker; + bool extendLifespan; // auto extend life span when one item is accessed. + int64_t checkTick; // tick used to record the check times of the refresh threads #if defined(LINUX) pthread_rwlock_t lock; #else - pthread_mutex_t lock; + pthread_mutex_t lock; #endif } SCacheObj; @@ -101,7 +100,8 @@ typedef struct { * @param fn free resource callback function * @return */ -SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char *cacheName); +SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, + const char *cacheName); /** * add data into cache @@ -113,7 +113,8 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext * @param keepTime survival time in second * @return cached element */ -void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int durationMS); +void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, + int32_t durationMS); /** * get data from cache @@ -177,7 +178,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj); * @param fp * @return */ -void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1); +void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1); /** * stop background refresh worker thread @@ -188,4 +189,4 @@ void taosStopCacheRefreshWorker(); } #endif -#endif /*_TD_UTIL_CACHE_H*/ +#endif /*_TD_UTIL_CACHE_H_*/ diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 6914e7e2d0..64d822f750 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -14,11 +14,10 @@ */ #define _DEFAULT_SOURCE -#include "os.h" +#include "tcache.h" #include "tlog.h" #include "ttimer.h" #include "tutil.h" -#include "tcache.h" static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) { #if defined(LINUX) @@ -62,15 +61,15 @@ static void doCleanupDataCache(SCacheObj *pCacheObj); * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime * @param handle Cache object handle */ -static void* taosCacheTimedRefresh(void *handle); +static void *taosCacheTimedRefresh(void *handle); -static pthread_t cacheRefreshWorker = {0}; -static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT; -static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER; -static SArray* pCacheArrayList = NULL; -static bool stopRefreshWorker = false; -static bool refreshWorkerNormalStopped = false; -static bool refreshWorkerUnexpectedStopped = false; +static pthread_t cacheRefreshWorker = {0}; +static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT; +static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER; +static SArray *pCacheArrayList = NULL; +static bool stopRefreshWorker = false; +static bool refreshWorkerNormalStopped = false; +static bool refreshWorkerUnexpectedStopped = false; static void doInitRefreshThread(void) { pCacheArrayList = taosArrayInit(4, POINTER_BYTES); @@ -83,7 +82,7 @@ static void doInitRefreshThread(void) { pthread_attr_destroy(&thattr); } -pthread_t doRegisterCacheObj(SCacheObj* pCacheObj) { +pthread_t doRegisterCacheObj(SCacheObj *pCacheObj) { pthread_once(&cacheThreadInit, doInitRefreshThread); pthread_mutex_lock(&guard); @@ -102,7 +101,8 @@ pthread_t doRegisterCacheObj(SCacheObj* pCacheObj) { * @param lifespan total survial expiredTime from now * @return SCacheDataNode */ -static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration); +static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, + uint64_t duration); /** * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash @@ -146,18 +146,18 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo free(pNode); } -static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) { - if (pElem->pData->signature != (uint64_t) pElem->pData) { +static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STrashElem *pElem) { + if (pElem->pData->signature != (uint64_t)pElem->pData) { uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData); return NULL; } - STrashElem* next = pElem->next; + STrashElem *next = pElem->next; pCacheObj->numOfElemsInTrash--; if (pElem->prev) { pElem->prev->next = pElem->next; - } else { // pnode is the header, update header + } else { // pnode is the header, update header pCacheObj->pTrash = pElem->next; } @@ -172,7 +172,7 @@ static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STr return next; } -static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) { +static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem *pElem) { if (pCacheObj->freeFp) { pCacheObj->freeFp(pElem->pData->data); } @@ -181,19 +181,20 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem free(pElem); } -SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) { - const int32_t SLEEP_DURATION = 500; //500 ms +SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, + const char *cacheName) { + const int32_t SLEEP_DURATION = 500; // 500 ms if (refreshTimeInSeconds <= 0) { return NULL; } - + SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj)); if (pCacheObj == NULL) { uError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - + pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK); pCacheObj->name = strdup(cacheName); if (pCacheObj->pHashTable == NULL) { @@ -201,17 +202,17 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext uError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - + // set free cache node callback function - pCacheObj->freeFp = fn; + pCacheObj->freeFp = fn; pCacheObj->refreshTime = refreshTimeInSeconds * 1000; - pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION; + pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION; pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access if (__cache_lock_init(pCacheObj) != 0) { taosHashCleanup(pCacheObj->pHashTable); free(pCacheObj); - + uError("failed to init lock, reason:%s", strerror(errno)); return NULL; } @@ -220,7 +221,8 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext return pCacheObj; } -void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int durationMS) { +void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, + int32_t durationMS) { if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) { return NULL; } @@ -242,8 +244,8 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize); } else { // duplicated key exists while (1) { - SCacheDataNode* p = NULL; -// int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*)); + SCacheDataNode *p = NULL; + // int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*)); int32_t ret = taosHashRemove(pCacheObj->pHashTable, key, keyLen); // add to trashcan @@ -283,10 +285,10 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v return pNode1->data; } -static void incRefFn(void* ptNode) { +static void incRefFn(void *ptNode) { assert(ptNode != NULL); - SCacheDataNode** p = (SCacheDataNode**) ptNode; + SCacheDataNode **p = (SCacheDataNode **)ptNode; assert(T_REF_VAL_GET(*p) >= 0); int32_t ret = T_REF_INC(*p); @@ -303,15 +305,16 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen return NULL; } - SCacheDataNode* ptNode = NULL; + SCacheDataNode *ptNode = NULL; taosHashGetClone(pCacheObj->pHashTable, key, keyLen, &ptNode); -// taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode); + // taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode); - void* pData = (ptNode != NULL)? ptNode->data:NULL; + void *pData = (ptNode != NULL) ? ptNode->data : NULL; if (pData != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(ptNode)); + uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, + T_REF_VAL_GET(ptNode)); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); @@ -323,10 +326,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { if (pCacheObj == NULL || data == NULL) return NULL; - + size_t offset = offsetof(SCacheDataNode, data); SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset); - + if (ptNode->signature != (uint64_t)ptNode) { uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode); return NULL; @@ -342,22 +345,22 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) { if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL; - + size_t offset = offsetof(SCacheDataNode, data); SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset); - + if (ptNode->signature != (uint64_t)ptNode) { uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode); return NULL; } - + assert(T_REF_VAL_GET(ptNode) >= 1); - + char *d = *data; - + // clear its reference to old area *data = NULL; - + return d; } @@ -371,13 +374,12 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { return; } - // The operation of removal from hash table and addition to trashcan is not an atomic operation, // therefore the check for the empty of both the hash table and the trashcan has a race condition. // It happens when there is only one object in the cache, and two threads which has referenced this object // start to free the it simultaneously [TD-1569]. size_t offset = offsetof(SCacheDataNode, data); - + SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset); if (pNode->signature != (uint64_t)pNode) { uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode); @@ -391,20 +393,20 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) { atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); - uDebug("cache:%s, data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime); + uDebug("cache:%s, data:%p extend expire time: %" PRId64, pCacheObj->name, pNode->data, pNode->expireTime); } if (_remove) { // NOTE: once refcount is decrease, pNode may be freed by other thread immediately. - char* key = pNode->key; - char* d = pNode->data; + char *key = pNode->key; + char *d = pNode->data; int32_t ref = T_REF_VAL_GET(pNode); uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan); /* - * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users - * releasing this resources. + * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all + * users releasing this resources. * * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread * that tries to do the same thing. @@ -433,16 +435,19 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread // when reaches here. SCacheDataNode *p = NULL; - int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); -// int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, pNode->key, pNode->keySize, &p, sizeof(void *)); + int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); + // int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, pNode->key, pNode->keySize, &p, sizeof(void + // *)); ref = T_REF_DEC(pNode); // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing. // note that the remove operation can be executed only once. if (ret == 0) { if (p != pNode) { - uDebug( "cache:%s, key:%p, successfully removed a new entry:%p, refcnt:%d, prev entry:%p has been removed by " - "others already", pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data); + uDebug( + "cache:%s, key:%p, successfully removed a new entry:%p, refcnt:%d, prev entry:%p has been removed by " + "others already", + pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data); assert(p->pTNodeHeader == NULL); taosAddToTrashcan(pCacheObj, p); @@ -468,35 +473,36 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } } } else { - uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", - pCacheObj->name, pNode->key, pNode->data, ref); + uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", pCacheObj->name, + pNode->key, pNode->data, ref); } } } else { // NOTE: once refcount is decrease, pNode may be freed by other thread immediately. - char* key = pNode->key; - char* p = pNode->data; + char *key = pNode->key; + char *p = pNode->data; -// int32_t ref = T_REF_VAL_GET(pNode); -// -// if (ref == 1 && inTrashcan) { -// // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be -// // destroyed by refresh worker if decrease ref count before removing it from linked-list. -// assert(pNode->pTNodeHeader->pData == pNode); -// -// __cache_wr_lock(pCacheObj); -// doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader); -// __cache_unlock(pCacheObj); -// -// ref = T_REF_DEC(pNode); -// assert(ref == 0); -// -// doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader); -// } else { -// ref = T_REF_DEC(pNode); -// assert(ref >= 0); -// } + // int32_t ref = T_REF_VAL_GET(pNode); + // + // if (ref == 1 && inTrashcan) { + // // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may + // be + // // destroyed by refresh worker if decrease ref count before removing it from linked-list. + // assert(pNode->pTNodeHeader->pData == pNode); + // + // __cache_wr_lock(pCacheObj); + // doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader); + // __cache_unlock(pCacheObj); + // + // ref = T_REF_DEC(pNode); + // assert(ref == 0); + // + // doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader); + // } else { + // ref = T_REF_DEC(pNode); + // assert(ref >= 0); + // } int32_t ref = T_REF_DEC(pNode); uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan); @@ -504,21 +510,21 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } typedef struct SHashTravSupp { - SCacheObj* pCacheObj; - int64_t time; + SCacheObj *pCacheObj; + int64_t time; __cache_trav_fn_t fp; - void* param1; + void *param1; } SHashTravSupp; -static bool travHashTableEmptyFn(void* param, void* data) { - SHashTravSupp* ps = (SHashTravSupp*) param; - SCacheObj* pCacheObj= ps->pCacheObj; +static bool travHashTableEmptyFn(void *param, void *data) { + SHashTravSupp *ps = (SHashTravSupp *)param; + SCacheObj *pCacheObj = ps->pCacheObj; - SCacheDataNode *pNode = *(SCacheDataNode **) data; + SCacheDataNode *pNode = *(SCacheDataNode **)data; if (T_REF_VAL_GET(pNode) == 0) { taosCacheReleaseNode(pCacheObj, pNode); - } else { // do add to trashcan + } else { // do add to trashcan taosAddToTrashcan(pCacheObj, pNode); } @@ -529,7 +535,7 @@ static bool travHashTableEmptyFn(void* param, void* data) { void taosCacheEmpty(SCacheObj *pCacheObj) { SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; -// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); + // taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); taosTrashcanEmpty(pCacheObj, false); } @@ -542,9 +548,9 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { // wait for the refresh thread quit before destroying the cache object. // But in the dll, the child thread will be killed before atexit takes effect. - while(atomic_load_8(&pCacheObj->deleting) != 0) { - if (refreshWorkerNormalStopped) break; - if (refreshWorkerUnexpectedStopped) return; + while (atomic_load_8(&pCacheObj->deleting) != 0) { + if (refreshWorkerNormalStopped) break; + if (refreshWorkerUnexpectedStopped) return; taosMsleep(50); } @@ -568,11 +574,11 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char * memcpy(pNewNode->key, key, keyLen); - pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); - pNewNode->lifespan = duration; - pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan; - pNewNode->signature = (uint64_t)pNewNode; - pNewNode->size = (uint32_t)totalSize; + pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); + pNewNode->lifespan = duration; + pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan; + pNewNode->signature = (uint64_t)pNewNode; + pNewNode->size = (uint32_t)totalSize; return pNewNode; } @@ -610,16 +616,17 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { if (pCacheObj->numOfElemsInTrash == 0) { if (pCacheObj->pTrash != NULL) { pCacheObj->pTrash = NULL; - uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash); + uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, + pCacheObj->numOfElemsInTrash); } __cache_unlock(pCacheObj); return; } - const char* stat[] = {"false", "true"}; + const char *stat[] = {"false", "true"}; uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name, - pCacheObj->numOfElemsInTrash, (force? stat[1]:stat[0])); + pCacheObj->numOfElemsInTrash, (force ? stat[1] : stat[0])); STrashElem *pElem = pCacheObj->pTrash; while (pElem) { @@ -627,8 +634,8 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { assert(pElem->next != pElem && pElem->prev != pElem); if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { - uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data, - pCacheObj->numOfElemsInTrash - 1); + uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, + pElem->pData->data, pCacheObj->numOfElemsInTrash - 1); doRemoveElemInTrashcan(pCacheObj, pElem); doDestroyTrashcanElem(pCacheObj, pElem); @@ -642,25 +649,25 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { } void doCleanupDataCache(SCacheObj *pCacheObj) { -// SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; -// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); + // SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; + // taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); // todo memory leak if there are object with refcount greater than 0 in hash table? taosHashCleanup(pCacheObj->pHashTable); taosTrashcanEmpty(pCacheObj, true); __cache_lock_destroy(pCacheObj); - + tfree(pCacheObj->name); memset(pCacheObj, 0, sizeof(SCacheObj)); free(pCacheObj); } -bool travHashTableFn(void* param, void* data) { - SHashTravSupp* ps = (SHashTravSupp*) param; - SCacheObj* pCacheObj= ps->pCacheObj; +bool travHashTableFn(void *param, void *data) { + SHashTravSupp *ps = (SHashTravSupp *)param; + SCacheObj *pCacheObj = ps->pCacheObj; - SCacheDataNode* pNode = *(SCacheDataNode **) data; + SCacheDataNode *pNode = *(SCacheDataNode **)data; if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) { taosCacheReleaseNode(pCacheObj, pNode); @@ -676,30 +683,30 @@ bool travHashTableFn(void* param, void* data) { return true; } -static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_trav_fn_t fp, void* param1) { +static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) { assert(pCacheObj != NULL); SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1}; -// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup); + // taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup); } void taosCacheRefreshWorkerUnexpectedStopped(void) { - if(!refreshWorkerNormalStopped) { - refreshWorkerUnexpectedStopped=true; + if (!refreshWorkerNormalStopped) { + refreshWorkerUnexpectedStopped = true; } } -void* taosCacheTimedRefresh(void *handle) { +void *taosCacheTimedRefresh(void *handle) { assert(pCacheArrayList != NULL); uDebug("cache refresh thread starts"); setThreadName("cacheRefresh"); - const int32_t SLEEP_DURATION = 500; //500 ms - int64_t count = 0; + const int32_t SLEEP_DURATION = 500; // 500 ms + int64_t count = 0; atexit(taosCacheRefreshWorkerUnexpectedStopped); - while(1) { + while (1) { taosMsleep(SLEEP_DURATION); if (stopRefreshWorker) { goto _end; @@ -711,9 +718,9 @@ void* taosCacheTimedRefresh(void *handle) { count += 1; - for(int32_t i = 0; i < size; ++i) { + for (int32_t i = 0; i < size; ++i) { pthread_mutex_lock(&guard); - SCacheObj* pCacheObj = taosArrayGetP(pCacheArrayList, i); + SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i); if (pCacheObj == NULL) { uError("object is destroyed. ignore and try next"); @@ -726,8 +733,8 @@ void* taosCacheTimedRefresh(void *handle) { taosArrayRemove(pCacheArrayList, i); size = taosArrayGetSize(pCacheArrayList); - uDebug("%s is destroying, remove it from refresh list, remain cache obj:%"PRIzu, pCacheObj->name, size); - pCacheObj->deleting = 0; //reset the deleting flag to enable pCacheObj to continue releasing resources. + uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size); + pCacheObj->deleting = 0; // reset the deleting flag to enable pCacheObj to continue releasing resources. pthread_mutex_unlock(&guard); continue; @@ -757,18 +764,18 @@ void* taosCacheTimedRefresh(void *handle) { } } - _end: +_end: taosArrayDestroy(pCacheArrayList); pCacheArrayList = NULL; pthread_mutex_destroy(&guard); - refreshWorkerNormalStopped=true; + refreshWorkerNormalStopped = true; uDebug("cache refresh thread quits"); return NULL; } -void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1) { +void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) { if (pCacheObj == NULL) { return; } @@ -777,6 +784,4 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1) doCacheRefresh(pCacheObj, now, fp, param1); } -void taosStopCacheRefreshWorker(void) { - stopRefreshWorker = true; -} \ No newline at end of file +void taosStopCacheRefreshWorker(void) { stopRefreshWorker = true; } \ No newline at end of file