enh:[TD-32158] Free memory allocated in function setup.

This commit is contained in:
Jing Sima 2024-09-20 11:25:39 +08:00
parent a7a5c44666
commit 122b707ac8
15 changed files with 170 additions and 31 deletions

View File

@ -553,6 +553,7 @@ typedef struct SIntervalAggOperatorInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
struct SOperatorInfo* pOperator;
// for limit optimization
bool limited;
int64_t limit;
@ -621,6 +622,7 @@ typedef struct SStreamIntervalOperatorInfo {
int32_t midDelIndex;
SSHashObj* pDeletedMap;
bool destHasPrimaryKey;
struct SOperatorInfo* pOperator;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
@ -676,6 +678,7 @@ typedef struct SStreamSessionAggOperatorInfo {
bool recvGetAll;
bool destHasPrimaryKey;
SSHashObj* pPkDeleted;
struct SOperatorInfo* pOperator;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
@ -703,6 +706,7 @@ typedef struct SStreamStateAggOperatorInfo {
bool recvGetAll;
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
struct SOperatorInfo* pOperator;
} SStreamStateAggOperatorInfo;
typedef struct SStreamEventAggOperatorInfo {
@ -732,6 +736,7 @@ typedef struct SStreamEventAggOperatorInfo {
SFilterInfo* pEndCondInfo;
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
struct SOperatorInfo* pOperator;
} SStreamEventAggOperatorInfo;
typedef struct SStreamCountAggOperatorInfo {
@ -756,6 +761,7 @@ typedef struct SStreamCountAggOperatorInfo {
SSDataBlock* pCheckpointRes;
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
struct SOperatorInfo* pOperator;
} SStreamCountAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
@ -823,6 +829,10 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore);
void cleanupExprSupp(SExprSupp* pSup);
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap);
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey, void* pState, SFunctionStateStore* pStore);
void cleanupAggSup(SAggSupporter* pAggSup);

View File

@ -48,6 +48,7 @@ typedef struct SAggOperatorInfo {
bool hasValidBlock;
SSDataBlock* pNewGroupBlock;
bool hasCountFunc;
SOperatorInfo* pOperator;
} SAggOperatorInfo;
static void destroyAggOperatorInfo(void* param);
@ -119,6 +120,7 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
!pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
@ -153,6 +155,9 @@ void destroyAggOperatorInfo(void* param) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarExprSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
@ -581,6 +586,66 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
return code;
}
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
SRowBuffPos* pPos = pWinInfo->pStatePos;
SResultRow* pRow = NULL;
code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
if (TSDB_CODE_SUCCESS != code) {
qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
continue;
}
for (int32_t j = 0; j < numOfExprs; ++j) {
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.cleanup) {
pCtx[j].fpSet.cleanup(&pCtx[j]);
}
}
}
}
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
// begin from last iter
void* pData = pGroupResInfo->dataPos;
int32_t iter = pGroupResInfo->iter;
while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
SResultRowPosition* pos = pData;
SFilePage* page = getBufPage(pBuf, pos->pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
continue;
}
SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
for (int32_t j = 0; j < numOfExprs; ++j) {
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.cleanup) {
pCtx[j].fpSet.cleanup(&pCtx[j]);
}
}
releaseBufPage(pBuf, page);
}
}
void cleanupAggSup(SAggSupporter* pAggSup) {
taosMemoryFreeClear(pAggSup->keyBuf);
tSimpleHashCleanup(pAggSup->pResultRowHashTable);

View File

@ -37,6 +37,7 @@ typedef struct SEventWindowOperatorInfo {
bool inWindow;
SResultRow* pRow;
SSDataBlock* pPreDataBlock;
SOperatorInfo* pOperator;
} SEventWindowOperatorInfo;
static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** pRes);
@ -128,6 +129,7 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
pInfo->tsSlotId = tsSlotId;
pInfo->pPreDataBlock = NULL;
pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
@ -152,6 +154,19 @@ _error:
return code;
}
void cleanupResultInfoInEventWindow(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo) {
if (pInfo == NULL || pInfo->pRow == NULL) {
return;
}
SExprSupp* pSup = &pOperator->exprSupp;
for (int32_t j = 0; j < pSup->numOfExprs; ++j) {
pSup->pCtx[j].resultInfo = getResultEntryInfo(pInfo->pRow, j, pSup->rowEntryInfoOffset);
if (pSup->pCtx[j].fpSet.cleanup) {
pSup->pCtx[j].fpSet.cleanup(&pSup->pCtx[j]);
}
}
}
void destroyEWindowOperatorInfo(void* param) {
SEventWindowOperatorInfo* pInfo = (SEventWindowOperatorInfo*)param;
if (pInfo == NULL) {
@ -175,6 +190,8 @@ void destroyEWindowOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupResultInfoInEventWindow(pInfo->pOperator, pInfo);
pInfo->pOperator = NULL;
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param);

View File

@ -1020,10 +1020,6 @@ static void destroySqlFunctionCtx(SqlFunctionCtx* pCtx, SExprInfo* pExpr, int32_
}
for (int32_t i = 0; i < numOfOutput; ++i) {
if (pCtx[i].fpSet.cleanup != NULL) {
pCtx[i].fpSet.cleanup(&pCtx[i]);
}
if (pExpr != NULL) {
SExprInfo* pExprInfo = &pExpr[i];
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {

View File

@ -39,6 +39,7 @@ typedef struct SGroupbyOperatorInfo {
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SExprSupp scalarSup;
SOperatorInfo *pOperator;
} SGroupbyOperatorInfo;
// The sort in partition may be needed later.
@ -85,9 +86,11 @@ static void destroyGroupOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
cleanupExprSupp(&pInfo->scalarSup);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
cleanupGroupResInfo(&pInfo->groupResInfo);
cleanupAggSup(&pInfo->aggSup);
pInfo->pOperator = NULL;
taosMemoryFreeClear(param);
}
@ -569,6 +572,8 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
pInfo->pOperator = pOperator;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregateNext, NULL, destroyGroupOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1);

View File

@ -667,13 +667,11 @@ void destroyOperator(SOperatorInfo* pOperator) {
pOperator->numOfDownstream = 0;
}
cleanupExprSupp(&pOperator->exprSupp);
// close operator after cleanup exprSupp, since we need to call cleanup of sqlFunctionCtx first to avoid mem leak.
if (pOperator->fpSet.closeFn != NULL && pOperator->info != NULL) {
pOperator->fpSet.closeFn(pOperator->info);
}
cleanupExprSupp(&pOperator->exprSupp);
taosMemoryFreeClear(pOperator);
}

View File

@ -48,6 +48,9 @@ typedef struct SBuffInfo {
void destroyStreamCountAggOperatorInfo(void* param) {
SStreamCountAggOperatorInfo* pInfo = (SStreamCountAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
clearGroupResInfo(&pInfo->groupResInfo);
@ -906,6 +909,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
taosMemoryFree(buff);
}
pInfo->pOperator = pOperator;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAggNext, NULL, destroyStreamCountAggOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);

View File

@ -48,6 +48,9 @@ void destroyStreamEventOperatorInfo(void* param) {
}
SStreamEventAggOperatorInfo* pInfo = (SStreamEventAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
@ -951,6 +954,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimayKey;
pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
// for stream

View File

@ -473,6 +473,9 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
}
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
cleanupAggSup(&pInfo->aggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
@ -2024,6 +2027,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey;
pInfo->pOperator = pOperator;
pOperator->operatorType = pPhyNode->type;
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
@ -2088,6 +2092,9 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
}
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupExprSupp(&pInfo->scalarSupp);
clearGroupResInfo(&pInfo->groupResInfo);
@ -3855,6 +3862,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
pInfo->pOperator = pOperator;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
@ -4103,6 +4111,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pOperator->operatorType = pPhyNode->type;
pInfo->pOperator = pOperator;
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
pOperator->fpSet =
@ -4173,6 +4182,9 @@ void destroyStreamStateOperatorInfo(void* param) {
}
SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
@ -5028,6 +5040,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey;
pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
@ -5361,6 +5374,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, &pInfo->pState->pFileState);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet =

View File

@ -37,6 +37,7 @@ typedef struct SSessionAggOperatorInfo {
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
SOperatorInfo* pOperator;
} SSessionAggOperatorInfo;
typedef struct SStateWindowOperatorInfo {
@ -50,6 +51,7 @@ typedef struct SStateWindowOperatorInfo {
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
SOperatorInfo* pOperator;
} SStateWindowOperatorInfo;
typedef enum SResultTsInterpType {
@ -1224,6 +1226,9 @@ static void destroyStateWindowOperatorInfo(void* param) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->stateKey.pData);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
cleanupExprSupp(&pInfo->scalarSup);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup);
@ -1243,6 +1248,9 @@ void destroyIntervalOperatorInfo(void* param) {
}
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSupp);
@ -1430,6 +1438,7 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
}
}
pInfo->pOperator = pOperator;
initResultRowInfo(&pInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
@ -1706,7 +1715,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
QUERY_CHECK_CODE(code, lino, _error);
pInfo->tsSlotId = tsSlotId;
pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
@ -1738,7 +1747,9 @@ void destroySWindowOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, pInfo->aggSup.pResultBuf,
&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable);
pInfo->pOperator = NULL;
cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSupp);
@ -1805,6 +1816,7 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
@ -2121,6 +2133,7 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error);
iaInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
false, OP_NOT_OPENED, miaInfo, pTaskInfo);
@ -2460,6 +2473,7 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
}
}
pIntervalInfo->pOperator = pOperator;
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);

View File

@ -122,6 +122,7 @@ bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t percentileFunction(SqlFunctionCtx* pCtx);
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
void percentileFunctionCleanupExt(SqlFunctionCtx* pCtx);
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);

View File

@ -69,7 +69,7 @@ typedef struct tMemBucket {
int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup,
tMemBucket **pBucket);
void tMemBucketDestroy(tMemBucket *pBucket);
void tMemBucketDestroy(tMemBucket **pBucket);
int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size);

View File

@ -3115,6 +3115,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = percentileFunction,
.sprocessFunc = percentileScalarFunction,
.finalizeFunc = percentileFinalize,
.cleanupFunc = percentileFunctionCleanupExt,
#ifdef BUILD_NO_CALL
.invertFunc = NULL,
#endif

View File

@ -2009,6 +2009,17 @@ int32_t percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResu
return TSDB_CODE_SUCCESS;
}
void percentileFunctionCleanupExt(SqlFunctionCtx* pCtx) {
if (pCtx == NULL || GET_RES_INFO(pCtx) == NULL || GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)) == NULL) {
return;
}
SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (pInfo->pMemBucket != NULL) {
tMemBucketDestroy(&(pInfo->pMemBucket));
pInfo->pMemBucket = NULL;
}
}
int32_t percentileFunction(SqlFunctionCtx* pCtx) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfElems = 0;
@ -2095,7 +2106,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
numOfElems += 1;
code = tMemBucketPut(pInfo->pMemBucket, data, 1);
if (code != TSDB_CODE_SUCCESS) {
tMemBucketDestroy(pInfo->pMemBucket);
tMemBucketDestroy(&(pInfo->pMemBucket));
return code;
}
}
@ -2113,8 +2124,8 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = 0;
double v = 0;
tMemBucket* pMemBucket = ppInfo->pMemBucket;
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
tMemBucket** pMemBucket = &ppInfo->pMemBucket;
if ((*pMemBucket) != NULL && (*pMemBucket)->total > 0) { // check for null
if (pCtx->numOfParams > 2) {
char buf[3200] = {0};
// max length of double num is 317, e.g. use %.6lf to print -1.0e+308, consider the comma and bracket, 3200 is enough.
@ -2126,7 +2137,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
code = getPercentile(pMemBucket, v, &ppInfo->result);
code = getPercentile((*pMemBucket), v, &ppInfo->result);
if (code != TSDB_CODE_SUCCESS) {
goto _fin_error;
}
@ -2158,7 +2169,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
code = getPercentile(pMemBucket, v, &ppInfo->result);
code = getPercentile((*pMemBucket), v, &ppInfo->result);
if (code != TSDB_CODE_SUCCESS) {
goto _fin_error;
}
@ -6067,7 +6078,6 @@ int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
pInfo->pHash = NULL;
return terrno;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -291,12 +291,12 @@ int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, dou
(*pBucket)->maxCapacity = 200000;
(*pBucket)->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if ((*pBucket)->groupPagesMap == NULL) {
tMemBucketDestroy(*pBucket);
tMemBucketDestroy(pBucket);
return terrno;
}
if (setBoundingBox(&(*pBucket)->range, (*pBucket)->type, minval, maxval) != 0) {
// qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval);
tMemBucketDestroy(*pBucket);
tMemBucketDestroy(pBucket);
return TSDB_CODE_FUNC_INVALID_VALUE_RANGE;
}
@ -306,13 +306,13 @@ int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, dou
(*pBucket)->hashFunc = getHashFunc((*pBucket)->type);
if ((*pBucket)->hashFunc == NULL) {
// qError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type);
tMemBucketDestroy(*pBucket);
tMemBucketDestroy(pBucket);
return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
}
(*pBucket)->pSlots = (tMemBucketSlot *)taosMemoryCalloc((*pBucket)->numOfSlots, sizeof(tMemBucketSlot));
if ((*pBucket)->pSlots == NULL) {
tMemBucketDestroy(*pBucket);
tMemBucketDestroy(pBucket);
return terrno;
}
@ -320,13 +320,13 @@ int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, dou
if (!osTempSpaceAvailable()) {
// qError("MemBucket create disk based Buf failed since %s", terrstr(terrno));
tMemBucketDestroy(*pBucket);
tMemBucketDestroy(pBucket);
return TSDB_CODE_NO_DISKSPACE;
}
int32_t ret = createDiskbasedBuf(&(*pBucket)->pBuffer, (*pBucket)->bufPageSize, (*pBucket)->bufPageSize * DEFAULT_NUM_OF_SLOT * 4, "1", tsTempDir);
if (ret != 0) {
tMemBucketDestroy(*pBucket);
tMemBucketDestroy(pBucket);
return ret;
}
@ -334,22 +334,22 @@ int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, dou
return TSDB_CODE_SUCCESS;
}
void tMemBucketDestroy(tMemBucket *pBucket) {
if (pBucket == NULL) {
void tMemBucketDestroy(tMemBucket **pBucket) {
if (*pBucket == NULL) {
return;
}
void *p = taosHashIterate(pBucket->groupPagesMap, NULL);
void *p = taosHashIterate((*pBucket)->groupPagesMap, NULL);
while (p) {
SArray **p1 = p;
p = taosHashIterate(pBucket->groupPagesMap, p);
p = taosHashIterate((*pBucket)->groupPagesMap, p);
taosArrayDestroy(*p1);
}
destroyDiskbasedBuf(pBucket->pBuffer);
taosMemoryFreeClear(pBucket->pSlots);
taosHashCleanup(pBucket->groupPagesMap);
taosMemoryFreeClear(pBucket);
destroyDiskbasedBuf((*pBucket)->pBuffer);
taosMemoryFreeClear((*pBucket)->pSlots);
taosHashCleanup((*pBucket)->groupPagesMap);
taosMemoryFreeClear(*pBucket);
}
int32_t tMemBucketUpdateBoundingBox(MinMaxEntry *r, const char *data, int32_t dataType) {