From 71a61c3106471799c8966cd71080116392dbffb6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 21 Dec 2022 19:19:20 +0800 Subject: [PATCH 1/5] fix(query): add lock for cache. --- source/dnode/vnode/src/meta/metaCache.c | 97 +++++++++++++++---------- 1 file changed, 58 insertions(+), 39 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 37dcec4f85..fb75b6963e 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -422,63 +422,77 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) { int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) { - uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; - // generate the composed key for LRU cache SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; + uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; + SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry; + uint32_t times = 0; + + *acquireRes = 0; pBuf[0] = suid; memcpy(&pBuf[1], pKey, keyLen); + metaRLock(pMeta); + int32_t len = keyLen + sizeof(uint64_t); LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len); if (pHandle == NULL) { - *acquireRes = 0; + metaULock(pMeta); return TSDB_CODE_SUCCESS; - } else { // do some book mark work after acquiring the filter result from cache - STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t)); - ASSERT(pEntry != NULL); - *acquireRes = 1; + } - const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle); - int32_t size = *(int32_t*)p; - taosArrayAddBatch(pList1, p + sizeof(int32_t), size); + // do some book mark work after acquiring the filter result from cache + STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t)); + ASSERT(pEntry != NULL); + *acquireRes = 1; - (*pEntry)->qTimes += 1; - taosLRUCacheRelease(pCache, pHandle, false); + const char* p = taosLRUCacheValue(pCache, pHandle); + int32_t size = *(int32_t*)p; - // check if scanning all items are necessary or not - if ((*pEntry)->qTimes >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) { - SArray* pList = taosArrayInit(64, POINTER_BYTES); + // set the result into the buffer + taosArrayAddBatch(pList1, p + sizeof(int32_t), size); - SListIter iter = {0}; - tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD); + times = atomic_add_fetch_32(&(*pEntry)->qTimes, 1); + taosLRUCacheRelease(pCache, pHandle, false); - SListNode* pNode = NULL; - while ((pNode = tdListNext(&iter)) != NULL) { - memcpy(&pBuf[1], pNode->data, keyLen); + // unlock meta + metaULock(pMeta); - // check whether it is existed in LRU cache, and remove it from linked list if not. - LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len); - if (pRes == NULL) { // remove the item in the linked list - taosArrayPush(pList, &pNode); - } else { - taosLRUCacheRelease(pCache, pRes, false); - } + // check if scanning all items are necessary or not + if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) { + metaWLock(pMeta); + + SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES); + + SListIter iter = {0}; + tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD); + + SListNode* pNode = NULL; + while ((pNode = tdListNext(&iter)) != NULL) { + memcpy(&pBuf[1], pNode->data, keyLen); + + // check whether it is existed in LRU cache, and remove it from linked list if not. + LRUHandle* pRes = taosLRUCacheLookup(pCache, pBuf, len); + if (pRes == NULL) { // remove the item in the linked list + taosArrayPush(pInvalidRes, &pNode); + } else { + taosLRUCacheRelease(pCache, pRes, false); } - - // remove the keys, of which query uid lists have been replaced already. - size_t s = taosArrayGetSize(pList); - for (int32_t i = 0; i < s; ++i) { - SListNode** p1 = taosArrayGet(pList, i); - tdListPopNode(&(*pEntry)->list, *p1); - taosMemoryFree(*p1); - } - - (*pEntry)->qTimes = 0; // reset the query times - - taosArrayDestroy(pList); } + + // remove the keys, of which query uid lists have been replaced already. + size_t s = taosArrayGetSize(pInvalidRes); + for (int32_t i = 0; i < s; ++i) { + SListNode** p1 = taosArrayGet(pInvalidRes, i); + tdListPopNode(&(*pEntry)->list, *p1); + taosMemoryFree(*p1); + } + + atomic_store_32(&(*pEntry)->qTimes, 0); // reset the query times + taosArrayDestroy(pInvalidRes); + + metaULock(pMeta); } return TSDB_CODE_SUCCESS; @@ -513,6 +527,8 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry; + metaWLock(pMeta); + STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t)); if (pEntry == NULL) { STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry)); @@ -533,6 +549,9 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int // add to cache. taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL, TAOS_LRU_PRIORITY_LOW); + + metaULock(pMeta); + metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid, (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry)); From d30c59b9e5f62ef9a46fc725f65c75d16a46de9c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Dec 2022 10:46:00 +0800 Subject: [PATCH 2/5] fix(query): comment some codes. --- source/util/src/tpagedbuf.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 37874ae250..e8092957e2 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -621,11 +621,13 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { const SDiskbasedBufStatis* ps = &pBuf->statis; +#if 0 printf( "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f " "Kb, %s\n", pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0, listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id); +#endif if (ps->loadPages > 0) { printf( From 9872217462a140916f15305e076e286239d40986 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Dec 2022 10:52:51 +0800 Subject: [PATCH 3/5] fix(query): comment some codes. --- source/util/src/tpagedbuf.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index e8092957e2..ced5b4f25e 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -636,7 +636,7 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages)); } else { - printf("no page loaded\n"); + //printf("no page loaded\n"); } } From 55159a7ceb051113790399000755a1762a0063b8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Dec 2022 15:44:40 +0800 Subject: [PATCH 4/5] fix(query): update the mutex. --- source/dnode/vnode/src/meta/metaCache.c | 46 ++++++++++++++++--------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index fb75b6963e..513ee5a1c2 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -54,6 +54,7 @@ struct SMetaCache { // query cache struct STagFilterResCache { + TdThreadMutex lock; SHashObj* pTableEntry; SLRUCache* pUidResCache; uint64_t keyBuf[3]; @@ -140,6 +141,8 @@ int32_t metaCacheOpen(SMeta* pMeta) { } taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp); + taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL); + pMeta->pCache = pCache; return code; @@ -159,6 +162,8 @@ void metaCacheClose(SMeta* pMeta) { taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry); taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache); + taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock); + taosMemoryFree(pMeta->pCache); pMeta->pCache = NULL; } @@ -423,9 +428,10 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) { int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) { // generate the composed key for LRU cache - SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; - uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; - SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry; + SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; + uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; + SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry; + TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock; uint32_t times = 0; @@ -433,12 +439,12 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK pBuf[0] = suid; memcpy(&pBuf[1], pKey, keyLen); - metaRLock(pMeta); + taosThreadMutexLock(pLock); int32_t len = keyLen + sizeof(uint64_t); LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len); if (pHandle == NULL) { - metaULock(pMeta); + taosThreadMutexUnlock(pLock); return TSDB_CODE_SUCCESS; } @@ -457,11 +463,11 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK taosLRUCacheRelease(pCache, pHandle, false); // unlock meta - metaULock(pMeta); + taosThreadMutexUnlock(pLock); // check if scanning all items are necessary or not if (times >= 5000 && TD_DLIST_NELES(&(*pEntry)->list) > 10) { - metaWLock(pMeta); + taosThreadMutexLock(pLock); SArray* pInvalidRes = taosArrayInit(64, POINTER_BYTES); @@ -492,7 +498,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK atomic_store_32(&(*pEntry)->qTimes, 0); // reset the query times taosArrayDestroy(pInvalidRes); - metaULock(pMeta); + taosThreadMutexUnlock(pLock); } return TSDB_CODE_SUCCESS; @@ -524,10 +530,11 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int return TSDB_CODE_SUCCESS; } - SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; - SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry; + SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache; + SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry; + TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock; - metaWLock(pMeta); + taosThreadMutexLock(pLock); STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t)); if (pEntry == NULL) { @@ -550,7 +557,7 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL, TAOS_LRU_PRIORITY_LOW); - metaULock(pMeta); + taosThreadMutexUnlock(pLock); metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid, (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry)); @@ -560,15 +567,19 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int // remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) { - STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t)); - if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) { - return TSDB_CODE_SUCCESS; - } - int32_t keyLen = sizeof(uint64_t) * 3; uint64_t p[3] = {0}; p[0] = suid; + TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock; + + taosThreadMutexLock(pLock); + STagFilterResEntry** pEntry = taosHashGet(pMeta->pCache->sTagFilterResCache.pTableEntry, &suid, sizeof(uint64_t)); + if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) { + taosThreadMutexUnlock(pLock); + return TSDB_CODE_SUCCESS; + } + SListIter iter = {0}; tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD); @@ -581,5 +592,6 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) { (*pEntry)->qTimes = 0; tdListEmpty(&(*pEntry)->list); + taosThreadMutexUnlock(pLock); return TSDB_CODE_SUCCESS; } From 8210c49a85a86a8eac73aeb896ed00c57c26bbec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Dec 2022 15:47:02 +0800 Subject: [PATCH 5/5] fix(query): fix memory leak. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4aca1ea228..6b71def573 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3522,6 +3522,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p } int32_t code = tRowMergerGetRow(&merge, pTSRow); + tRowMergerClear(&merge); + return code; }