fix(query): add the check of item when putting in cache.

This commit is contained in:
Haojun Liao 2023-01-12 19:24:23 +08:00
parent effcfc057d
commit 96feaaadbe
1 changed files with 36 additions and 2 deletions

View File

@ -431,7 +431,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
uint64_t buf[3] = {0};
uint64_t buf[3];
uint32_t times = 0;
*acquireRes = 0;
@ -465,7 +465,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
taosThreadMutexUnlock(pLock);
// check if scanning all items are necessary or not
if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 100) {
taosThreadMutexLock(pLock);
SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES);
@ -549,6 +549,8 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
tdListAppend(&p->list, pKey);
} else {
// check if it exists or not
int32_t times = atomic_add_fetch_32(&(*pEntry)->qTimes, 1);
size_t size = listNEles(&(*pEntry)->list);
if (size == 0) {
tdListAppend(&(*pEntry)->list, pKey);
@ -562,6 +564,17 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
tdListAppend(&(*pEntry)->list, pKey);
}
} else { // more than one element
bool checkCacheEntry = false;
SArray* pInvalidRes = NULL;
uint64_t keyBuf[3];
if (size >= 100 || times > 5000) {
// if the threshold value is reached, need to check the value.
checkCacheEntry = true;
keyBuf[0] = suid;
pInvalidRes = taosArrayInit(64, POINTER_BYTES);
}
SListIter iter = {0};
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
@ -574,6 +587,27 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
// check whether it is existed in LRU cache, and remove it from linked list if not.
if (checkCacheEntry) {
keyBuf[1] = p[1];
keyBuf[2] = p[2];
LRUHandle* pRes = taosLRUCacheLookup(pCache, keyBuf, 24);
if (pRes == NULL) { // remove the item in the linked list
taosArrayPush(pInvalidRes, &pNode);
} else {
taosLRUCacheRelease(pCache, pRes, false);
}
}
}
// do remove invalid entry in hash
size_t s = taosArrayGetSize(pInvalidRes);
for (int32_t i = 0; i < s; ++i) {
SListNode** p1 = taosArrayGet(pInvalidRes, i);
tdListPopNode(&(*pEntry)->list, *p1);
taosMemoryFree(*p1);
}
tdListAppend(&(*pEntry)->list, pKey);