cache
This commit is contained in:
parent
1360f6b265
commit
9df1fed6ab
|
@ -13,27 +13,26 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef _TD_UTIL_CACHE_H
|
#ifndef _TD_UTIL_CACHE_H_
|
||||||
#define _TD_UTIL_CACHE_H
|
#define _TD_UTIL_CACHE_H_
|
||||||
|
|
||||||
|
#include "thash.h"
|
||||||
|
#include "tlockfree.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "os.h"
|
#if defined(_TD_ARM_32)
|
||||||
#include "tlockfree.h"
|
#define TSDB_CACHE_PTR_KEY TSDB_DATA_TYPE_INT
|
||||||
#include "thash.h"
|
#define TSDB_CACHE_PTR_TYPE int32_t
|
||||||
|
|
||||||
#if defined(_TD_ARM_32)
|
|
||||||
#define TSDB_CACHE_PTR_KEY TSDB_DATA_TYPE_INT
|
|
||||||
#define TSDB_CACHE_PTR_TYPE int32_t
|
|
||||||
#else
|
#else
|
||||||
#define TSDB_CACHE_PTR_KEY TSDB_DATA_TYPE_BIGINT
|
#define TSDB_CACHE_PTR_KEY TSDB_DATA_TYPE_BIGINT
|
||||||
#define TSDB_CACHE_PTR_TYPE int64_t
|
#define TSDB_CACHE_PTR_TYPE int64_t
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef void (*__cache_free_fn_t)(void*);
|
typedef void (*__cache_free_fn_t)(void *);
|
||||||
typedef void (*__cache_trav_fn_t)(void*, void*);
|
typedef void (*__cache_trav_fn_t)(void *, void *);
|
||||||
|
|
||||||
typedef struct SCacheStatis {
|
typedef struct SCacheStatis {
|
||||||
int64_t missCount;
|
int64_t missCount;
|
||||||
|
@ -45,17 +44,17 @@ typedef struct SCacheStatis {
|
||||||
struct STrashElem;
|
struct STrashElem;
|
||||||
|
|
||||||
typedef struct SCacheDataNode {
|
typedef struct SCacheDataNode {
|
||||||
uint64_t addedTime; // the added time when this element is added or updated into cache
|
uint64_t addedTime; // the added time when this element is added or updated into cache
|
||||||
uint64_t lifespan; // life duration when this element should be remove from cache
|
uint64_t lifespan; // life duration when this element should be remove from cache
|
||||||
uint64_t expireTime; // expire time
|
uint64_t expireTime; // expire time
|
||||||
uint64_t signature;
|
uint64_t signature;
|
||||||
struct STrashElem *pTNodeHeader; // point to trash node head
|
struct STrashElem *pTNodeHeader; // point to trash node head
|
||||||
uint16_t keySize: 15; // max key size: 32kb
|
uint16_t keySize : 15; // max key size: 32kb
|
||||||
bool inTrashcan: 1;// denote if it is in trash or not
|
bool inTrashcan : 1; // denote if it is in trash or not
|
||||||
uint32_t size; // allocated size for current SCacheDataNode
|
uint32_t size; // allocated size for current SCacheDataNode
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
char *key;
|
char *key;
|
||||||
char data[];
|
char data[];
|
||||||
} SCacheDataNode;
|
} SCacheDataNode;
|
||||||
|
|
||||||
typedef struct STrashElem {
|
typedef struct STrashElem {
|
||||||
|
@ -73,22 +72,22 @@ typedef struct STrashElem {
|
||||||
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
|
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
|
||||||
*/
|
*/
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
|
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
|
||||||
int64_t refreshTime;
|
int64_t refreshTime;
|
||||||
STrashElem * pTrash;
|
STrashElem *pTrash;
|
||||||
char* name;
|
char *name;
|
||||||
SCacheStatis statistics;
|
SCacheStatis statistics;
|
||||||
SHashObj * pHashTable;
|
SHashObj *pHashTable;
|
||||||
__cache_free_fn_t freeFp;
|
__cache_free_fn_t freeFp;
|
||||||
uint32_t numOfElemsInTrash; // number of element in trash
|
uint32_t numOfElemsInTrash; // number of element in trash
|
||||||
uint8_t deleting; // set the deleting flag to stop refreshing ASAP.
|
uint8_t deleting; // set the deleting flag to stop refreshing ASAP.
|
||||||
pthread_t refreshWorker;
|
pthread_t refreshWorker;
|
||||||
bool extendLifespan; // auto extend life span when one item is accessed.
|
bool extendLifespan; // auto extend life span when one item is accessed.
|
||||||
int64_t checkTick; // tick used to record the check times of the refresh threads
|
int64_t checkTick; // tick used to record the check times of the refresh threads
|
||||||
#if defined(LINUX)
|
#if defined(LINUX)
|
||||||
pthread_rwlock_t lock;
|
pthread_rwlock_t lock;
|
||||||
#else
|
#else
|
||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
#endif
|
#endif
|
||||||
} SCacheObj;
|
} SCacheObj;
|
||||||
|
|
||||||
|
@ -101,7 +100,8 @@ typedef struct {
|
||||||
* @param fn free resource callback function
|
* @param fn free resource callback function
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char *cacheName);
|
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn,
|
||||||
|
const char *cacheName);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* add data into cache
|
* add data into cache
|
||||||
|
@ -113,7 +113,8 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
|
||||||
* @param keepTime survival time in second
|
* @param keepTime survival time in second
|
||||||
* @return cached element
|
* @return cached element
|
||||||
*/
|
*/
|
||||||
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int durationMS);
|
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize,
|
||||||
|
int32_t durationMS);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get data from cache
|
* get data from cache
|
||||||
|
@ -177,7 +178,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj);
|
||||||
* @param fp
|
* @param fp
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1);
|
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* stop background refresh worker thread
|
* stop background refresh worker thread
|
||||||
|
@ -188,4 +189,4 @@ void taosStopCacheRefreshWorker();
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_UTIL_CACHE_H*/
|
#endif /*_TD_UTIL_CACHE_H_*/
|
||||||
|
|
|
@ -14,11 +14,10 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "tcache.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "tcache.h"
|
|
||||||
|
|
||||||
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
|
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
|
||||||
#if defined(LINUX)
|
#if defined(LINUX)
|
||||||
|
@ -62,15 +61,15 @@ static void doCleanupDataCache(SCacheObj *pCacheObj);
|
||||||
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
|
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
|
||||||
* @param handle Cache object handle
|
* @param handle Cache object handle
|
||||||
*/
|
*/
|
||||||
static void* taosCacheTimedRefresh(void *handle);
|
static void *taosCacheTimedRefresh(void *handle);
|
||||||
|
|
||||||
static pthread_t cacheRefreshWorker = {0};
|
static pthread_t cacheRefreshWorker = {0};
|
||||||
static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT;
|
static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT;
|
||||||
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
|
||||||
static SArray* pCacheArrayList = NULL;
|
static SArray *pCacheArrayList = NULL;
|
||||||
static bool stopRefreshWorker = false;
|
static bool stopRefreshWorker = false;
|
||||||
static bool refreshWorkerNormalStopped = false;
|
static bool refreshWorkerNormalStopped = false;
|
||||||
static bool refreshWorkerUnexpectedStopped = false;
|
static bool refreshWorkerUnexpectedStopped = false;
|
||||||
|
|
||||||
static void doInitRefreshThread(void) {
|
static void doInitRefreshThread(void) {
|
||||||
pCacheArrayList = taosArrayInit(4, POINTER_BYTES);
|
pCacheArrayList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
@ -83,7 +82,7 @@ static void doInitRefreshThread(void) {
|
||||||
pthread_attr_destroy(&thattr);
|
pthread_attr_destroy(&thattr);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_t doRegisterCacheObj(SCacheObj* pCacheObj) {
|
pthread_t doRegisterCacheObj(SCacheObj *pCacheObj) {
|
||||||
pthread_once(&cacheThreadInit, doInitRefreshThread);
|
pthread_once(&cacheThreadInit, doInitRefreshThread);
|
||||||
|
|
||||||
pthread_mutex_lock(&guard);
|
pthread_mutex_lock(&guard);
|
||||||
|
@ -102,7 +101,8 @@ pthread_t doRegisterCacheObj(SCacheObj* pCacheObj) {
|
||||||
* @param lifespan total survial expiredTime from now
|
* @param lifespan total survial expiredTime from now
|
||||||
* @return SCacheDataNode
|
* @return SCacheDataNode
|
||||||
*/
|
*/
|
||||||
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration);
|
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
|
||||||
|
uint64_t duration);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
|
* addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
|
||||||
|
@ -146,18 +146,18 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
|
||||||
free(pNode);
|
free(pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) {
|
static FORCE_INLINE STrashElem *doRemoveElemInTrashcan(SCacheObj *pCacheObj, STrashElem *pElem) {
|
||||||
if (pElem->pData->signature != (uint64_t) pElem->pData) {
|
if (pElem->pData->signature != (uint64_t)pElem->pData) {
|
||||||
uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
|
uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STrashElem* next = pElem->next;
|
STrashElem *next = pElem->next;
|
||||||
|
|
||||||
pCacheObj->numOfElemsInTrash--;
|
pCacheObj->numOfElemsInTrash--;
|
||||||
if (pElem->prev) {
|
if (pElem->prev) {
|
||||||
pElem->prev->next = pElem->next;
|
pElem->prev->next = pElem->next;
|
||||||
} else { // pnode is the header, update header
|
} else { // pnode is the header, update header
|
||||||
pCacheObj->pTrash = pElem->next;
|
pCacheObj->pTrash = pElem->next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,7 +172,7 @@ static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STr
|
||||||
return next;
|
return next;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) {
|
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem *pElem) {
|
||||||
if (pCacheObj->freeFp) {
|
if (pCacheObj->freeFp) {
|
||||||
pCacheObj->freeFp(pElem->pData->data);
|
pCacheObj->freeFp(pElem->pData->data);
|
||||||
}
|
}
|
||||||
|
@ -181,19 +181,20 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem
|
||||||
free(pElem);
|
free(pElem);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char* cacheName) {
|
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn,
|
||||||
const int32_t SLEEP_DURATION = 500; //500 ms
|
const char *cacheName) {
|
||||||
|
const int32_t SLEEP_DURATION = 500; // 500 ms
|
||||||
|
|
||||||
if (refreshTimeInSeconds <= 0) {
|
if (refreshTimeInSeconds <= 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
|
SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
|
||||||
if (pCacheObj == NULL) {
|
if (pCacheObj == NULL) {
|
||||||
uError("failed to allocate memory, reason:%s", strerror(errno));
|
uError("failed to allocate memory, reason:%s", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK);
|
pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK);
|
||||||
pCacheObj->name = strdup(cacheName);
|
pCacheObj->name = strdup(cacheName);
|
||||||
if (pCacheObj->pHashTable == NULL) {
|
if (pCacheObj->pHashTable == NULL) {
|
||||||
|
@ -201,17 +202,17 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
|
||||||
uError("failed to allocate memory, reason:%s", strerror(errno));
|
uError("failed to allocate memory, reason:%s", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set free cache node callback function
|
// set free cache node callback function
|
||||||
pCacheObj->freeFp = fn;
|
pCacheObj->freeFp = fn;
|
||||||
pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
|
pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
|
||||||
pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
|
pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
|
||||||
pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access
|
pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access
|
||||||
|
|
||||||
if (__cache_lock_init(pCacheObj) != 0) {
|
if (__cache_lock_init(pCacheObj) != 0) {
|
||||||
taosHashCleanup(pCacheObj->pHashTable);
|
taosHashCleanup(pCacheObj->pHashTable);
|
||||||
free(pCacheObj);
|
free(pCacheObj);
|
||||||
|
|
||||||
uError("failed to init lock, reason:%s", strerror(errno));
|
uError("failed to init lock, reason:%s", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -220,7 +221,8 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
|
||||||
return pCacheObj;
|
return pCacheObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int durationMS) {
|
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize,
|
||||||
|
int32_t durationMS) {
|
||||||
if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) {
|
if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -242,8 +244,8 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
|
||||||
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
|
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
|
||||||
} else { // duplicated key exists
|
} else { // duplicated key exists
|
||||||
while (1) {
|
while (1) {
|
||||||
SCacheDataNode* p = NULL;
|
SCacheDataNode *p = NULL;
|
||||||
// int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*));
|
// int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*));
|
||||||
int32_t ret = taosHashRemove(pCacheObj->pHashTable, key, keyLen);
|
int32_t ret = taosHashRemove(pCacheObj->pHashTable, key, keyLen);
|
||||||
|
|
||||||
// add to trashcan
|
// add to trashcan
|
||||||
|
@ -283,10 +285,10 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
|
||||||
return pNode1->data;
|
return pNode1->data;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void incRefFn(void* ptNode) {
|
static void incRefFn(void *ptNode) {
|
||||||
assert(ptNode != NULL);
|
assert(ptNode != NULL);
|
||||||
|
|
||||||
SCacheDataNode** p = (SCacheDataNode**) ptNode;
|
SCacheDataNode **p = (SCacheDataNode **)ptNode;
|
||||||
assert(T_REF_VAL_GET(*p) >= 0);
|
assert(T_REF_VAL_GET(*p) >= 0);
|
||||||
|
|
||||||
int32_t ret = T_REF_INC(*p);
|
int32_t ret = T_REF_INC(*p);
|
||||||
|
@ -303,15 +305,16 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCacheDataNode* ptNode = NULL;
|
SCacheDataNode *ptNode = NULL;
|
||||||
taosHashGetClone(pCacheObj->pHashTable, key, keyLen, &ptNode);
|
taosHashGetClone(pCacheObj->pHashTable, key, keyLen, &ptNode);
|
||||||
// taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode);
|
// taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode);
|
||||||
|
|
||||||
void* pData = (ptNode != NULL)? ptNode->data:NULL;
|
void *pData = (ptNode != NULL) ? ptNode->data : NULL;
|
||||||
|
|
||||||
if (pData != NULL) {
|
if (pData != NULL) {
|
||||||
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
|
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
|
||||||
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(ptNode));
|
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData,
|
||||||
|
T_REF_VAL_GET(ptNode));
|
||||||
} else {
|
} else {
|
||||||
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
|
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
|
||||||
uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
|
uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
|
||||||
|
@ -323,10 +326,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
|
||||||
|
|
||||||
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
||||||
if (pCacheObj == NULL || data == NULL) return NULL;
|
if (pCacheObj == NULL || data == NULL) return NULL;
|
||||||
|
|
||||||
size_t offset = offsetof(SCacheDataNode, data);
|
size_t offset = offsetof(SCacheDataNode, data);
|
||||||
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
|
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
|
||||||
|
|
||||||
if (ptNode->signature != (uint64_t)ptNode) {
|
if (ptNode->signature != (uint64_t)ptNode) {
|
||||||
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
|
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -342,22 +345,22 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
||||||
|
|
||||||
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
|
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
|
||||||
if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
|
if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
|
||||||
|
|
||||||
size_t offset = offsetof(SCacheDataNode, data);
|
size_t offset = offsetof(SCacheDataNode, data);
|
||||||
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
|
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
|
||||||
|
|
||||||
if (ptNode->signature != (uint64_t)ptNode) {
|
if (ptNode->signature != (uint64_t)ptNode) {
|
||||||
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
|
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(T_REF_VAL_GET(ptNode) >= 1);
|
assert(T_REF_VAL_GET(ptNode) >= 1);
|
||||||
|
|
||||||
char *d = *data;
|
char *d = *data;
|
||||||
|
|
||||||
// clear its reference to old area
|
// clear its reference to old area
|
||||||
*data = NULL;
|
*data = NULL;
|
||||||
|
|
||||||
return d;
|
return d;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -371,13 +374,12 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// The operation of removal from hash table and addition to trashcan is not an atomic operation,
|
// The operation of removal from hash table and addition to trashcan is not an atomic operation,
|
||||||
// therefore the check for the empty of both the hash table and the trashcan has a race condition.
|
// therefore the check for the empty of both the hash table and the trashcan has a race condition.
|
||||||
// It happens when there is only one object in the cache, and two threads which has referenced this object
|
// It happens when there is only one object in the cache, and two threads which has referenced this object
|
||||||
// start to free the it simultaneously [TD-1569].
|
// start to free the it simultaneously [TD-1569].
|
||||||
size_t offset = offsetof(SCacheDataNode, data);
|
size_t offset = offsetof(SCacheDataNode, data);
|
||||||
|
|
||||||
SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
|
SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
|
||||||
if (pNode->signature != (uint64_t)pNode) {
|
if (pNode->signature != (uint64_t)pNode) {
|
||||||
uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
|
uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
|
||||||
|
@ -391,20 +393,20 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
|
|
||||||
if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
|
if (pCacheObj->extendLifespan && (!inTrashcan) && (!_remove)) {
|
||||||
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
|
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
|
||||||
uDebug("cache:%s, data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
|
uDebug("cache:%s, data:%p extend expire time: %" PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_remove) {
|
if (_remove) {
|
||||||
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
|
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
|
||||||
char* key = pNode->key;
|
char *key = pNode->key;
|
||||||
char* d = pNode->data;
|
char *d = pNode->data;
|
||||||
|
|
||||||
int32_t ref = T_REF_VAL_GET(pNode);
|
int32_t ref = T_REF_VAL_GET(pNode);
|
||||||
uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);
|
uDebug("cache:%s, key:%p, %p is released, refcnt:%d, in trashcan:%d", pCacheObj->name, key, d, ref - 1, inTrashcan);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
|
* If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all
|
||||||
* releasing this resources.
|
* users releasing this resources.
|
||||||
*
|
*
|
||||||
* NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
|
* NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
|
||||||
* that tries to do the same thing.
|
* that tries to do the same thing.
|
||||||
|
@ -433,16 +435,19 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
// NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
|
// NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
|
||||||
// when reaches here.
|
// when reaches here.
|
||||||
SCacheDataNode *p = NULL;
|
SCacheDataNode *p = NULL;
|
||||||
int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
|
int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
|
||||||
// int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, pNode->key, pNode->keySize, &p, sizeof(void *));
|
// int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, pNode->key, pNode->keySize, &p, sizeof(void
|
||||||
|
// *));
|
||||||
ref = T_REF_DEC(pNode);
|
ref = T_REF_DEC(pNode);
|
||||||
|
|
||||||
// successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
|
// successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
|
||||||
// note that the remove operation can be executed only once.
|
// note that the remove operation can be executed only once.
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
if (p != pNode) {
|
if (p != pNode) {
|
||||||
uDebug( "cache:%s, key:%p, successfully removed a new entry:%p, refcnt:%d, prev entry:%p has been removed by "
|
uDebug(
|
||||||
"others already", pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data);
|
"cache:%s, key:%p, successfully removed a new entry:%p, refcnt:%d, prev entry:%p has been removed by "
|
||||||
|
"others already",
|
||||||
|
pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data);
|
||||||
|
|
||||||
assert(p->pTNodeHeader == NULL);
|
assert(p->pTNodeHeader == NULL);
|
||||||
taosAddToTrashcan(pCacheObj, p);
|
taosAddToTrashcan(pCacheObj, p);
|
||||||
|
@ -468,35 +473,36 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d",
|
uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", pCacheObj->name,
|
||||||
pCacheObj->name, pNode->key, pNode->data, ref);
|
pNode->key, pNode->data, ref);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
|
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
|
||||||
char* key = pNode->key;
|
char *key = pNode->key;
|
||||||
char* p = pNode->data;
|
char *p = pNode->data;
|
||||||
|
|
||||||
// int32_t ref = T_REF_VAL_GET(pNode);
|
// int32_t ref = T_REF_VAL_GET(pNode);
|
||||||
//
|
//
|
||||||
// if (ref == 1 && inTrashcan) {
|
// if (ref == 1 && inTrashcan) {
|
||||||
// // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may be
|
// // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may
|
||||||
// // destroyed by refresh worker if decrease ref count before removing it from linked-list.
|
// be
|
||||||
// assert(pNode->pTNodeHeader->pData == pNode);
|
// // destroyed by refresh worker if decrease ref count before removing it from linked-list.
|
||||||
//
|
// assert(pNode->pTNodeHeader->pData == pNode);
|
||||||
// __cache_wr_lock(pCacheObj);
|
//
|
||||||
// doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
|
// __cache_wr_lock(pCacheObj);
|
||||||
// __cache_unlock(pCacheObj);
|
// doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
|
||||||
//
|
// __cache_unlock(pCacheObj);
|
||||||
// ref = T_REF_DEC(pNode);
|
//
|
||||||
// assert(ref == 0);
|
// ref = T_REF_DEC(pNode);
|
||||||
//
|
// assert(ref == 0);
|
||||||
// doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
|
//
|
||||||
// } else {
|
// doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
|
||||||
// ref = T_REF_DEC(pNode);
|
// } else {
|
||||||
// assert(ref >= 0);
|
// ref = T_REF_DEC(pNode);
|
||||||
// }
|
// assert(ref >= 0);
|
||||||
|
// }
|
||||||
|
|
||||||
int32_t ref = T_REF_DEC(pNode);
|
int32_t ref = T_REF_DEC(pNode);
|
||||||
uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
|
uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
|
||||||
|
@ -504,21 +510,21 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SHashTravSupp {
|
typedef struct SHashTravSupp {
|
||||||
SCacheObj* pCacheObj;
|
SCacheObj *pCacheObj;
|
||||||
int64_t time;
|
int64_t time;
|
||||||
__cache_trav_fn_t fp;
|
__cache_trav_fn_t fp;
|
||||||
void* param1;
|
void *param1;
|
||||||
} SHashTravSupp;
|
} SHashTravSupp;
|
||||||
|
|
||||||
static bool travHashTableEmptyFn(void* param, void* data) {
|
static bool travHashTableEmptyFn(void *param, void *data) {
|
||||||
SHashTravSupp* ps = (SHashTravSupp*) param;
|
SHashTravSupp *ps = (SHashTravSupp *)param;
|
||||||
SCacheObj* pCacheObj= ps->pCacheObj;
|
SCacheObj *pCacheObj = ps->pCacheObj;
|
||||||
|
|
||||||
SCacheDataNode *pNode = *(SCacheDataNode **) data;
|
SCacheDataNode *pNode = *(SCacheDataNode **)data;
|
||||||
|
|
||||||
if (T_REF_VAL_GET(pNode) == 0) {
|
if (T_REF_VAL_GET(pNode) == 0) {
|
||||||
taosCacheReleaseNode(pCacheObj, pNode);
|
taosCacheReleaseNode(pCacheObj, pNode);
|
||||||
} else { // do add to trashcan
|
} else { // do add to trashcan
|
||||||
taosAddToTrashcan(pCacheObj, pNode);
|
taosAddToTrashcan(pCacheObj, pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -529,7 +535,7 @@ static bool travHashTableEmptyFn(void* param, void* data) {
|
||||||
void taosCacheEmpty(SCacheObj *pCacheObj) {
|
void taosCacheEmpty(SCacheObj *pCacheObj) {
|
||||||
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
|
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
|
||||||
|
|
||||||
// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
|
// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
|
||||||
taosTrashcanEmpty(pCacheObj, false);
|
taosTrashcanEmpty(pCacheObj, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -542,9 +548,9 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
|
||||||
|
|
||||||
// wait for the refresh thread quit before destroying the cache object.
|
// wait for the refresh thread quit before destroying the cache object.
|
||||||
// But in the dll, the child thread will be killed before atexit takes effect.
|
// But in the dll, the child thread will be killed before atexit takes effect.
|
||||||
while(atomic_load_8(&pCacheObj->deleting) != 0) {
|
while (atomic_load_8(&pCacheObj->deleting) != 0) {
|
||||||
if (refreshWorkerNormalStopped) break;
|
if (refreshWorkerNormalStopped) break;
|
||||||
if (refreshWorkerUnexpectedStopped) return;
|
if (refreshWorkerUnexpectedStopped) return;
|
||||||
taosMsleep(50);
|
taosMsleep(50);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -568,11 +574,11 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *
|
||||||
|
|
||||||
memcpy(pNewNode->key, key, keyLen);
|
memcpy(pNewNode->key, key, keyLen);
|
||||||
|
|
||||||
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
|
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
|
||||||
pNewNode->lifespan = duration;
|
pNewNode->lifespan = duration;
|
||||||
pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
|
pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
|
||||||
pNewNode->signature = (uint64_t)pNewNode;
|
pNewNode->signature = (uint64_t)pNewNode;
|
||||||
pNewNode->size = (uint32_t)totalSize;
|
pNewNode->size = (uint32_t)totalSize;
|
||||||
|
|
||||||
return pNewNode;
|
return pNewNode;
|
||||||
}
|
}
|
||||||
|
@ -610,16 +616,17 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
|
||||||
if (pCacheObj->numOfElemsInTrash == 0) {
|
if (pCacheObj->numOfElemsInTrash == 0) {
|
||||||
if (pCacheObj->pTrash != NULL) {
|
if (pCacheObj->pTrash != NULL) {
|
||||||
pCacheObj->pTrash = NULL;
|
pCacheObj->pTrash = NULL;
|
||||||
uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash);
|
uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name,
|
||||||
|
pCacheObj->numOfElemsInTrash);
|
||||||
}
|
}
|
||||||
|
|
||||||
__cache_unlock(pCacheObj);
|
__cache_unlock(pCacheObj);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* stat[] = {"false", "true"};
|
const char *stat[] = {"false", "true"};
|
||||||
uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
|
uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name,
|
||||||
pCacheObj->numOfElemsInTrash, (force? stat[1]:stat[0]));
|
pCacheObj->numOfElemsInTrash, (force ? stat[1] : stat[0]));
|
||||||
|
|
||||||
STrashElem *pElem = pCacheObj->pTrash;
|
STrashElem *pElem = pCacheObj->pTrash;
|
||||||
while (pElem) {
|
while (pElem) {
|
||||||
|
@ -627,8 +634,8 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
|
||||||
assert(pElem->next != pElem && pElem->prev != pElem);
|
assert(pElem->next != pElem && pElem->prev != pElem);
|
||||||
|
|
||||||
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
|
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
|
||||||
uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data,
|
uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key,
|
||||||
pCacheObj->numOfElemsInTrash - 1);
|
pElem->pData->data, pCacheObj->numOfElemsInTrash - 1);
|
||||||
|
|
||||||
doRemoveElemInTrashcan(pCacheObj, pElem);
|
doRemoveElemInTrashcan(pCacheObj, pElem);
|
||||||
doDestroyTrashcanElem(pCacheObj, pElem);
|
doDestroyTrashcanElem(pCacheObj, pElem);
|
||||||
|
@ -642,25 +649,25 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void doCleanupDataCache(SCacheObj *pCacheObj) {
|
void doCleanupDataCache(SCacheObj *pCacheObj) {
|
||||||
// SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
|
// SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
|
||||||
// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
|
// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
|
||||||
|
|
||||||
// todo memory leak if there are object with refcount greater than 0 in hash table?
|
// todo memory leak if there are object with refcount greater than 0 in hash table?
|
||||||
taosHashCleanup(pCacheObj->pHashTable);
|
taosHashCleanup(pCacheObj->pHashTable);
|
||||||
taosTrashcanEmpty(pCacheObj, true);
|
taosTrashcanEmpty(pCacheObj, true);
|
||||||
|
|
||||||
__cache_lock_destroy(pCacheObj);
|
__cache_lock_destroy(pCacheObj);
|
||||||
|
|
||||||
tfree(pCacheObj->name);
|
tfree(pCacheObj->name);
|
||||||
memset(pCacheObj, 0, sizeof(SCacheObj));
|
memset(pCacheObj, 0, sizeof(SCacheObj));
|
||||||
free(pCacheObj);
|
free(pCacheObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool travHashTableFn(void* param, void* data) {
|
bool travHashTableFn(void *param, void *data) {
|
||||||
SHashTravSupp* ps = (SHashTravSupp*) param;
|
SHashTravSupp *ps = (SHashTravSupp *)param;
|
||||||
SCacheObj* pCacheObj= ps->pCacheObj;
|
SCacheObj *pCacheObj = ps->pCacheObj;
|
||||||
|
|
||||||
SCacheDataNode* pNode = *(SCacheDataNode **) data;
|
SCacheDataNode *pNode = *(SCacheDataNode **)data;
|
||||||
if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
|
if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
|
||||||
taosCacheReleaseNode(pCacheObj, pNode);
|
taosCacheReleaseNode(pCacheObj, pNode);
|
||||||
|
|
||||||
|
@ -676,30 +683,30 @@ bool travHashTableFn(void* param, void* data) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_trav_fn_t fp, void* param1) {
|
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
|
||||||
assert(pCacheObj != NULL);
|
assert(pCacheObj != NULL);
|
||||||
|
|
||||||
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
|
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
|
||||||
// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
|
// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCacheRefreshWorkerUnexpectedStopped(void) {
|
void taosCacheRefreshWorkerUnexpectedStopped(void) {
|
||||||
if(!refreshWorkerNormalStopped) {
|
if (!refreshWorkerNormalStopped) {
|
||||||
refreshWorkerUnexpectedStopped=true;
|
refreshWorkerUnexpectedStopped = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void* taosCacheTimedRefresh(void *handle) {
|
void *taosCacheTimedRefresh(void *handle) {
|
||||||
assert(pCacheArrayList != NULL);
|
assert(pCacheArrayList != NULL);
|
||||||
uDebug("cache refresh thread starts");
|
uDebug("cache refresh thread starts");
|
||||||
|
|
||||||
setThreadName("cacheRefresh");
|
setThreadName("cacheRefresh");
|
||||||
|
|
||||||
const int32_t SLEEP_DURATION = 500; //500 ms
|
const int32_t SLEEP_DURATION = 500; // 500 ms
|
||||||
int64_t count = 0;
|
int64_t count = 0;
|
||||||
atexit(taosCacheRefreshWorkerUnexpectedStopped);
|
atexit(taosCacheRefreshWorkerUnexpectedStopped);
|
||||||
|
|
||||||
while(1) {
|
while (1) {
|
||||||
taosMsleep(SLEEP_DURATION);
|
taosMsleep(SLEEP_DURATION);
|
||||||
if (stopRefreshWorker) {
|
if (stopRefreshWorker) {
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -711,9 +718,9 @@ void* taosCacheTimedRefresh(void *handle) {
|
||||||
|
|
||||||
count += 1;
|
count += 1;
|
||||||
|
|
||||||
for(int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
pthread_mutex_lock(&guard);
|
pthread_mutex_lock(&guard);
|
||||||
SCacheObj* pCacheObj = taosArrayGetP(pCacheArrayList, i);
|
SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i);
|
||||||
|
|
||||||
if (pCacheObj == NULL) {
|
if (pCacheObj == NULL) {
|
||||||
uError("object is destroyed. ignore and try next");
|
uError("object is destroyed. ignore and try next");
|
||||||
|
@ -726,8 +733,8 @@ void* taosCacheTimedRefresh(void *handle) {
|
||||||
taosArrayRemove(pCacheArrayList, i);
|
taosArrayRemove(pCacheArrayList, i);
|
||||||
size = taosArrayGetSize(pCacheArrayList);
|
size = taosArrayGetSize(pCacheArrayList);
|
||||||
|
|
||||||
uDebug("%s is destroying, remove it from refresh list, remain cache obj:%"PRIzu, pCacheObj->name, size);
|
uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size);
|
||||||
pCacheObj->deleting = 0; //reset the deleting flag to enable pCacheObj to continue releasing resources.
|
pCacheObj->deleting = 0; // reset the deleting flag to enable pCacheObj to continue releasing resources.
|
||||||
|
|
||||||
pthread_mutex_unlock(&guard);
|
pthread_mutex_unlock(&guard);
|
||||||
continue;
|
continue;
|
||||||
|
@ -757,18 +764,18 @@ void* taosCacheTimedRefresh(void *handle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
taosArrayDestroy(pCacheArrayList);
|
taosArrayDestroy(pCacheArrayList);
|
||||||
|
|
||||||
pCacheArrayList = NULL;
|
pCacheArrayList = NULL;
|
||||||
pthread_mutex_destroy(&guard);
|
pthread_mutex_destroy(&guard);
|
||||||
refreshWorkerNormalStopped=true;
|
refreshWorkerNormalStopped = true;
|
||||||
|
|
||||||
uDebug("cache refresh thread quits");
|
uDebug("cache refresh thread quits");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1) {
|
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) {
|
||||||
if (pCacheObj == NULL) {
|
if (pCacheObj == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -777,6 +784,4 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1)
|
||||||
doCacheRefresh(pCacheObj, now, fp, param1);
|
doCacheRefresh(pCacheObj, now, fp, param1);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosStopCacheRefreshWorker(void) {
|
void taosStopCacheRefreshWorker(void) { stopRefreshWorker = true; }
|
||||||
stopRefreshWorker = true;
|
|
||||||
}
|
|
Loading…
Reference in New Issue