fix(query): add lock for cache.
This commit is contained in:
parent
d5a770c98a
commit
71a61c3106
|
@ -422,34 +422,48 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
|
||||||
|
|
||||||
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
|
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
|
||||||
bool* acquireRes) {
|
bool* acquireRes) {
|
||||||
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
|
|
||||||
|
|
||||||
// generate the composed key for LRU cache
|
// generate the composed key for LRU cache
|
||||||
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
|
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
|
||||||
|
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
|
||||||
|
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||||
|
|
||||||
|
uint32_t times = 0;
|
||||||
|
|
||||||
|
*acquireRes = 0;
|
||||||
pBuf[0] = suid;
|
pBuf[0] = suid;
|
||||||
memcpy(&pBuf[1], pKey, keyLen);
|
memcpy(&pBuf[1], pKey, keyLen);
|
||||||
|
|
||||||
|
metaRLock(pMeta);
|
||||||
|
|
||||||
int32_t len = keyLen + sizeof(uint64_t);
|
int32_t len = keyLen + sizeof(uint64_t);
|
||||||
LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len);
|
LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len);
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
*acquireRes = 0;
|
metaULock(pMeta);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else { // do some book mark work after acquiring the filter result from cache
|
}
|
||||||
STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t));
|
|
||||||
|
// do some book mark work after acquiring the filter result from cache
|
||||||
|
STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
|
||||||
ASSERT(pEntry != NULL);
|
ASSERT(pEntry != NULL);
|
||||||
*acquireRes = 1;
|
*acquireRes = 1;
|
||||||
|
|
||||||
const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle);
|
const char* p = taosLRUCacheValue(pCache, pHandle);
|
||||||
int32_t size = *(int32_t*)p;
|
int32_t size = *(int32_t*)p;
|
||||||
|
|
||||||
|
// set the result into the buffer
|
||||||
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
|
taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
|
||||||
|
|
||||||
(*pEntry)->qTimes += 1;
|
times = atomic_add_fetch_32(&(*pEntry)->qTimes, 1);
|
||||||
taosLRUCacheRelease(pCache, pHandle, false);
|
taosLRUCacheRelease(pCache, pHandle, false);
|
||||||
|
|
||||||
|
// unlock meta
|
||||||
|
metaULock(pMeta);
|
||||||
|
|
||||||
// check if scanning all items are necessary or not
|
// check if scanning all items are necessary or not
|
||||||
if ((*pEntry)->qTimes >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
|
if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) {
|
||||||
SArray* pList = taosArrayInit(64, POINTER_BYTES);
|
metaWLock(pMeta);
|
||||||
|
|
||||||
|
SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES);
|
||||||
|
|
||||||
SListIter iter = {0};
|
SListIter iter = {0};
|
||||||
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
|
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
|
||||||
|
@ -461,24 +475,24 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
||||||
// check whether it is existed in LRU cache, and remove it from linked list if not.
|
// check whether it is existed in LRU cache, and remove it from linked list if not.
|
||||||
LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len);
|
LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len);
|
||||||
if (pRes == NULL) { // remove the item in the linked list
|
if (pRes == NULL) { // remove the item in the linked list
|
||||||
taosArrayPush(pList, &pNode);
|
taosArrayPush(pInvalidRes, &pNode);
|
||||||
} else {
|
} else {
|
||||||
taosLRUCacheRelease(pCache, pRes, false);
|
taosLRUCacheRelease(pCache, pRes, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove the keys, of which query uid lists have been replaced already.
|
// remove the keys, of which query uid lists have been replaced already.
|
||||||
size_t s = taosArrayGetSize(pList);
|
size_t s = taosArrayGetSize(pInvalidRes);
|
||||||
for (int32_t i = 0; i < s; ++i) {
|
for (int32_t i = 0; i < s; ++i) {
|
||||||
SListNode** p1 = taosArrayGet(pList, i);
|
SListNode** p1 = taosArrayGet(pInvalidRes, i);
|
||||||
tdListPopNode(&(*pEntry)->list, *p1);
|
tdListPopNode(&(*pEntry)->list, *p1);
|
||||||
taosMemoryFree(*p1);
|
taosMemoryFree(*p1);
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pEntry)->qTimes = 0; // reset the query times
|
atomic_store_32(&(*pEntry)->qTimes, 0); // reset the query times
|
||||||
|
taosArrayDestroy(pInvalidRes);
|
||||||
|
|
||||||
taosArrayDestroy(pList);
|
metaULock(pMeta);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -513,6 +527,8 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
||||||
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
|
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
|
||||||
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
|
||||||
|
|
||||||
|
metaWLock(pMeta);
|
||||||
|
|
||||||
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
|
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
|
||||||
if (pEntry == NULL) {
|
if (pEntry == NULL) {
|
||||||
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
|
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
|
||||||
|
@ -533,6 +549,9 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
||||||
// add to cache.
|
// add to cache.
|
||||||
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
|
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
|
||||||
TAOS_LRU_PRIORITY_LOW);
|
TAOS_LRU_PRIORITY_LOW);
|
||||||
|
|
||||||
|
metaULock(pMeta);
|
||||||
|
|
||||||
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
|
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
|
||||||
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue