This commit is contained in:
Haojun Liao 2020-08-01 13:41:24 +08:00
parent 07726783e1
commit adcbb40fbe
3 changed files with 299 additions and 192 deletions

View File

@ -76,8 +76,9 @@ typedef struct SHashMutableIterator {
SHashObj *pHashObj; SHashObj *pHashObj;
int32_t entryIndex; int32_t entryIndex;
SHashNode *pCur; SHashNode *pCur;
SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current
int32_t num; // already check number of elements in hash table size_t numOfChecked; // already check number of elements in hash table
size_t numOfEntries; // number of entries while the iterator is created
} SHashMutableIterator; } SHashMutableIterator;
/** /**
@ -118,6 +119,8 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
*/ */
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void(*fp)(void*));
/** /**
* remove item with the specified key * remove item with the specified key
* @param pHashObj * @param pHashObj
@ -126,8 +129,9 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
*/ */
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen); int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen);
int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize);
int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize); int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param);
/** /**
* clean up hash table * clean up hash table

View File

@ -19,7 +19,7 @@
#include "tulog.h" #include "tulog.h"
#include "tutil.h" #include "tutil.h"
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) #define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR)
#define FREE_HASH_NODE(_n) \ #define FREE_HASH_NODE(_n) \
do { \ do { \
@ -75,8 +75,8 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
* @return * @return
*/ */
static FORCE_INLINE SHashNode* doSearchEntryList(SHashEntry* pe, const void* key, size_t keyLen, uint32_t hashVal) { static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
SHashNode* pNode = pe->head.next; SHashNode *pNode = pe->head.next;
while (pNode) { while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal); assert(pNode->hashVal == hashVal);
@ -89,7 +89,8 @@ static FORCE_INLINE SHashNode* doSearchEntryList(SHashEntry* pe, const void* key
return pNode; return pNode;
} }
static FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t hashVal) { static FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen,
uint32_t hashVal) {
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot]; SHashEntry *pe = pHashObj->hashList[slot];
@ -103,7 +104,7 @@ static FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const
taosRLockLatch(&pe->latch); taosRLockLatch(&pe->latch);
} }
SHashNode* pNode = doSearchEntryList(pe, key, keyLen, hashVal); SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal);
if (pHashObj->type == HASH_ENTRY_LOCK) { if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pe->latch); taosRUnLockLatch(&pe->latch);
@ -141,8 +142,8 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p
*/ */
static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNewNode) { static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNewNode) {
assert(pNode->keyLen == pNewNode->keyLen); assert(pNode->keyLen == pNewNode->keyLen);
SWAP(pNode->key, pNewNode->key, void*); SWAP(pNode->key, pNewNode->key, void *);
SWAP(pNode->data, pNewNode->data, void*); SWAP(pNode->data, pNewNode->data, void *);
return pNewNode; return pNewNode;
} }
@ -153,7 +154,7 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNe
* @param pHashObj * @param pHashObj
* @param pNode * @param pNode
*/ */
static void pushfrontNode(SHashEntry* pEntry, SHashNode *pNode); static void pushfrontNode(SHashEntry *pEntry, SHashNode *pNode);
/** /**
* Get the next element in hash table for iterator * Get the next element in hash table for iterator
@ -181,17 +182,16 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
pHashObj->type = type; pHashObj->type = type;
pHashObj->enableUpdate = update; pHashObj->enableUpdate = update;
pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(void*)); pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(void *));
if (pHashObj->hashList == NULL) { if (pHashObj->hashList == NULL) {
free(pHashObj); free(pHashObj);
uError("failed to allocate memory, reason:%s", strerror(errno)); uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL; return NULL;
} else { } else {
pHashObj->pMemBlock = taosArrayInit(8, sizeof(void *));
pHashObj->pMemBlock = taosArrayInit(8, sizeof(void*)); void *p = calloc(pHashObj->capacity, sizeof(SHashEntry));
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
void* p = calloc(pHashObj->capacity, sizeof(SHashEntry));
for(int32_t i = 0; i < pHashObj->capacity; ++i) {
pHashObj->hashList[i] = p + i * sizeof(SHashEntry); pHashObj->hashList[i] = p + i * sizeof(SHashEntry);
} }
@ -201,9 +201,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
return pHashObj; return pHashObj;
} }
size_t taosHashGetSize(const SHashObj *pHashObj) { size_t taosHashGetSize(const SHashObj *pHashObj) { return (pHashObj == NULL) ? 0 : pHashObj->size; }
return (pHashObj == NULL)? 0:pHashObj->size;
}
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen);
@ -221,14 +219,14 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
__rd_lock(&pHashObj->lock, pHashObj->type); __rd_lock(&pHashObj->lock, pHashObj->type);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot]; SHashEntry *pe = pHashObj->hashList[slot];
if (pHashObj->type == HASH_ENTRY_LOCK) { if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch); taosWLockLatch(&pe->latch);
} }
SHashNode* pNode = pe->head.next; SHashNode *pNode = pe->head.next;
while (pNode) { while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal); assert(pNode->hashVal == hashVal);
@ -265,11 +263,15 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
__rd_unlock(&pHashObj->lock, pHashObj->type); __rd_unlock(&pHashObj->lock, pHashObj->type);
FREE_HASH_NODE(pNewNode); FREE_HASH_NODE(pNewNode);
return pHashObj->enableUpdate? 0:-1; return pHashObj->enableUpdate ? 0 : -1;
} }
} }
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashGetCB(pHashObj, key, keyLen, NULL);
}
void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *)) {
if (pHashObj->size <= 0 || keyLen == 0 || key == NULL) { if (pHashObj->size <= 0 || keyLen == 0 || key == NULL) {
return NULL; return NULL;
} }
@ -279,24 +281,42 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
// only add the read lock to disable the resize process // only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type); __rd_lock(&pHashObj->lock, pHashObj->type);
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
__rd_unlock(&pHashObj->lock, pHashObj->type); // no data, return directly
if (atomic_load_32(&pe->num) == 0) {
if (pNode) { __rd_unlock(&pHashObj->lock, pHashObj->type);
assert(pNode->hashVal == hashVal);
return pNode->data;
} else {
return NULL; return NULL;
} }
char *data = NULL;
// lock entry
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRLockLatch(&pe->latch);
}
SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal);
if (fp != NULL) {
fp(pNode->data);
}
data = pNode->data;
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
return data;
} }
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashRemoveNode(pHashObj, key, keyLen, NULL, 0); return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0);
} }
static FORCE_INLINE void popNodeFromEntryList(SHashEntry* pe, SHashNode* pNode) { static FORCE_INLINE void doPopFromEntryList(SHashEntry *pe, SHashNode *pNode) {
SHashNode* pNext = pNode->next; SHashNode *pNext = pNode->next;
assert(pNode->prev != NULL); assert(pNode->prev != NULL);
pNode->prev->next = pNext; pNode->prev->next = pNext;
@ -307,17 +327,17 @@ static FORCE_INLINE void popNodeFromEntryList(SHashEntry* pe, SHashNode* pNode)
pe->num -= 1; pe->num -= 1;
} }
int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize) { int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t dsize) {
if (pHashObj->size <= 0) { if (pHashObj == NULL || pHashObj->size <= 0) {
return -1; return -1;
} }
uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen);
// disable the resize process // disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type); __rd_lock(&pHashObj->lock, pHashObj->type);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot]; SHashEntry *pe = pHashObj->hashList[slot];
// no data, return directly // no data, return directly
@ -330,9 +350,9 @@ int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, v
taosWLockLatch(&pe->latch); taosWLockLatch(&pe->latch);
} }
SHashNode* pNode = doSearchEntryList(pe, key, keyLen, hashVal); SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal);
if (pNode != NULL) { if (pNode != NULL) {
popNodeFromEntryList(pe, pNode); doPopFromEntryList(pe, pNode);
} }
if (pHashObj->type == HASH_ENTRY_LOCK) { if (pHashObj->type == HASH_ENTRY_LOCK) {
@ -341,13 +361,13 @@ int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, v
__rd_unlock(&pHashObj->lock, pHashObj->type); __rd_unlock(&pHashObj->lock, pHashObj->type);
atomic_sub_fetch_64(&pHashObj->size, 1);
if (data != NULL) { if (data != NULL) {
memcpy(data, pNode->data, dsize); memcpy(data, pNode->data, dsize);
} }
if (pNode != NULL) { if (pNode != NULL) {
atomic_sub_fetch_64(&pHashObj->size, 1);
pNode->next = NULL; pNode->next = NULL;
pNode->prev = NULL; pNode->prev = NULL;
@ -359,6 +379,49 @@ int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, v
} }
} }
int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param) {
if (pHashObj == NULL || pHashObj->size == 0) {
return 0;
}
// disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
int32_t numOfEntries = pHashObj->capacity;
for (int32_t i = 0; i < numOfEntries; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry->num <= 0) {
continue;
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWLockLatch(&pEntry->latch);
}
SHashNode *pNode = pEntry->head.next;
assert(pNode != NULL);
SHashNode *pNext = NULL;
while (pNode != NULL) {
pNext = pNode->next;
// not qualified, remove it
if (fp && (!fp(param, pNode->data))) {
doPopFromEntryList(pEntry, pNode);
}
pNode = pNext;
}
if (pHashObj->type == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pEntry->latch);
}
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
return 0;
}
void taosHashCleanup(SHashObj *pHashObj) { void taosHashCleanup(SHashObj *pHashObj) {
if (pHashObj == NULL) { if (pHashObj == NULL) {
return; return;
@ -370,7 +433,7 @@ void taosHashCleanup(SHashObj *pHashObj) {
if (pHashObj->hashList) { if (pHashObj->hashList) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) { for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry* pEntry = pHashObj->hashList[i]; SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry->num == 0) { if (pEntry->num == 0) {
assert(pEntry->head.next == 0); assert(pEntry->head.next == 0);
continue; continue;
@ -396,8 +459,8 @@ void taosHashCleanup(SHashObj *pHashObj) {
// destroy mem block // destroy mem block
size_t memBlock = taosArrayGetSize(pHashObj->pMemBlock); size_t memBlock = taosArrayGetSize(pHashObj->pMemBlock);
for(int32_t i = 0; i < memBlock; ++i) { for (int32_t i = 0; i < memBlock; ++i) {
void* p = taosArrayGetP(pHashObj->pMemBlock, i); void *p = taosArrayGetP(pHashObj->pMemBlock, i);
tfree(p); tfree(p);
} }
@ -414,6 +477,9 @@ SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) {
} }
pIter->pHashObj = pHashObj; pIter->pHashObj = pHashObj;
// keep it in local variable, in case the resize operation expand the size
pIter->numOfEntries = pHashObj->capacity;
return pIter; return pIter;
} }
@ -428,7 +494,7 @@ bool taosHashIterNext(SHashMutableIterator *pIter) {
} }
// check the first one // check the first one
if (pIter->num == 0) { if (pIter->numOfChecked == 0) {
assert(pIter->pCur == NULL && pIter->pNext == NULL); assert(pIter->pCur == NULL && pIter->pNext == NULL);
while (1) { while (1) {
@ -438,18 +504,30 @@ bool taosHashIterNext(SHashMutableIterator *pIter) {
continue; continue;
} }
if (pIter->pHashObj->type == HASH_ENTRY_LOCK) {
taosRLockLatch(&pEntry->latch);
}
pIter->pCur = pEntry->head.next; pIter->pCur = pEntry->head.next;
if (pIter->pCur->next) { if (pIter->pCur->next) {
pIter->pNext = pIter->pCur->next; pIter->pNext = pIter->pCur->next;
if (pIter->pHashObj->type == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pEntry->latch);
}
} else { } else {
if (pIter->pHashObj->type == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pEntry->latch);
}
pIter->pNext = getNextHashNode(pIter); pIter->pNext = getNextHashNode(pIter);
} }
break; break;
} }
pIter->num++; pIter->numOfChecked++;
return true; return true;
} else { } else {
assert(pIter->pCur != NULL); assert(pIter->pCur != NULL);
@ -459,7 +537,7 @@ bool taosHashIterNext(SHashMutableIterator *pIter) {
return false; return false;
} }
pIter->num++; pIter->numOfChecked++;
if (pIter->pCur->next) { if (pIter->pCur->next) {
pIter->pNext = pIter->pCur->next; pIter->pNext = pIter->pCur->next;
@ -504,30 +582,30 @@ void taosHashTableResize(SHashObj *pHashObj) {
if (!HASH_NEED_RESIZE(pHashObj)) { if (!HASH_NEED_RESIZE(pHashObj)) {
return; return;
} }
// double the original capacity // double the original capacity
SHashNode *pNode = NULL; SHashNode *pNode = NULL;
SHashNode *pNext = NULL; SHashNode *pNext = NULL;
int32_t newSize = pHashObj->capacity << 1u; int32_t newSize = pHashObj->capacity << 1u;
if (newSize > HASH_MAX_CAPACITY) { if (newSize > HASH_MAX_CAPACITY) {
// uDebug("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", // uDebug("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
// pHashObj->capacity, HASH_MAX_CAPACITY); // pHashObj->capacity, HASH_MAX_CAPACITY);
return; return;
} }
void *pNewEntryList = realloc(pHashObj->hashList, sizeof(SHashEntry) * newSize); void *pNewEntryList = realloc(pHashObj->hashList, sizeof(SHashEntry) * newSize);
if (pNewEntryList == NULL) {// todo handle error if (pNewEntryList == NULL) { // todo handle error
// uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); // uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return; return;
} }
pHashObj->hashList = pNewEntryList; pHashObj->hashList = pNewEntryList;
size_t inc = newSize - pHashObj->capacity; size_t inc = newSize - pHashObj->capacity;
void* p = calloc(inc, sizeof(SHashEntry)); void * p = calloc(inc, sizeof(SHashEntry));
for(int32_t i = 0; i < inc; ++i) { for (int32_t i = 0; i < inc; ++i) {
pHashObj->hashList[i + pHashObj->capacity] = p + i * sizeof(SHashEntry); pHashObj->hashList[i + pHashObj->capacity] = p + i * sizeof(SHashEntry);
} }
@ -535,7 +613,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
pHashObj->capacity = newSize; pHashObj->capacity = newSize;
for (int32_t i = 0; i < pHashObj->capacity; ++i) { for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry* pe = pHashObj->hashList[i]; SHashEntry *pe = pHashObj->hashList[i];
if (pe->num == 0) { if (pe->num == 0) {
assert(pe->head.next == NULL); assert(pe->head.next == NULL);
continue; continue;
@ -550,7 +628,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
pNext = pNode->next; pNext = pNode->next;
assert(pNode != pNext && (pNext == NULL || pNext->prev == pNode) && pNode->prev->next == pNode); assert(pNode != pNext && (pNext == NULL || pNext->prev == pNode) && pNode->prev->next == pNode);
popNodeFromEntryList(pe, pNode); doPopFromEntryList(pe, pNode);
// clear pointer // clear pointer
pNode->next = NULL; pNode->next = NULL;
@ -566,8 +644,8 @@ void taosHashTableResize(SHashObj *pHashObj) {
} }
} }
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity, // uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity,
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0); // ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
} }
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
@ -579,7 +657,7 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
pNewNode->data = malloc(dsize + keyLen); pNewNode->data = malloc(dsize + keyLen);
memcpy(pNewNode->data, pData, dsize); memcpy(pNewNode->data, pData, dsize);
pNewNode->key = pNewNode->data + dsize; pNewNode->key = pNewNode->data + dsize;
memcpy(pNewNode->key, key, keyLen); memcpy(pNewNode->key, key, keyLen);
@ -588,10 +666,10 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
return pNewNode; return pNewNode;
} }
void pushfrontNode(SHashEntry* pEntry, SHashNode *pNode) { void pushfrontNode(SHashEntry *pEntry, SHashNode *pNode) {
assert(pNode != NULL && pEntry != NULL); assert(pNode != NULL && pEntry != NULL);
SHashNode* pNext = pEntry->head.next; SHashNode *pNext = pEntry->head.next;
if (pNext != NULL) { if (pNext != NULL) {
pNext->prev = pNode; pNext->prev = pNode;
} }
@ -605,17 +683,29 @@ void pushfrontNode(SHashEntry* pEntry, SHashNode *pNode) {
SHashNode *getNextHashNode(SHashMutableIterator *pIter) { SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
assert(pIter != NULL); assert(pIter != NULL);
pIter->entryIndex++; pIter->entryIndex++;
while (pIter->entryIndex < pIter->pHashObj->capacity) { SHashNode *p = NULL;
SHashEntry*pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
while (pIter->entryIndex < pIter->numOfEntries) {
SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
if (pEntry->num == 0) { if (pEntry->num == 0) {
pIter->entryIndex++; pIter->entryIndex++;
continue; continue;
} }
return pEntry->head.next; if (pIter->pHashObj->type == HASH_ENTRY_LOCK) {
taosRLockLatch(&pEntry->latch);
}
p = pEntry->head.next;
if (pIter->pHashObj->type == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pEntry->latch);
}
return p;
} }
return NULL; return NULL;
} }

View File

@ -63,13 +63,6 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#endif #endif
} }
#if 0
static FORCE_INLINE void taosFreeNode(void *data) {
SCacheDataNode *pNode = *(SCacheDataNode **)data;
free(pNode);
}
#endif
/** /**
* @param key key of object for hash, usually a null-terminated string * @param key key of object for hash, usually a null-terminated string
* @param keyLen length of key * @param keyLen length of key
@ -89,13 +82,6 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const
*/ */
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode); static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode);
/**
* remove node in trash can
* @param pCacheObj
* @param pElem
*/
static void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem);
/** /**
* remove nodes in trash with refCount == 0 in cache * remove nodes in trash with refCount == 0 in cache
* @param pNode * @param pNode
@ -113,17 +99,19 @@ static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force);
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) { static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
if (pNode->signature != (uint64_t)pNode) { if (pNode->signature != (uint64_t)pNode) {
uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode); uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
assert(0);
return; return;
} }
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
pCacheObj->totalSize -= pNode->size; pCacheObj->totalSize -= pNode->size;
int32_t size = taosHashGetSize(pCacheObj->pHashTable);
uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes", uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, pCacheObj->name, pNode->key, pNode->data, size, pCacheObj->totalSize, pNode->size);
pNode->size);
if (pCacheObj->freeFp) {
pCacheObj->freeFp(pNode->data);
}
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode); free(pNode);
} }
@ -137,6 +125,32 @@ static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNo
taosAddToTrash(pCacheObj, pNode); taosAddToTrash(pCacheObj, pNode);
} }
static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) {
if (pElem->pData->signature != (uint64_t) pElem->pData) {
uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return;
}
pCacheObj->numOfElemsInTrash--;
if (pElem->prev) {
pElem->prev->next = pElem->next;
} else { // pnode is the header, update header
pCacheObj->pTrash = pElem->next;
}
if (pElem->next) {
pElem->next->prev = pElem->prev;
}
}
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) {
if (pCacheObj->freeFp) {
pCacheObj->freeFp(pElem->pData->data);
}
free(pElem->pData);
free(pElem);
}
/** /**
* update data in cache * update data in cache
* @param pCacheObj * @param pCacheObj
@ -261,12 +275,11 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
} else { // duplicated key exists } else { // duplicated key exists
while (1) { while (1) {
SCacheDataNode* p = NULL; SCacheDataNode* p = NULL;
int32_t ret = taosHashRemoveNode(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*)); int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*));
// add to trashcan // add to trashcan
if (ret == 0) { if (ret == 0) {
if (T_REF_VAL_GET(p) == 0) { if (T_REF_VAL_GET(p) == 0) {
if (pCacheObj->freeFp) { if (pCacheObj->freeFp) {
pCacheObj->freeFp(p->data); pCacheObj->freeFp(p->data);
} }
@ -300,27 +313,25 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
return pNode1->data; return pNode1->data;
} }
static void incRefFn(void* ptNode) {
assert(ptNode != NULL);
SCacheDataNode** p = (SCacheDataNode**) ptNode;
int32_t ret = T_REF_INC(*p);
assert(ret > 0);
}
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
return NULL; return NULL;
} }
void *pData = NULL; SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGetCB(pCacheObj->pHashTable, key, keyLen, incRefFn);
void* pData = (ptNode != NULL)? (*ptNode)->data:NULL;
// __cache_rd_lock(pCacheObj);
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
int32_t ref = 0;
if (ptNode != NULL) {
ref = T_REF_INC(*ptNode);
pData = (*ptNode)->data;
}
// __cache_unlock(pCacheObj);
if (pData != NULL) { if (pData != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, ref); uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(*ptNode));
} else { } else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
@ -423,8 +434,11 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
if (_remove) { if (_remove) {
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately. // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
char* key = pNode->key;
char* d = pNode->data;
int32_t ref = T_REF_DEC(pNode); int32_t ref = T_REF_DEC(pNode);
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref); uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, key, d, ref);
/* /*
* If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
@ -437,24 +451,35 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
if (ref == 0) { if (ref == 0) {
assert(pNode->pTNodeHeader->pData == pNode); assert(pNode->pTNodeHeader->pData == pNode);
// todo add lock here __cache_wr_lock(pCacheObj);
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader); doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
__cache_unlock(pCacheObj);
doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
} }
} else { } else {
int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
if (ret == 0) { // successfully remove from hash table
// successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
// note that the remove operation can be executed only once.
if (ret == 0) {
if (ref > 0) { if (ref > 0) {
assert(pNode->pTNodeHeader == NULL); assert(pNode->pTNodeHeader == NULL);
// todo trashcan lock __cache_wr_lock(pCacheObj);
taosAddToTrash(pCacheObj, pNode); taosAddToTrash(pCacheObj, pNode);
__cache_unlock(pCacheObj);
} else { // ref == 0 } else { // ref == 0
atomic_fetch_sub_ptr(&pCacheObj->totalSize, pNode->size); atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes", uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable),
pCacheObj->totalSize, pNode->size); pCacheObj->totalSize, pNode->size);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); if (pCacheObj->freeFp) {
pCacheObj->freeFp(pNode->data);
}
free(pNode); free(pNode);
} }
} }
@ -462,33 +487,40 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
} else { } else {
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately. // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t ref = T_REF_DEC(pNode); char* key = pNode->key;
char* p = pNode->data;
// todo so, invalid read here! int32_t ref = T_REF_DEC(pNode);
uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, pNode->key, pNode->data, uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, key, p, ref, inTrashCan);
ref, inTrashCan);
} }
} }
void taosCacheEmpty(SCacheObj *pCacheObj) { typedef struct SHashTravSupp {
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); SCacheObj* pCacheObj;
int64_t time;
__cache_wr_lock(pCacheObj); __cache_free_fn_t fp;
while (taosHashIterNext(pIter)) { } SHashTravSupp;
if (pCacheObj->deleting == 1) {
break; static bool travHashTableEmptyFn(void* param, void* data) {
} SHashTravSupp* ps = (SHashTravSupp*) param;
SCacheObj* pCacheObj= ps->pCacheObj;
SCacheDataNode *pNode = *(SCacheDataNode **) taosHashIterGet(pIter);
if (T_REF_VAL_GET(pNode) == 0) { SCacheDataNode *pNode = *(SCacheDataNode **) data;
taosCacheReleaseNode(pCacheObj, pNode);
} else { if (T_REF_VAL_GET(pNode) == 0) {
taosCacheMoveToTrash(pCacheObj, pNode); taosCacheReleaseNode(pCacheObj, pNode);
} } else { // do add to trashcan
taosAddToTrash(pCacheObj, pNode);
} }
__cache_unlock(pCacheObj);
// this node should be remove from hash table
taosHashDestroyIter(pIter); return false;
}
void taosCacheEmpty(SCacheObj *pCacheObj) {
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
taosTrashCanEmpty(pCacheObj, false); taosTrashCanEmpty(pCacheObj, false);
} }
@ -553,33 +585,6 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash); uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
} }
void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
if (pElem->pData->signature != (uint64_t)pElem->pData) {
uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return;
}
pCacheObj->numOfElemsInTrash--;
if (pElem->prev) {
pElem->prev->next = pElem->next;
} else { /* pnode is the header, update header */
pCacheObj->pTrash = pElem->next;
}
if (pElem->next) {
pElem->next->prev = pElem->prev;
}
pElem->pData->signature = 0;
if (pCacheObj->freeFp) {
pCacheObj->freeFp(pElem->pData->data);
}
free(pElem->pData);
free(pElem);
}
// TODO add another lock when scanning trashcan
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
__cache_wr_lock(pCacheObj); __cache_wr_lock(pCacheObj);
@ -587,8 +592,8 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
if (pCacheObj->pTrash != NULL) { if (pCacheObj->pTrash != NULL) {
uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash); uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
} }
pCacheObj->pTrash = NULL;
pCacheObj->pTrash = NULL;
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
return; return;
} }
@ -604,10 +609,12 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData->data, uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData->data,
pCacheObj->numOfElemsInTrash - 1); pCacheObj->numOfElemsInTrash - 1);
STrashElem *p = pElem;
STrashElem *p = pElem;
pElem = pElem->next; pElem = pElem->next;
taosRemoveFromTrashCan(pCacheObj, p);
doRemoveElemInTrashcan(pCacheObj, p);
doDestroyTrashcanElem(pCacheObj, p);
} else { } else {
pElem = pElem->next; pElem = pElem->next;
} }
@ -617,26 +624,27 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
} }
void doCleanupDataCache(SCacheObj *pCacheObj) { void doCleanupDataCache(SCacheObj *pCacheObj) {
__cache_wr_lock(pCacheObj);
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); // SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
while (taosHashIterNext(pIter)) { // while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); // SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
//
// int32_t c = T_REF_VAL_GET(pNode);
// if (c <= 0) {
// taosCacheReleaseNode(pCacheObj, pNode);
// } else {
// uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pNode->key,
// pNode->data, T_REF_VAL_GET(pNode));
// }
// }
//
// taosHashDestroyIter(pIter);
int32_t c = T_REF_VAL_GET(pNode); SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
if (c <= 0) { taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
taosCacheReleaseNode(pCacheObj, pNode);
} else {
uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pNode->key,
pNode->data, T_REF_VAL_GET(pNode));
}
}
taosHashDestroyIter(pIter);
// todo memory leak if there are object with refcount greater than 0 in hash table? // todo memory leak if there are object with refcount greater than 0 in hash table?
taosHashCleanup(pCacheObj->pHashTable); taosHashCleanup(pCacheObj->pHashTable);
__cache_unlock(pCacheObj);
taosTrashCanEmpty(pCacheObj, true); taosTrashCanEmpty(pCacheObj, true);
__cache_lock_destroy(pCacheObj); __cache_lock_destroy(pCacheObj);
@ -645,26 +653,31 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
free(pCacheObj); free(pCacheObj);
} }
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) { bool travHashTableFn(void* param, void* data) {
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); SHashTravSupp* ps = (SHashTravSupp*) param;
SCacheObj* pCacheObj= ps->pCacheObj;
// __cache_wr_lock(pCacheObj); SCacheDataNode* pNode = *(SCacheDataNode **) data;
while (taosHashIterNext(pIter)) { if (pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); taosCacheReleaseNode(pCacheObj, pNode);
if (pNode->expireTime < time && T_REF_VAL_GET(pNode) <= 0) { // this node should be remove from hash table
taosCacheReleaseNode(pCacheObj, pNode); return false;
continue;
}
if (fp) {
fp(pNode->data);
}
} }
// __cache_unlock(pCacheObj); if (ps->fp) {
(ps->fp)(pNode->data);
}
taosHashDestroyIter(pIter); // do not remove element in hash table
return true;
}
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) {
assert(pCacheObj != NULL);
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time};
taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
} }
void* taosCacheTimedRefresh(void *handle) { void* taosCacheTimedRefresh(void *handle) {