[td-225] fix invalid read when free taosCache object.
This commit is contained in:
parent
d700090c1a
commit
376809ace9
|
@ -143,11 +143,11 @@ void taos_init_imp() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t refreshTime = tsTableMetaKeepTimer;
|
int64_t refreshTime = tsTableMetaKeepTimer;
|
||||||
refreshTime = refreshTime > 2 ? 2 : refreshTime;
|
refreshTime = refreshTime > 10 ? 10 : refreshTime;
|
||||||
refreshTime = refreshTime < 1 ? 1 : refreshTime;
|
refreshTime = refreshTime < 10 ? 10 : refreshTime;
|
||||||
|
|
||||||
if (tscCacheHandle == NULL) {
|
if (tscCacheHandle == NULL) {
|
||||||
tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
|
tscCacheHandle = taosCacheInit(refreshTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
tscTrace("client is initialized successfully");
|
tscTrace("client is initialized successfully");
|
||||||
|
|
|
@ -67,7 +67,7 @@ int32_t mnodeInitProfile() {
|
||||||
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
|
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
|
||||||
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
|
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
|
||||||
|
|
||||||
tsMnodeConnCache = taosCacheInitWithCb(tsMnodeTmr, CONN_CHECK_TIME, mnodeFreeConn);
|
tsMnodeConnCache = taosCacheInitWithCb(CONN_CHECK_TIME, mnodeFreeConn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
|
||||||
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
|
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
|
||||||
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
|
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
|
||||||
|
|
||||||
tsMnodeShowCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj);
|
tsMnodeShowCache = taosCacheInitWithCb(10, mnodeFreeShowObj);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,8 +37,8 @@ typedef struct SCacheDataNode {
|
||||||
uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache
|
uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache
|
||||||
uint64_t signature;
|
uint64_t signature;
|
||||||
uint32_t size; // allocated size for current SCacheDataNode
|
uint32_t size; // allocated size for current SCacheDataNode
|
||||||
uint16_t keySize : 15;
|
uint16_t keySize: 15;
|
||||||
bool inTrash : 1; // denote if it is in trash or not
|
bool inTrashCan: 1;// denote if it is in trash or not
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
char *key;
|
char *key;
|
||||||
char data[];
|
char data[];
|
||||||
|
@ -50,46 +50,49 @@ typedef struct STrashElem {
|
||||||
SCacheDataNode *pData;
|
SCacheDataNode *pData;
|
||||||
} STrashElem;
|
} STrashElem;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* to accommodate the old data which has the same key value of new one in hashList
|
||||||
|
* when an new node is put into cache, if an existed one with the same key:
|
||||||
|
* 1. if the old one does not be referenced, update it.
|
||||||
|
* 2. otherwise, move the old one to pTrash, addedTime the new one.
|
||||||
|
*
|
||||||
|
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
|
||||||
|
*/
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
|
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
|
||||||
int64_t refreshTime;
|
int64_t refreshTime;
|
||||||
|
STrashElem * pTrash;
|
||||||
/*
|
void * tmrCtrl;
|
||||||
* to accommodate the old datanode which has the same key value of new one in hashList
|
void * pTimer;
|
||||||
* when an new node is put into cache, if an existed one with the same key:
|
SCacheStatis statistics;
|
||||||
* 1. if the old one does not be referenced, update it.
|
SHashObj * pHashTable;
|
||||||
* 2. otherwise, move the old one to pTrash, addedTime the new one.
|
|
||||||
*
|
|
||||||
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
|
|
||||||
*/
|
|
||||||
STrashElem * pTrash;
|
|
||||||
void * tmrCtrl;
|
|
||||||
void * pTimer;
|
|
||||||
SCacheStatis statistics;
|
|
||||||
SHashObj * pHashTable;
|
|
||||||
_hash_free_fn_t freeFp;
|
_hash_free_fn_t freeFp;
|
||||||
int numOfElemsInTrash; // number of element in trash
|
uint32_t numOfElemsInTrash; // number of element in trash
|
||||||
int16_t deleting; // set the deleting flag to stop refreshing ASAP.
|
uint8_t deleting; // set the deleting flag to stop refreshing ASAP.
|
||||||
T_REF_DECLARE()
|
pthread_t refreshWorker;
|
||||||
|
|
||||||
#if defined(LINUX)
|
#if defined(LINUX)
|
||||||
pthread_rwlock_t lock;
|
pthread_rwlock_t lock;
|
||||||
#else
|
#else
|
||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
} SCacheObj;
|
} SCacheObj;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
* initialize the cache object
|
||||||
* @param maxSessions maximum slots available for hash elements
|
|
||||||
* @param tmrCtrl timer ctrl
|
|
||||||
* @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and
|
* @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and
|
||||||
* not referenced by other objects
|
* not referenced by other objects
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTimeInSeconds);
|
SCacheObj *taosCacheInit(int64_t refreshTimeInSeconds);
|
||||||
SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTimeInSeconds, void (*freeCb)(void *data));
|
|
||||||
|
/**
|
||||||
|
* initialize the cache object and set the free object callback function
|
||||||
|
* @param refreshTimeInSeconds
|
||||||
|
* @param freeCb
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
SCacheObj *taosCacheInitWithCb(int64_t refreshTimeInSeconds, void (*freeCb)(void *data));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* add data into cache
|
* add data into cache
|
||||||
|
|
|
@ -77,31 +77,7 @@ static FORCE_INLINE void taosFreeNode(void *data) {
|
||||||
* @param lifespan total survial expiredTime from now
|
* @param lifespan total survial expiredTime from now
|
||||||
* @return SCacheDataNode
|
* @return SCacheDataNode
|
||||||
*/
|
*/
|
||||||
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
|
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration);
|
||||||
uint64_t duration) {
|
|
||||||
size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1;
|
|
||||||
|
|
||||||
SCacheDataNode *pNewNode = calloc(1, totalSize);
|
|
||||||
if (pNewNode == NULL) {
|
|
||||||
uError("failed to allocate memory, reason:%s", strerror(errno));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(pNewNode->data, pData, size);
|
|
||||||
|
|
||||||
pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
|
|
||||||
pNewNode->keySize = keyLen;
|
|
||||||
|
|
||||||
memcpy(pNewNode->key, key, keyLen);
|
|
||||||
|
|
||||||
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
|
|
||||||
pNewNode->expiredTime = pNewNode->addedTime + duration;
|
|
||||||
|
|
||||||
pNewNode->signature = (uint64_t)pNewNode;
|
|
||||||
pNewNode->size = (uint32_t)totalSize;
|
|
||||||
|
|
||||||
return pNewNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
|
* addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
|
||||||
|
@ -109,50 +85,15 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const
|
||||||
* @param pCacheObj Cache object
|
* @param pCacheObj Cache object
|
||||||
* @param pNode Cache slot object
|
* @param pNode Cache slot object
|
||||||
*/
|
*/
|
||||||
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
|
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode);
|
||||||
if (pNode->inTrash) { /* node is already in trash */
|
|
||||||
return;
|
/**
|
||||||
}
|
* remove node in trash can
|
||||||
|
* @param pCacheObj
|
||||||
STrashElem *pElem = calloc(1, sizeof(STrashElem));
|
* @param pElem
|
||||||
pElem->pData = pNode;
|
*/
|
||||||
|
static void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem);
|
||||||
pElem->next = pCacheObj->pTrash;
|
|
||||||
if (pCacheObj->pTrash) {
|
|
||||||
pCacheObj->pTrash->prev = pElem;
|
|
||||||
}
|
|
||||||
|
|
||||||
pElem->prev = NULL;
|
|
||||||
pCacheObj->pTrash = pElem;
|
|
||||||
|
|
||||||
pNode->inTrash = true;
|
|
||||||
pCacheObj->numOfElemsInTrash++;
|
|
||||||
|
|
||||||
uTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) {
|
|
||||||
if (pElem->pData->signature != (uint64_t)pElem->pData) {
|
|
||||||
uError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCacheObj->numOfElemsInTrash--;
|
|
||||||
if (pElem->prev) {
|
|
||||||
pElem->prev->next = pElem->next;
|
|
||||||
} else { /* pnode is the header, update header */
|
|
||||||
pCacheObj->pTrash = pElem->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pElem->next) {
|
|
||||||
pElem->next->prev = pElem->prev;
|
|
||||||
}
|
|
||||||
|
|
||||||
pElem->pData->signature = 0;
|
|
||||||
if (pCacheObj->freeFp) pCacheObj->freeFp(pElem->pData->data);
|
|
||||||
free(pElem->pData);
|
|
||||||
free(pElem);
|
|
||||||
}
|
|
||||||
/**
|
/**
|
||||||
* remove nodes in trash with refCount == 0 in cache
|
* remove nodes in trash with refCount == 0 in cache
|
||||||
* @param pNode
|
* @param pNode
|
||||||
|
@ -160,42 +101,7 @@ static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) {
|
||||||
* @param force force model, if true, remove data in trash without check refcount.
|
* @param force force model, if true, remove data in trash without check refcount.
|
||||||
* may cause corruption. So, forece model only applys before cache is closed
|
* may cause corruption. So, forece model only applys before cache is closed
|
||||||
*/
|
*/
|
||||||
static void taosTrashEmpty(SCacheObj *pCacheObj, bool force) {
|
static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force);
|
||||||
__cache_wr_lock(pCacheObj);
|
|
||||||
|
|
||||||
if (pCacheObj->numOfElemsInTrash == 0) {
|
|
||||||
if (pCacheObj->pTrash != NULL) {
|
|
||||||
uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
|
|
||||||
}
|
|
||||||
pCacheObj->pTrash = NULL;
|
|
||||||
|
|
||||||
__cache_unlock(pCacheObj);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
STrashElem *pElem = pCacheObj->pTrash;
|
|
||||||
|
|
||||||
while (pElem) {
|
|
||||||
T_REF_VAL_CHECK(pElem->pData);
|
|
||||||
if (pElem->next == pElem) {
|
|
||||||
pElem->next = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
|
|
||||||
uTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
|
|
||||||
pCacheObj->numOfElemsInTrash - 1);
|
|
||||||
STrashElem *p = pElem;
|
|
||||||
|
|
||||||
pElem = pElem->next;
|
|
||||||
taosRemoveFromTrash(pCacheObj, p);
|
|
||||||
} else {
|
|
||||||
pElem = pElem->next;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(pCacheObj->numOfElemsInTrash >= 0);
|
|
||||||
__cache_unlock(pCacheObj);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* release node
|
* release node
|
||||||
|
@ -304,87 +210,20 @@ static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, con
|
||||||
return pNode;
|
return pNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doCleanupDataCache(SCacheObj *pCacheObj) {
|
/**
|
||||||
__cache_wr_lock(pCacheObj);
|
* do cleanup the taos cache
|
||||||
|
* @param pCacheObj
|
||||||
//if (taosHashGetSize(pCacheObj->pHashTable) > 0) {
|
*/
|
||||||
taosHashCleanup(pCacheObj->pHashTable);
|
static void doCleanupDataCache(SCacheObj *pCacheObj);
|
||||||
//}
|
|
||||||
|
|
||||||
__cache_unlock(pCacheObj);
|
|
||||||
|
|
||||||
taosTrashEmpty(pCacheObj, true);
|
|
||||||
__cache_lock_destroy(pCacheObj);
|
|
||||||
|
|
||||||
memset(pCacheObj, 0, sizeof(SCacheObj));
|
|
||||||
free(pCacheObj);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
|
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
|
||||||
* @param handle Cache object handle
|
* @param handle Cache object handle
|
||||||
*/
|
*/
|
||||||
static void taosCacheRefresh(void *handle, void *tmrId) {
|
static void* taosCacheRefresh(void *handle);
|
||||||
SCacheObj *pCacheObj = (SCacheObj *)handle;
|
|
||||||
|
|
||||||
if (pCacheObj == NULL || T_REF_VAL_GET(pCacheObj) == 0) {
|
|
||||||
uTrace("object is destroyed. no refresh retry");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int16_t ref = T_REF_INC(pCacheObj);
|
|
||||||
if (ref == 1) {
|
|
||||||
T_REF_DEC(pCacheObj);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo add the ref before start the timer
|
|
||||||
int32_t num = taosHashGetSize(pCacheObj->pHashTable);
|
|
||||||
if (num == 0) {
|
|
||||||
ref = T_REF_DEC(pCacheObj);
|
|
||||||
if (ref == 0) {
|
|
||||||
doCleanupDataCache(pCacheObj);
|
|
||||||
} else {
|
|
||||||
taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t expiredTime = taosGetTimestampMs();
|
|
||||||
pCacheObj->statistics.refreshCount++;
|
|
||||||
|
|
||||||
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
|
|
||||||
|
|
||||||
__cache_wr_lock(pCacheObj);
|
|
||||||
while (taosHashIterNext(pIter)) {
|
|
||||||
if (pCacheObj->deleting == 1) {
|
|
||||||
taosHashDestroyIter(pIter);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
|
|
||||||
if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
|
|
||||||
taosCacheReleaseNode(pCacheObj, pNode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
__cache_unlock(pCacheObj);
|
|
||||||
|
|
||||||
taosHashDestroyIter(pIter);
|
|
||||||
|
|
||||||
taosTrashEmpty(pCacheObj, false);
|
SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) {
|
||||||
|
if (refreshTime <= 0) {
|
||||||
ref = T_REF_DEC(pCacheObj);
|
|
||||||
if (ref == 0) {
|
|
||||||
doCleanupDataCache(pCacheObj);
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb)(void *data)) {
|
|
||||||
if (tmrCtrl == NULL || refreshTime <= 0) {
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -394,7 +233,7 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCacheObj->pHashTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false);
|
pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false);
|
||||||
if (pCacheObj->pHashTable == NULL) {
|
if (pCacheObj->pHashTable == NULL) {
|
||||||
free(pCacheObj);
|
free(pCacheObj);
|
||||||
uError("failed to allocate memory, reason:%s", strerror(errno));
|
uError("failed to allocate memory, reason:%s", strerror(errno));
|
||||||
|
@ -406,25 +245,27 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb
|
||||||
|
|
||||||
pCacheObj->freeFp = freeCb;
|
pCacheObj->freeFp = freeCb;
|
||||||
pCacheObj->refreshTime = refreshTime * 1000;
|
pCacheObj->refreshTime = refreshTime * 1000;
|
||||||
pCacheObj->tmrCtrl = tmrCtrl;
|
|
||||||
|
|
||||||
taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer);
|
|
||||||
|
|
||||||
if (__cache_lock_init(pCacheObj) != 0) {
|
if (__cache_lock_init(pCacheObj) != 0) {
|
||||||
taosTmrStopA(&pCacheObj->pTimer);
|
|
||||||
taosHashCleanup(pCacheObj->pHashTable);
|
taosHashCleanup(pCacheObj->pHashTable);
|
||||||
free(pCacheObj);
|
free(pCacheObj);
|
||||||
|
|
||||||
uError("failed to init lock, reason:%s", strerror(errno));
|
uError("failed to init lock, reason:%s", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
T_REF_INC(pCacheObj);
|
pthread_attr_t thattr = {0};
|
||||||
|
pthread_attr_init(&thattr);
|
||||||
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheRefresh, pCacheObj);
|
||||||
|
|
||||||
|
pthread_attr_destroy(&thattr);
|
||||||
return pCacheObj;
|
return pCacheObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) {
|
SCacheObj *taosCacheInit(int64_t refreshTime) {
|
||||||
return taosCacheInitWithCb(tmrCtrl, refreshTime, NULL);
|
return taosCacheInitWithCb(refreshTime, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, size_t dataSize, int duration) {
|
void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, size_t dataSize, int duration) {
|
||||||
|
@ -600,16 +441,188 @@ void taosCacheEmpty(SCacheObj *pCacheObj) {
|
||||||
__cache_unlock(pCacheObj);
|
__cache_unlock(pCacheObj);
|
||||||
|
|
||||||
taosHashDestroyIter(pIter);
|
taosHashDestroyIter(pIter);
|
||||||
taosTrashEmpty(pCacheObj, false);
|
taosTrashCanEmpty(pCacheObj, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCacheCleanup(SCacheObj *pCacheObj) {
|
void taosCacheCleanup(SCacheObj *pCacheObj) {
|
||||||
if (pCacheObj == NULL) {
|
if (pCacheObj == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ref = T_REF_DEC(pCacheObj);
|
pCacheObj->deleting = 1;
|
||||||
if (ref == 0) {
|
pthread_join(pCacheObj->refreshWorker, NULL);
|
||||||
doCleanupDataCache(pCacheObj);
|
|
||||||
}
|
doCleanupDataCache(pCacheObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
|
||||||
|
uint64_t duration) {
|
||||||
|
size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1;
|
||||||
|
|
||||||
|
SCacheDataNode *pNewNode = calloc(1, totalSize);
|
||||||
|
if (pNewNode == NULL) {
|
||||||
|
uError("failed to allocate memory, reason:%s", strerror(errno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pNewNode->data, pData, size);
|
||||||
|
|
||||||
|
pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
|
||||||
|
pNewNode->keySize = keyLen;
|
||||||
|
|
||||||
|
memcpy(pNewNode->key, key, keyLen);
|
||||||
|
|
||||||
|
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
|
||||||
|
pNewNode->expiredTime = pNewNode->addedTime + duration;
|
||||||
|
|
||||||
|
pNewNode->signature = (uint64_t)pNewNode;
|
||||||
|
pNewNode->size = (uint32_t)totalSize;
|
||||||
|
|
||||||
|
return pNewNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
|
||||||
|
if (pNode->inTrashCan) { /* node is already in trash */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
STrashElem *pElem = calloc(1, sizeof(STrashElem));
|
||||||
|
pElem->pData = pNode;
|
||||||
|
|
||||||
|
pElem->next = pCacheObj->pTrash;
|
||||||
|
if (pCacheObj->pTrash) {
|
||||||
|
pCacheObj->pTrash->prev = pElem;
|
||||||
|
}
|
||||||
|
|
||||||
|
pElem->prev = NULL;
|
||||||
|
pCacheObj->pTrash = pElem;
|
||||||
|
|
||||||
|
pNode->inTrashCan = true;
|
||||||
|
pCacheObj->numOfElemsInTrash++;
|
||||||
|
|
||||||
|
uTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash);
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
|
||||||
|
if (pElem->pData->signature != (uint64_t)pElem->pData) {
|
||||||
|
uError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCacheObj->numOfElemsInTrash--;
|
||||||
|
if (pElem->prev) {
|
||||||
|
pElem->prev->next = pElem->next;
|
||||||
|
} else { /* pnode is the header, update header */
|
||||||
|
pCacheObj->pTrash = pElem->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pElem->next) {
|
||||||
|
pElem->next->prev = pElem->prev;
|
||||||
|
}
|
||||||
|
|
||||||
|
pElem->pData->signature = 0;
|
||||||
|
if (pCacheObj->freeFp) pCacheObj->freeFp(pElem->pData->data);
|
||||||
|
free(pElem->pData);
|
||||||
|
free(pElem);
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
|
||||||
|
__cache_wr_lock(pCacheObj);
|
||||||
|
|
||||||
|
if (pCacheObj->numOfElemsInTrash == 0) {
|
||||||
|
if (pCacheObj->pTrash != NULL) {
|
||||||
|
uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
|
||||||
|
}
|
||||||
|
pCacheObj->pTrash = NULL;
|
||||||
|
|
||||||
|
__cache_unlock(pCacheObj);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
STrashElem *pElem = pCacheObj->pTrash;
|
||||||
|
|
||||||
|
while (pElem) {
|
||||||
|
T_REF_VAL_CHECK(pElem->pData);
|
||||||
|
if (pElem->next == pElem) {
|
||||||
|
pElem->next = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
|
||||||
|
uTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
|
||||||
|
pCacheObj->numOfElemsInTrash - 1);
|
||||||
|
STrashElem *p = pElem;
|
||||||
|
|
||||||
|
pElem = pElem->next;
|
||||||
|
taosRemoveFromTrashCan(pCacheObj, p);
|
||||||
|
} else {
|
||||||
|
pElem = pElem->next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(pCacheObj->numOfElemsInTrash >= 0);
|
||||||
|
__cache_unlock(pCacheObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
void doCleanupDataCache(SCacheObj *pCacheObj) {
|
||||||
|
__cache_wr_lock(pCacheObj);
|
||||||
|
taosHashCleanup(pCacheObj->pHashTable);
|
||||||
|
__cache_unlock(pCacheObj);
|
||||||
|
|
||||||
|
taosTrashCanEmpty(pCacheObj, true);
|
||||||
|
__cache_lock_destroy(pCacheObj);
|
||||||
|
|
||||||
|
memset(pCacheObj, 0, sizeof(SCacheObj));
|
||||||
|
free(pCacheObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
void* taosCacheRefresh(void *handle) {
|
||||||
|
SCacheObj *pCacheObj = (SCacheObj *)handle;
|
||||||
|
if (pCacheObj == NULL) {
|
||||||
|
uTrace("object is destroyed. no refresh retry");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
const int32_t SLEEP_DURATION = 500; //500 ms
|
||||||
|
int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION;
|
||||||
|
|
||||||
|
int64_t count = 0;
|
||||||
|
while(1) {
|
||||||
|
taosMsleep(500);
|
||||||
|
|
||||||
|
// check if current cache object will be deleted every 500ms.
|
||||||
|
if (pCacheObj->deleting) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (++count < totalTick) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset the count value
|
||||||
|
count = 0;
|
||||||
|
size_t num = taosHashGetSize(pCacheObj->pHashTable);
|
||||||
|
if (num == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t expiredTime = taosGetTimestampMs();
|
||||||
|
pCacheObj->statistics.refreshCount++;
|
||||||
|
|
||||||
|
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
|
||||||
|
|
||||||
|
__cache_wr_lock(pCacheObj);
|
||||||
|
while (taosHashIterNext(pIter)) {
|
||||||
|
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
|
||||||
|
if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
|
||||||
|
taosCacheReleaseNode(pCacheObj, pNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
__cache_unlock(pCacheObj);
|
||||||
|
|
||||||
|
taosHashDestroyIter(pIter);
|
||||||
|
taosTrashCanEmpty(pCacheObj, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
Loading…
Reference in New Issue