Merge pull request #28150 from taosdata/fix/3.0/TD-32372
fix:[TD-32372] Clean up in a right way.
This commit is contained in:
commit
b2a2ff435d
|
@ -833,6 +833,9 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExpr
|
||||||
SGroupResInfo* pGroupResInfo);
|
SGroupResInfo* pGroupResInfo);
|
||||||
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap);
|
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap);
|
||||||
|
void cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
|
SGroupResInfo* pGroupResInfo);
|
||||||
|
|
||||||
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
||||||
const char* pkey, void* pState, SFunctionStateStore* pStore);
|
const char* pkey, void* pState, SFunctionStateStore* pStore);
|
||||||
void cleanupAggSup(SAggSupporter* pAggSup);
|
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||||
|
|
|
@ -159,8 +159,8 @@ void destroyAggOperatorInfo(void* param) {
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
|
|
||||||
if (pInfo->pOperator) {
|
if (pInfo->pOperator) {
|
||||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
&pInfo->groupResInfo);
|
||||||
pInfo->pOperator = NULL;
|
pInfo->pOperator = NULL;
|
||||||
}
|
}
|
||||||
cleanupAggSup(&pInfo->aggSup);
|
cleanupAggSup(&pInfo->aggSup);
|
||||||
|
@ -627,6 +627,42 @@ void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
|
SGroupResInfo* pGroupResInfo) {
|
||||||
|
int32_t numOfExprs = pSup->numOfExprs;
|
||||||
|
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
|
||||||
|
SqlFunctionCtx* pCtx = pSup->pCtx;
|
||||||
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
|
bool needCleanup = false;
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||||
|
needCleanup |= pCtx[j].needCleanup;
|
||||||
|
}
|
||||||
|
if (!needCleanup) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
|
||||||
|
SResultRow* pRow = NULL;
|
||||||
|
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||||
|
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
|
||||||
|
if (page == NULL) {
|
||||||
|
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pRow = (SResultRow*)((char*)page + pPos->pos.offset);
|
||||||
|
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||||
|
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
|
||||||
|
if (pCtx[j].fpSet.cleanup) {
|
||||||
|
pCtx[j].fpSet.cleanup(&pCtx[j]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
|
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
|
||||||
int32_t numOfExprs = pSup->numOfExprs;
|
int32_t numOfExprs = pSup->numOfExprs;
|
||||||
|
|
|
@ -1229,10 +1229,9 @@ static void destroyStateWindowOperatorInfo(void* param) {
|
||||||
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
|
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
taosMemoryFreeClear(pInfo->stateKey.pData);
|
taosMemoryFreeClear(pInfo->stateKey.pData);
|
||||||
|
if (pInfo->pOperator) {
|
||||||
if (pInfo->pOperator != NULL) {
|
cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
&pInfo->groupResInfo);
|
||||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
|
||||||
pInfo->pOperator = NULL;
|
pInfo->pOperator = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1257,10 +1256,9 @@ void destroyIntervalOperatorInfo(void* param) {
|
||||||
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
|
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
|
||||||
|
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
|
if (pInfo->pOperator) {
|
||||||
if (pInfo->pOperator != NULL) {
|
cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
&pInfo->groupResInfo);
|
||||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
|
||||||
pInfo->pOperator = NULL;
|
pInfo->pOperator = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1764,10 +1762,9 @@ void destroySWindowOperatorInfo(void* param) {
|
||||||
|
|
||||||
cleanupBasicInfo(&pInfo->binfo);
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
|
if (pInfo->pOperator) {
|
||||||
if (pInfo->pOperator != NULL) {
|
cleanupResultInfoWithoutHash(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
||||||
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
|
&pInfo->groupResInfo);
|
||||||
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
|
|
||||||
pInfo->pOperator = NULL;
|
pInfo->pOperator = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue