From 71a61c3106471799c8966cd71080116392dbffb6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Dec 2022 19:19:20 +0800 Subject: [PATCH] fix(query): add lock for cache. --- source/dnode/vnode/src/meta/metaCache.c | 97 +++++++++++++++---------- 1 file changed, 58 insertions(+), 39 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 37dcec4f85..fb75b6963e 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -422,63 +422,77 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) { int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) { - uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; - // generate the composed key for LRU cache SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; + uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; + SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry; + uint32_t times = 0; + + *acquireRes = 0; pBuf[0] = suid; memcpy(&pBuf[1], pKey, keyLen); + metaRLock(pMeta); + int32_t len = keyLen + sizeof(uint64_t); LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len); if (pHandle == NULL) { - *acquireRes = 0; + metaULock(pMeta); return TSDB_CODE_SUCCESS; - } else { // do some book mark work after acquiring the filter result from cache - STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t)); - ASSERT(pEntry != NULL); - *acquireRes = 1; + } - const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle); - int32_t size = *(int32_t*)p; - taosArrayAddBatch(pList1, p + sizeof(int32_t), size); + // do some book mark work after acquiring the filter result from cache + STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t)); + ASSERT(pEntry != NULL); + *acquireRes = 1; - (*pEntry)->qTimes += 1; - taosLRUCacheRelease(pCache, pHandle, false); + const char* p = taosLRUCacheValue(pCache, pHandle); + int32_t size = *(int32_t*)p; - // check if scanning all items are necessary or not - if ((*pEntry)->qTimes >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) { - SArray* pList = taosArrayInit(64, POINTER_BYTES); + // set the result into the buffer + taosArrayAddBatch(pList1, p + sizeof(int32_t), size); - SListIter iter = {0}; - tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD); + times = atomic_add_fetch_32(&(*pEntry)->qTimes, 1); + taosLRUCacheRelease(pCache, pHandle, false); - SListNode* pNode = NULL; - while ((pNode = tdListNext(&iter)) != NULL) { - memcpy(&pBuf[1], pNode->data, keyLen); + // unlock meta + metaULock(pMeta); - // check whether it is existed in LRU cache, and remove it from linked list if not. - LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len); - if (pRes == NULL) { // remove the item in the linked list - taosArrayPush(pList, &pNode); - } else { - taosLRUCacheRelease(pCache, pRes, false); - } + // check if scanning all items are necessary or not + if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) { + metaWLock(pMeta); + + SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES); + + SListIter iter = {0}; + tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD); + + SListNode* pNode = NULL; + while ((pNode = tdListNext(&iter)) != NULL) { + memcpy(&pBuf[1], pNode->data, keyLen); + + // check whether it is existed in LRU cache, and remove it from linked list if not. + LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len); + if (pRes == NULL) { // remove the item in the linked list + taosArrayPush(pInvalidRes, &pNode); + } else { + taosLRUCacheRelease(pCache, pRes, false); } - - // remove the keys, of which query uid lists have been replaced already. - size_t s = taosArrayGetSize(pList); - for (int32_t i = 0; i < s; ++i) { - SListNode** p1 = taosArrayGet(pList, i); - tdListPopNode(&(*pEntry)->list, *p1); - taosMemoryFree(*p1); - } - - (*pEntry)->qTimes = 0; // reset the query times - - taosArrayDestroy(pList); } + + // remove the keys, of which query uid lists have been replaced already. + size_t s = taosArrayGetSize(pInvalidRes); + for (int32_t i = 0; i < s; ++i) { + SListNode** p1 = taosArrayGet(pInvalidRes, i); + tdListPopNode(&(*pEntry)->list, *p1); + taosMemoryFree(*p1); + } + + atomic_store_32(&(*pEntry)->qTimes, 0); // reset the query times + taosArrayDestroy(pInvalidRes); + + metaULock(pMeta); } return TSDB_CODE_SUCCESS; @@ -513,6 +527,8 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry; + metaWLock(pMeta); + STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t)); if (pEntry == NULL) { STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry)); @@ -533,6 +549,9 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int // add to cache. taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL, TAOS_LRU_PRIORITY_LOW); + + metaULock(pMeta); + metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid, (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));