[td-225] fix bug in hash
This commit is contained in:
parent
175dfb56a0
commit
321f7b276b
|
@ -329,17 +329,6 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||||
return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0);
|
return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void doPopNextFromEntryList(SHashEntry *pe, SHashNode *pNode) {
|
|
||||||
SHashNode *pNext = pNode->next;
|
|
||||||
if (pNext != NULL) {
|
|
||||||
pNode->next = pNext->next;
|
|
||||||
} else {
|
|
||||||
pNode->next = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pe->num -= 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t dsize) {
|
int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t dsize) {
|
||||||
if (pHashObj == NULL || pHashObj->size <= 0) {
|
if (pHashObj == NULL || pHashObj->size <= 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -457,7 +446,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
|
||||||
taosWLockLatch(&pEntry->latch);
|
taosWLockLatch(&pEntry->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo remove first node
|
// todo remove the first node
|
||||||
SHashNode *pNode = NULL;
|
SHashNode *pNode = NULL;
|
||||||
while((pNode = pEntry->next) != NULL) {
|
while((pNode = pEntry->next) != NULL) {
|
||||||
if (fp && (!fp(param, pNode->data))) {
|
if (fp && (!fp(param, pNode->data))) {
|
||||||
|
|
|
@ -116,16 +116,6 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
|
||||||
free(pNode);
|
free(pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* move the old node into trash
|
|
||||||
* @param pCacheObj
|
|
||||||
* @param pNode
|
|
||||||
*/
|
|
||||||
static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
|
|
||||||
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
|
|
||||||
taosAddToTrash(pCacheObj, pNode);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) {
|
static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) {
|
||||||
if (pElem->pData->signature != (uint64_t) pElem->pData) {
|
if (pElem->pData->signature != (uint64_t) pElem->pData) {
|
||||||
uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
|
uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
|
||||||
|
@ -152,52 +142,6 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem
|
||||||
free(pElem->pData);
|
free(pElem->pData);
|
||||||
free(pElem);
|
free(pElem);
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* update data in cache
|
|
||||||
* @param pCacheObj
|
|
||||||
* @param pNode
|
|
||||||
* @param key
|
|
||||||
* @param keyLen
|
|
||||||
* @param pData
|
|
||||||
* @param dataSize
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
static UNUSED_FUNC SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode* pNode, SCacheDataNode* pNewNode,
|
|
||||||
const char *key, int32_t keyLen) {
|
|
||||||
|
|
||||||
// only a node is not referenced by any other object, in-place update it
|
|
||||||
if (T_REF_VAL_GET(pNode) > 0) {
|
|
||||||
taosCacheMoveToTrash(pCacheObj, pNode);
|
|
||||||
}
|
|
||||||
|
|
||||||
T_REF_INC(pNewNode);
|
|
||||||
|
|
||||||
// addedTime new element to hashtable
|
|
||||||
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
|
|
||||||
return pNewNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* addedTime data into hash table
|
|
||||||
* @param key
|
|
||||||
* @param pData
|
|
||||||
* @param size
|
|
||||||
* @param pCacheObj
|
|
||||||
* @param keyLen
|
|
||||||
* @param pNode
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, const char *key, size_t keyLen, const void *pData,
|
|
||||||
size_t dataSize, uint64_t duration) {
|
|
||||||
SCacheDataNode *pNode = taosCreateCacheNode(key, keyLen, pData, dataSize, duration);
|
|
||||||
if (pNode == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
T_REF_INC(pNode);
|
|
||||||
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
|
|
||||||
return pNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* do cleanup the taos cache
|
* do cleanup the taos cache
|
||||||
|
@ -318,8 +262,8 @@ static void incRefFn(void* ptNode) {
|
||||||
assert(ptNode != NULL);
|
assert(ptNode != NULL);
|
||||||
|
|
||||||
SCacheDataNode** p = (SCacheDataNode**) ptNode;
|
SCacheDataNode** p = (SCacheDataNode**) ptNode;
|
||||||
|
|
||||||
assert(T_REF_VAL_GET(*p) >= 0);
|
assert(T_REF_VAL_GET(*p) >= 0);
|
||||||
|
|
||||||
int32_t ret = T_REF_INC(*p);
|
int32_t ret = T_REF_INC(*p);
|
||||||
assert(ret > 0);
|
assert(ret > 0);
|
||||||
}
|
}
|
||||||
|
@ -344,34 +288,6 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
|
||||||
return pData;
|
return pData;
|
||||||
}
|
}
|
||||||
|
|
||||||
//void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) {
|
|
||||||
// if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
|
|
||||||
// return NULL;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// __cache_rd_lock(pCacheObj);
|
|
||||||
//
|
|
||||||
// SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
|
|
||||||
// if (ptNode != NULL) {
|
|
||||||
// T_REF_INC(*ptNode);
|
|
||||||
// (*ptNode)->expireTime = expireTime; // taosGetTimestampMs() + (*ptNode)->lifespan;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// __cache_unlock(pCacheObj);
|
|
||||||
//
|
|
||||||
// if (ptNode != NULL) {
|
|
||||||
// atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
|
|
||||||
// uDebug("cache:%s, key:%p, %p expireTime is updated in cache, refcnt:%d", pCacheObj->name, key,
|
|
||||||
// (*ptNode)->data, T_REF_VAL_GET(*ptNode));
|
|
||||||
// } else {
|
|
||||||
// atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
|
|
||||||
// uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
|
|
||||||
// return (ptNode != NULL) ? (*ptNode)->data : NULL;
|
|
||||||
//}
|
|
||||||
|
|
||||||
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
||||||
if (pCacheObj == NULL || data == NULL) return NULL;
|
if (pCacheObj == NULL || data == NULL) return NULL;
|
||||||
|
|
||||||
|
|
|
@ -259,20 +259,20 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
//TODO handle malloc failure
|
//TODO handle malloc failure
|
||||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||||
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
freeHandle = true;
|
||||||
} else { // result is not ready, return immediately
|
} else { // result is not ready, return immediately
|
||||||
if (!buildRes) {
|
if (!buildRes) {
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
||||||
return TSDB_CODE_QRY_NOT_READY;
|
return TSDB_CODE_QRY_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
void** dup = handle;
|
|
||||||
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle);
|
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle);
|
||||||
|
}
|
||||||
|
|
||||||
// not added into task queue, the query must be completed already, free qhandle immediately
|
// if qhandle is not added into task queue, the query must be completed already or paused with error ,
|
||||||
if (freeHandle) {
|
// free qhandle immediately
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &dup, true);
|
if (freeHandle) {
|
||||||
}
|
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue