diff --git a/include/libs/function/function.h b/include/libs/function/function.h index f563d7f5a8..924682f223 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -255,6 +255,7 @@ typedef struct SqlFunctionCtx { bool hasPrimaryKey; SFuncInputRowIter rowIter; bool bInputFinished; + bool hasWindowOrGroup; // denote that the function is used with time window or group } SqlFunctionCtx; typedef struct tExprNode { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 95414ff70e..9547fa12f3 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -128,6 +128,7 @@ typedef struct SExprSupp { SqlFunctionCtx* pCtx; int32_t* rowEntryInfoOffset; // offset value for each row result cell info SFilterInfo* pFilterInfo; + bool hasWindowOrGroup; } SExprSupp; typedef enum { diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index a7b4532bd0..5ff0601b7a 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -77,6 +77,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN goto _error; } + pOperator->exprSupp.hasWindowOrGroup = false; + SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); @@ -519,6 +521,7 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo } for (int32_t i = 0; i < numOfCols; ++i) { + pSup->pCtx[i].hasWindowOrGroup = pSup->hasWindowOrGroup; if (pState) { pSup->pCtx[i].saveHandle.pBuf = NULL; pSup->pCtx[i].saveHandle.pState = pState; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 47b24421c8..3a6c85532a 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -207,6 +207,8 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; + int32_t code = TSDB_CODE_SUCCESS; SCountWinodwPhysiNode* pCountWindowNode = (SCountWinodwPhysiNode*)physiNode; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index d73274f85e..b898ea576a 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -66,6 +66,8 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; + SEventWinodwPhysiNode* pEventWindowNode = (SEventWinodwPhysiNode*)physiNode; int32_t tsSlotId = ((SColumnNode*)pEventWindowNode->window.pTspk)->slotId; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index fca34e1e12..173ef0b829 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1716,6 +1716,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->param = pFunct->pParam; pCtx->saveHandle.currentPage = -1; pCtx->pStore = pStore; + pCtx->hasWindowOrGroup = false; } for (int32_t i = 1; i < numOfOutput; ++i) { diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 3de8468bb8..fad2b263b2 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -909,12 +909,21 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) { initResultRowInfo(&pInfo->resultRowInfo); } -static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { +static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, SExprInfo* pExpr, int32_t numOfOutput) { if (pCtx == NULL) { return NULL; } for (int32_t i = 0; i < numOfOutput; ++i) { + if (pExpr != NULL) { + SExprInfo* pExprInfo = &pExpr[i]; + for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { + if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) { + taosMemoryFree(pCtx[i].input.pData[j]); + taosMemoryFree(pCtx[i].input.pColumnDataAgg[j]); + } + } + } for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) { taosVariantDestroy(&pCtx[i].param[j].param); } @@ -947,7 +956,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, S } void cleanupExprSupp(SExprSupp* pSupp) { - destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs); + destroySqlFunctionCtx(pSupp->pCtx, pSupp->pExprInfo, pSupp->numOfExprs); if (pSupp->pExprInfo != NULL) { destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs); taosMemoryFreeClear(pSupp->pExprInfo); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3a124197e0..f0fddc28fc 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -502,6 +502,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; + SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 73134c55ef..6167497c21 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -99,6 +99,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys goto _error; } + pOperator->exprSupp.hasWindowOrGroup = false; pOperator->pTaskInfo = pTaskInfo; int32_t numOfCols = 0; @@ -409,6 +410,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pOperator->pTaskInfo = pTaskInfo; SExprSupp* pSup = &pOperator->exprSupp; + pSup->hasWindowOrGroup = false; SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 96c08f19f9..d68108b9a5 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1553,6 +1553,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; pOperator->pTaskInfo = pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; @@ -4216,6 +4217,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->ignoreExpiredDataSaved = false; SExprSupp* pSup = &pOperator->exprSupp; + pSup->hasWindowOrGroup = true; + initBasicInfo(&pInfo->binfo, pResBlock); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8b151edb27..bad15d1834 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1213,6 +1213,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh initBasicInfo(&pInfo->binfo, pResBlock); SExprSupp* pSup = &pOperator->exprSupp; + pSup->hasWindowOrGroup = true; + pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -1461,6 +1463,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey); @@ -1558,6 +1561,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW goto _error; } + pOperator->exprSupp.hasWindowOrGroup = true; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); @@ -1845,6 +1850,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; SExprSupp* pSup = &pOperator->exprSupp; + pSup->hasWindowOrGroup = true; int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { @@ -2147,6 +2153,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder; SExprSupp* pExprSupp = &pOperator->exprSupp; + pExprSupp->hasWindowOrGroup = true; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index 5351594b2f..34815a34ad 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -67,7 +67,7 @@ typedef struct tMemBucket { SHashObj *groupPagesMap; // disk page map for different groups; } tMemBucket; -tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval); +tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup); void tMemBucketDestroy(tMemBucket *pBucket); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3c7edad873..d5117e0eec 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1902,7 +1902,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { pResInfo->complete = true; return TSDB_CODE_SUCCESS; } else { - pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval); + pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval, pCtx->hasWindowOrGroup); } } diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 776a7fb95a..a068186992 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -238,14 +238,20 @@ static void resetSlotInfo(tMemBucket *pBucket) { } } -tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval) { +tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup) { tMemBucket *pBucket = (tMemBucket *)taosMemoryCalloc(1, sizeof(tMemBucket)); if (pBucket == NULL) { return NULL; } - pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; - pBucket->bufPageSize = 16384 * 4; // 16k per page + if (hasWindowOrGroup) { + // With window or group by, we need to shrink page size and reduce page num to save memory. + pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT / 8 ; // 128 bucket + pBucket->bufPageSize = 4096; // 4k per page + } else { + pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; + pBucket->bufPageSize = 16384 * 4; // 16k per page + } pBucket->type = dataType; pBucket->bytes = nElemSize;