fix:[TD-32506] fix mem leak in percentile function.

This commit is contained in:
Jing Sima 2024-10-31 17:11:34 +08:00
parent 9622197d57
commit 355aa885de
5 changed files with 42 additions and 17 deletions

View File

@ -553,6 +553,7 @@ typedef struct SIntervalAggOperatorInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
bool cleanGroupResInfo;
struct SOperatorInfo* pOperator;
// for limit optimization
bool limited;
@ -831,8 +832,10 @@ void cleanupExprSupp(SExprSupp* pSup);
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap);
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap);
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
SAggSupporter *pAggSup, bool cleanHashmap);
void cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo);

View File

@ -49,6 +49,7 @@ typedef struct SAggOperatorInfo {
SSDataBlock* pNewGroupBlock;
bool hasCountFunc;
SOperatorInfo* pOperator;
bool cleanGroupResInfo;
} SAggOperatorInfo;
static void destroyAggOperatorInfo(void* param);
@ -121,6 +122,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
pInfo->pOperator = pOperator;
pInfo->cleanGroupResInfo = false;
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
!pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
@ -159,8 +161,8 @@ void destroyAggOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
if (pInfo->pOperator) {
cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
pInfo->cleanGroupResInfo);
pInfo->pOperator = NULL;
}
cleanupAggSup(&pInfo->aggSup);
@ -191,6 +193,7 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
int32_t order = pAggInfo->binfo.inputTsOrder;
SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
pAggInfo->cleanGroupResInfo = false;
if (pBlock) {
pAggInfo->pNewGroupBlock = NULL;
tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
@ -263,6 +266,7 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
QUERY_CHECK_CODE(code, lino, _end);
pAggInfo->cleanGroupResInfo = true;
_end:
if (code != TSDB_CODE_SUCCESS) {
@ -627,7 +631,7 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp
}
}
void cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo) {
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
@ -663,7 +667,7 @@ void cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDi
}
}
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
@ -701,6 +705,14 @@ void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf*
}
}
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
SAggSupporter *pAggSup, bool cleanGroupResInfo) {
if (cleanGroupResInfo) {
cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
} else {
cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable);
}
}
void cleanupAggSup(SAggSupporter* pAggSup) {
taosMemoryFreeClear(pAggSup->keyBuf);
tSimpleHashCleanup(pAggSup->pResultRowHashTable);

View File

@ -88,8 +88,8 @@ static void destroyGroupOperatorInfo(void* param) {
cleanupExprSupp(&pInfo->scalarSup);
if (pInfo->pOperator != NULL) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
false);
pInfo->pOperator = NULL;
}

View File

@ -474,8 +474,8 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
if (pInfo->pOperator) {
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
false);
pInfo->pOperator = NULL;
}
cleanupAggSup(&pInfo->aggSup);

View File

@ -38,6 +38,7 @@ typedef struct SSessionAggOperatorInfo {
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
SOperatorInfo* pOperator;
bool cleanGroupResInfo;
} SSessionAggOperatorInfo;
typedef struct SStateWindowOperatorInfo {
@ -52,6 +53,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
SOperatorInfo* pOperator;
bool cleanGroupResInfo;
} SStateWindowOperatorInfo;
typedef enum SResultTsInterpType {
@ -943,6 +945,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
int32_t scanFlag = MAIN_SCAN;
int64_t st = taosGetTimestampUs();
pInfo->cleanGroupResInfo = false;
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
@ -965,6 +968,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->cleanGroupResInfo = true;
OPTR_SET_OPENED(pOperator);
@ -1092,6 +1096,7 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
pInfo->cleanGroupResInfo = false;
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
@ -1120,7 +1125,7 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->cleanGroupResInfo = true;
pOperator->status = OP_RES_TO_RETURN;
_end:
@ -1230,8 +1235,8 @@ static void destroyStateWindowOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->stateKey.pData);
if (pInfo->pOperator) {
cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
pInfo->cleanGroupResInfo);
pInfo->pOperator = NULL;
}
@ -1257,8 +1262,8 @@ void destroyIntervalOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
if (pInfo->pOperator) {
cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
pInfo->cleanGroupResInfo);
pInfo->pOperator = NULL;
}
@ -1452,6 +1457,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
}
pInfo->pOperator = pOperator;
pInfo->cleanGroupResInfo = false;
initResultRowInfo(&pInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
@ -1573,6 +1579,7 @@ static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExprSupp* pSup = &pOperator->exprSupp;
pInfo->cleanGroupResInfo = false;
if (pOperator->status == OP_RES_TO_RETURN) {
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
@ -1628,6 +1635,7 @@ static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->cleanGroupResInfo = true;
code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end);
@ -1731,6 +1739,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
pInfo->tsSlotId = tsSlotId;
pInfo->pOperator = pOperator;
pInfo->cleanGroupResInfo = false;
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
@ -1763,8 +1772,8 @@ void destroySWindowOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
if (pInfo->pOperator) {
cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
pInfo->cleanGroupResInfo);
pInfo->pOperator = NULL;
}
@ -1835,6 +1844,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pOperator = pOperator;
pInfo->cleanGroupResInfo = false;
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,