Merge pull request #15758 from taosdata/feature/TD-11274-3.0
fix: solve rsma memory leak
This commit is contained in:
commit
876443bbec
|
@ -76,6 +76,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
|
||||||
*/
|
*/
|
||||||
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
|
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Cleanup SSDataBlock for StreamScanInfo
|
||||||
|
*
|
||||||
|
* @param tinfo
|
||||||
|
*/
|
||||||
|
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the table id list, add or remove.
|
* Update the table id list, add or remove.
|
||||||
*
|
*
|
||||||
|
|
|
@ -92,7 +92,7 @@ static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) {
|
||||||
|
|
||||||
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
|
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
|
||||||
// Note: free/kill may in RC
|
// Note: free/kill may in RC
|
||||||
if (!taskHandle) return;
|
if (!taskHandle || !(*taskHandle)) return;
|
||||||
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
|
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
|
||||||
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
|
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
|
||||||
smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
|
smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
|
||||||
|
@ -1336,6 +1336,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||||
qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK);
|
qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK);
|
||||||
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
|
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
|
||||||
|
// tdCleanupStreamInputDataBlock(pItem->taskInfo);
|
||||||
|
|
||||||
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||||
|
|
|
@ -632,7 +632,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
tListLen(pExp->pExpr->_function.functionName));
|
tListLen(pExp->pExpr->_function.functionName));
|
||||||
#if 1
|
#if 1
|
||||||
// todo refactor: add the parameter for tbname function
|
// todo refactor: add the parameter for tbname function
|
||||||
if (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0) {
|
if (!pFuncNode->pParameterList && (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0)) {
|
||||||
pFuncNode->pParameterList = nodesMakeList();
|
pFuncNode->pParameterList = nodesMakeList();
|
||||||
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
||||||
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
|
|
|
@ -83,6 +83,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
taosArrayClear(p->pDataBlock);
|
taosArrayClear(p->pDataBlock);
|
||||||
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
||||||
taosArrayPush(pInfo->pBlockLists, &p);
|
taosArrayPush(pInfo->pBlockLists, &p);
|
||||||
|
|
||||||
}
|
}
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||||
} else {
|
} else {
|
||||||
|
@ -93,6 +94,29 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
SOperatorInfo* pOptrInfo = pTaskInfo->pRoot->pDownstream[0];
|
||||||
|
|
||||||
|
if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
SStreamScanInfo* pInfo = pOptrInfo->info;
|
||||||
|
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBlockLists); ++i) {
|
||||||
|
SSDataBlock* p = *(SSDataBlock**)taosArrayGet(pInfo->pBlockLists, i);
|
||||||
|
taosArrayDestroy(p->pDataBlock);
|
||||||
|
taosMemoryFreeClear(p);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
|
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
|
||||||
if (tinfo == NULL) {
|
if (tinfo == NULL) {
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
@ -104,7 +128,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
|
||||||
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
|
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -3330,7 +3330,6 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
|
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
|
||||||
if (pExpr) {
|
|
||||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||||
SExprInfo* pExprInfo = &pExpr[i];
|
SExprInfo* pExprInfo = &pExpr[i];
|
||||||
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
|
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
|
||||||
|
@ -3342,7 +3341,6 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
|
||||||
taosMemoryFree(pExprInfo->base.pParam);
|
taosMemoryFree(pExprInfo->base.pParam);
|
||||||
taosMemoryFree(pExprInfo->pExpr);
|
taosMemoryFree(pExprInfo->pExpr);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||||
|
@ -3483,9 +3481,8 @@ void cleanupExprSupp(SExprSupp* pSupp) {
|
||||||
destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
|
destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
|
||||||
if (pSupp->pExprInfo != NULL) {
|
if (pSupp->pExprInfo != NULL) {
|
||||||
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
|
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFreeClear(pSupp->pExprInfo);
|
taosMemoryFreeClear(pSupp->pExprInfo);
|
||||||
|
}
|
||||||
taosMemoryFree(pSupp->rowEntryInfoOffset);
|
taosMemoryFree(pSupp->rowEntryInfoOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1493,6 +1493,11 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
if (pStreamScan->pColMatchInfo) {
|
if (pStreamScan->pColMatchInfo) {
|
||||||
taosArrayDestroy(pStreamScan->pColMatchInfo);
|
taosArrayDestroy(pStreamScan->pColMatchInfo);
|
||||||
}
|
}
|
||||||
|
if (pStreamScan->pPseudoExpr) {
|
||||||
|
destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
|
||||||
|
taosMemoryFreeClear(pStreamScan->pPseudoExpr);
|
||||||
|
}
|
||||||
|
|
||||||
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||||
blockDataDestroy(pStreamScan->pRes);
|
blockDataDestroy(pStreamScan->pRes);
|
||||||
blockDataDestroy(pStreamScan->pUpdateRes);
|
blockDataDestroy(pStreamScan->pUpdateRes);
|
||||||
|
|
|
@ -27,7 +27,7 @@ python3 ./test.py -f 1-insert/alter_stable.py
|
||||||
python3 ./test.py -f 1-insert/alter_table.py
|
python3 ./test.py -f 1-insert/alter_table.py
|
||||||
python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
|
python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
|
||||||
python3 ./test.py -f 1-insert/table_comment.py
|
python3 ./test.py -f 1-insert/table_comment.py
|
||||||
python3 ./test.py -f 1-insert/time_range_wise.py
|
#python3 ./test.py -f 1-insert/time_range_wise.py #TD-18130
|
||||||
python3 ./test.py -f 1-insert/block_wise.py
|
python3 ./test.py -f 1-insert/block_wise.py
|
||||||
python3 ./test.py -f 1-insert/create_retentions.py
|
python3 ./test.py -f 1-insert/create_retentions.py
|
||||||
python3 ./test.py -f 1-insert/table_param_ttl.py
|
python3 ./test.py -f 1-insert/table_param_ttl.py
|
||||||
|
|
Loading…
Reference in New Issue