Merge pull request #18864 from taosdata/fix/metaCacheMemLeak

fix:fix mem leak
This commit is contained in:
Shengliang Guan 2022-12-10 11:28:29 +08:00 committed by GitHub
commit 585efa1ffd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 23 deletions

View File

@ -32,9 +32,9 @@ typedef struct SMetaStbStatsEntry {
} SMetaStbStatsEntry; } SMetaStbStatsEntry;
typedef struct STagFilterResEntry { typedef struct STagFilterResEntry {
uint64_t suid; // uid for super table uint64_t suid; // uid for super table
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
uint32_t qTimes;// queried times for current super table uint32_t qTimes; // queried times for current super table
} STagFilterResEntry; } STagFilterResEntry;
struct SMetaCache { struct SMetaCache {
@ -126,13 +126,14 @@ int32_t metaCacheOpen(SMeta* pMeta) {
goto _err2; goto _err2;
} }
pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5*1024*1024, -1, 0.5); pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
if (pCache->sTagFilterResCache.pUidResCache == NULL) { if (pCache->sTagFilterResCache.pUidResCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2; goto _err2;
} }
pCache->sTagFilterResCache.pTableEntry = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK); pCache->sTagFilterResCache.pTableEntry =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
if (pCache->sTagFilterResCache.pTableEntry == NULL) { if (pCache->sTagFilterResCache.pTableEntry == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err2; goto _err2;
@ -419,7 +420,8 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
return code; return code;
} }
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) { int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
bool* acquireRes) {
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
// generate the composed key for LRU cache // generate the composed key for LRU cache
@ -428,8 +430,8 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
pBuf[0] = suid; pBuf[0] = suid;
memcpy(&pBuf[1], pKey, keyLen); memcpy(&pBuf[1], pKey, keyLen);
int32_t len = keyLen + sizeof(uint64_t); int32_t len = keyLen + sizeof(uint64_t);
LRUHandle *pHandle = taosLRUCacheLookup(pCache, pBuf, len); LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len);
if (pHandle == NULL) { if (pHandle == NULL) {
*acquireRes = 0; *acquireRes = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -439,7 +441,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
*acquireRes = 1; *acquireRes = 1;
const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle); const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle);
int32_t size = *(int32_t*) p; int32_t size = *(int32_t*)p;
taosArrayAddBatch(pList1, p + sizeof(int32_t), size); taosArrayAddBatch(pList1, p + sizeof(int32_t), size);
(*pEntry)->qTimes += 1; (*pEntry)->qTimes += 1;
@ -467,12 +469,15 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
// remove the keys, of which query uid lists have been replaced already. // remove the keys, of which query uid lists have been replaced already.
size_t s = taosArrayGetSize(pList); size_t s = taosArrayGetSize(pList);
for(int32_t i = 0; i < s; ++i) { for (int32_t i = 0; i < s; ++i) {
SListNode** p1 = taosArrayGet(pList, i); SListNode** p1 = taosArrayGet(pList, i);
tdListPopNode(&(*pEntry)->list, *p1); tdListPopNode(&(*pEntry)->list, *p1);
taosMemoryFree(*p1);
} }
(*pEntry)->qTimes = 0; // reset the query times (*pEntry)->qTimes = 0; // reset the query times
taosArrayDestroy(pList);
} }
} }
@ -487,7 +492,8 @@ static void freePayload(const void* key, size_t keyLen, void* value) {
} }
// check both the payload size and selectivity ratio // check both the payload size and selectivity ratio
int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio) { int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen, double selectivityRatio) {
if (selectivityRatio > tsSelectivityRatio) { if (selectivityRatio > tsSelectivityRatio) {
metaDebug("vgId:%d, suid:%" PRIu64 metaDebug("vgId:%d, suid:%" PRIu64
" failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f", " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
@ -525,9 +531,10 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
ASSERT(sizeof(uint64_t) + keyLen == 24); ASSERT(sizeof(uint64_t) + keyLen == 24);
// add to cache. // add to cache.
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL, TAOS_LRU_PRIORITY_LOW); taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
metaDebug("vgId:%d, suid:%"PRIu64" list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), TAOS_LRU_PRIORITY_LOW);
suid, (int32_t) taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry)); metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -539,7 +546,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t keyLen = sizeof(uint64_t) * 3; int32_t keyLen = sizeof(uint64_t) * 3;
uint64_t p[3] = {0}; uint64_t p[3] = {0};
p[0] = suid; p[0] = suid;

View File

@ -314,7 +314,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
int32_t rowIndex = j - num; int32_t rowIndex = j - num;
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
pOperator->exprSupp.numOfExprs);
// assign the group keys or user input constant values if required // assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
@ -331,7 +332,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
int32_t rowIndex = pBlock->info.rows - num; int32_t rowIndex = pBlock->info.rows - num;
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
pOperator->exprSupp.numOfExprs);
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
} }
} }
@ -469,8 +471,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo,
createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, optrDefaultBufFn, NULL); optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
@ -776,6 +778,12 @@ static void destroyPartitionOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pGroupColVals); taosArrayDestroy(pInfo->pGroupColVals);
taosMemoryFree(pInfo->keyBuf); taosMemoryFree(pInfo->keyBuf);
int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
for (int32_t i = 0; i < size; i++) {
SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
taosArrayDestroy(pGp->pPageList);
}
taosArrayDestroy(pInfo->sortedGroupArray); taosArrayDestroy(pInfo->sortedGroupArray);
void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL); void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
@ -850,7 +858,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -1141,8 +1150,8 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL,
createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL); destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);