From 154ef739d9b8e900c3459da354da2880ba1b1bd3 Mon Sep 17 00:00:00 2001 From: sima Date: Wed, 3 Jul 2024 15:38:12 +0800 Subject: [PATCH 1/3] fix:[TD-30819] fix memory leak in SqlFunctionCtx. --- source/libs/executor/src/executorInt.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 3de8468bb8..fad2b263b2 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -909,12 +909,21 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) { initResultRowInfo(&pInfo->resultRowInfo); } -static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { +static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, SExprInfo* pExpr, int32_t numOfOutput) { if (pCtx == NULL) { return NULL; } for (int32_t i = 0; i < numOfOutput; ++i) { + if (pExpr != NULL) { + SExprInfo* pExprInfo = &pExpr[i]; + for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { + if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) { + taosMemoryFree(pCtx[i].input.pData[j]); + taosMemoryFree(pCtx[i].input.pColumnDataAgg[j]); + } + } + } for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) { taosVariantDestroy(&pCtx[i].param[j].param); } @@ -947,7 +956,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, S } void cleanupExprSupp(SExprSupp* pSupp) { - destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs); + destroySqlFunctionCtx(pSupp->pCtx, pSupp->pExprInfo, pSupp->numOfExprs); if (pSupp->pExprInfo != NULL) { destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs); taosMemoryFreeClear(pSupp->pExprInfo); From f17c535ca0ca0449ac99293e9324a367fef247ec Mon Sep 17 00:00:00 2001 From: sima Date: Mon, 8 Jul 2024 16:02:43 +0800 Subject: [PATCH 2/3] feat:[TD-30819] Reduce memory usage when using percentile with interval. --- include/libs/function/function.h | 1 + source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/aggregateoperator.c | 3 +++ source/libs/executor/src/countwindowoperator.c | 2 ++ source/libs/executor/src/eventwindowoperator.c | 2 ++ source/libs/executor/src/executil.c | 1 + source/libs/executor/src/groupoperator.c | 2 ++ source/libs/executor/src/projectoperator.c | 2 ++ source/libs/executor/src/streamtimewindowoperator.c | 3 +++ source/libs/executor/src/timewindowoperator.c | 7 +++++++ source/libs/function/inc/tpercentile.h | 2 +- source/libs/function/src/builtinsimpl.c | 2 +- source/libs/function/src/tpercentile.c | 12 +++++++++--- 13 files changed, 35 insertions(+), 5 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 87bbe21133..be3a4255b8 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -253,6 +253,7 @@ typedef struct SqlFunctionCtx { bool hasPrimaryKey; SFuncInputRowIter rowIter; bool bInputFinished; + bool hasWindowOrGroup; // denote that the function is used with time window or group } SqlFunctionCtx; typedef struct tExprNode { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 95414ff70e..9547fa12f3 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -128,6 +128,7 @@ typedef struct SExprSupp { SqlFunctionCtx* pCtx; int32_t* rowEntryInfoOffset; // offset value for each row result cell info SFilterInfo* pFilterInfo; + bool hasWindowOrGroup; } SExprSupp; typedef enum { diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index a7b4532bd0..5ff0601b7a 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -77,6 +77,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN goto _error; } + pOperator->exprSupp.hasWindowOrGroup = false; + SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); @@ -519,6 +521,7 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo } for (int32_t i = 0; i < numOfCols; ++i) { + pSup->pCtx[i].hasWindowOrGroup = pSup->hasWindowOrGroup; if (pState) { pSup->pCtx[i].saveHandle.pBuf = NULL; pSup->pCtx[i].saveHandle.pState = pState; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 47b24421c8..3a6c85532a 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -207,6 +207,8 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; + int32_t code = TSDB_CODE_SUCCESS; SCountWinodwPhysiNode* pCountWindowNode = (SCountWinodwPhysiNode*)physiNode; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index d73274f85e..b898ea576a 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -66,6 +66,8 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; + SEventWinodwPhysiNode* pEventWindowNode = (SEventWinodwPhysiNode*)physiNode; int32_t tsSlotId = ((SColumnNode*)pEventWindowNode->window.pTspk)->slotId; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index cb2a75407a..7b3c7c38f4 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1716,6 +1716,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->param = pFunct->pParam; pCtx->saveHandle.currentPage = -1; pCtx->pStore = pStore; + pCtx->hasWindowOrGroup = false; } for (int32_t i = 1; i < numOfOutput; ++i) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3a124197e0..f0fddc28fc 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -502,6 +502,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; + SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 19828d5146..7353787f69 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -99,6 +99,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys goto _error; } + pOperator->exprSupp.hasWindowOrGroup = false; pOperator->pTaskInfo = pTaskInfo; int32_t numOfCols = 0; @@ -409,6 +410,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pOperator->pTaskInfo = pTaskInfo; SExprSupp* pSup = &pOperator->exprSupp; + pSup->hasWindowOrGroup = false; SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5b9c018bba..9bfd6eaa1e 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1546,6 +1546,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; pOperator->pTaskInfo = pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; @@ -4209,6 +4210,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->ignoreExpiredDataSaved = false; SExprSupp* pSup = &pOperator->exprSupp; + pSup->hasWindowOrGroup = true; + initBasicInfo(&pInfo->binfo, pResBlock); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8b151edb27..9d1fb9e518 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1213,6 +1213,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh initBasicInfo(&pInfo->binfo, pResBlock); SExprSupp* pSup = &pOperator->exprSupp; + pSup->hasWindowOrGroup = true; + pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -1461,6 +1463,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey); @@ -1558,6 +1561,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW goto _error; } + pOperator->exprSupp.hasWindowOrGroup = false; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); @@ -1845,6 +1850,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; SExprSupp* pSup = &pOperator->exprSupp; + pSup->hasWindowOrGroup = true; int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { @@ -2147,6 +2153,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder; SExprSupp* pExprSupp = &pOperator->exprSupp; + pExprSupp->hasWindowOrGroup = true; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index 5351594b2f..34815a34ad 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -67,7 +67,7 @@ typedef struct tMemBucket { SHashObj *groupPagesMap; // disk page map for different groups; } tMemBucket; -tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval); +tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup); void tMemBucketDestroy(tMemBucket *pBucket); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1aa92479b9..5e2c7da8bc 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1897,7 +1897,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { pResInfo->complete = true; return TSDB_CODE_SUCCESS; } else { - pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval); + pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval, pCtx->hasWindowOrGroup); } } diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 776a7fb95a..a068186992 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -238,14 +238,20 @@ static void resetSlotInfo(tMemBucket *pBucket) { } } -tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval) { +tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup) { tMemBucket *pBucket = (tMemBucket *)taosMemoryCalloc(1, sizeof(tMemBucket)); if (pBucket == NULL) { return NULL; } - pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; - pBucket->bufPageSize = 16384 * 4; // 16k per page + if (hasWindowOrGroup) { + // With window or group by, we need to shrink page size and reduce page num to save memory. + pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT / 8 ; // 128 bucket + pBucket->bufPageSize = 4096; // 4k per page + } else { + pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; + pBucket->bufPageSize = 16384 * 4; // 16k per page + } pBucket->type = dataType; pBucket->bytes = nElemSize; From 3e3e7e38080110b302eb48b682060cee4b61650b Mon Sep 17 00:00:00 2001 From: dapan1121 <72057773+dapan1121@users.noreply.github.com> Date: Thu, 11 Jul 2024 13:35:25 +0800 Subject: [PATCH 3/3] Update timewindowoperator.c --- source/libs/executor/src/timewindowoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9d1fb9e518..bad15d1834 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1561,7 +1561,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW goto _error; } - pOperator->exprSupp.hasWindowOrGroup = false; + pOperator->exprSupp.hasWindowOrGroup = true; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096);