Merge pull request #26489 from taosdata/fix/3.0/TD-30819
feat:[TD-30819] Reduce memory usage when use percentile with interval.
This commit is contained in:
commit
e4c624f286
|
@ -255,6 +255,7 @@ typedef struct SqlFunctionCtx {
|
||||||
bool hasPrimaryKey;
|
bool hasPrimaryKey;
|
||||||
SFuncInputRowIter rowIter;
|
SFuncInputRowIter rowIter;
|
||||||
bool bInputFinished;
|
bool bInputFinished;
|
||||||
|
bool hasWindowOrGroup; // denote that the function is used with time window or group
|
||||||
} SqlFunctionCtx;
|
} SqlFunctionCtx;
|
||||||
|
|
||||||
typedef struct tExprNode {
|
typedef struct tExprNode {
|
||||||
|
|
|
@ -128,6 +128,7 @@ typedef struct SExprSupp {
|
||||||
SqlFunctionCtx* pCtx;
|
SqlFunctionCtx* pCtx;
|
||||||
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
|
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
|
||||||
SFilterInfo* pFilterInfo;
|
SFilterInfo* pFilterInfo;
|
||||||
|
bool hasWindowOrGroup;
|
||||||
} SExprSupp;
|
} SExprSupp;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
|
|
@ -77,6 +77,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOperator->exprSupp.hasWindowOrGroup = false;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||||
|
|
||||||
|
@ -519,6 +521,7 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
pSup->pCtx[i].hasWindowOrGroup = pSup->hasWindowOrGroup;
|
||||||
if (pState) {
|
if (pState) {
|
||||||
pSup->pCtx[i].saveHandle.pBuf = NULL;
|
pSup->pCtx[i].saveHandle.pBuf = NULL;
|
||||||
pSup->pCtx[i].saveHandle.pState = pState;
|
pSup->pCtx[i].saveHandle.pState = pState;
|
||||||
|
|
|
@ -207,6 +207,8 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOperator->exprSupp.hasWindowOrGroup = true;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SCountWinodwPhysiNode* pCountWindowNode = (SCountWinodwPhysiNode*)physiNode;
|
SCountWinodwPhysiNode* pCountWindowNode = (SCountWinodwPhysiNode*)physiNode;
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,8 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOperator->exprSupp.hasWindowOrGroup = true;
|
||||||
|
|
||||||
SEventWinodwPhysiNode* pEventWindowNode = (SEventWinodwPhysiNode*)physiNode;
|
SEventWinodwPhysiNode* pEventWindowNode = (SEventWinodwPhysiNode*)physiNode;
|
||||||
|
|
||||||
int32_t tsSlotId = ((SColumnNode*)pEventWindowNode->window.pTspk)->slotId;
|
int32_t tsSlotId = ((SColumnNode*)pEventWindowNode->window.pTspk)->slotId;
|
||||||
|
|
|
@ -1716,6 +1716,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
||||||
pCtx->param = pFunct->pParam;
|
pCtx->param = pFunct->pParam;
|
||||||
pCtx->saveHandle.currentPage = -1;
|
pCtx->saveHandle.currentPage = -1;
|
||||||
pCtx->pStore = pStore;
|
pCtx->pStore = pStore;
|
||||||
|
pCtx->hasWindowOrGroup = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 1; i < numOfOutput; ++i) {
|
for (int32_t i = 1; i < numOfOutput; ++i) {
|
||||||
|
|
|
@ -909,12 +909,21 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
initResultRowInfo(&pInfo->resultRowInfo);
|
initResultRowInfo(&pInfo->resultRowInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||||
if (pCtx == NULL) {
|
if (pCtx == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
if (pExpr != NULL) {
|
||||||
|
SExprInfo* pExprInfo = &pExpr[i];
|
||||||
|
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
|
||||||
|
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
|
||||||
|
taosMemoryFree(pCtx[i].input.pData[j]);
|
||||||
|
taosMemoryFree(pCtx[i].input.pColumnDataAgg[j]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) {
|
for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) {
|
||||||
taosVariantDestroy(&pCtx[i].param[j].param);
|
taosVariantDestroy(&pCtx[i].param[j].param);
|
||||||
}
|
}
|
||||||
|
@ -947,7 +956,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, S
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanupExprSupp(SExprSupp* pSupp) {
|
void cleanupExprSupp(SExprSupp* pSupp) {
|
||||||
destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
|
destroySqlFunctionCtx(pSupp->pCtx, pSupp->pExprInfo, pSupp->numOfExprs);
|
||||||
if (pSupp->pExprInfo != NULL) {
|
if (pSupp->pExprInfo != NULL) {
|
||||||
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
|
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
|
||||||
taosMemoryFreeClear(pSupp->pExprInfo);
|
taosMemoryFreeClear(pSupp->pExprInfo);
|
||||||
|
|
|
@ -502,6 +502,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOperator->exprSupp.hasWindowOrGroup = true;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||||
|
|
||||||
|
|
|
@ -99,6 +99,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOperator->exprSupp.hasWindowOrGroup = false;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
|
@ -409,6 +410,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
pSup->hasWindowOrGroup = false;
|
||||||
|
|
||||||
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
|
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
|
||||||
|
|
||||||
|
|
|
@ -1553,6 +1553,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOperator->exprSupp.hasWindowOrGroup = true;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
|
@ -4216,6 +4217,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->ignoreExpiredDataSaved = false;
|
pInfo->ignoreExpiredDataSaved = false;
|
||||||
|
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
pSup->hasWindowOrGroup = true;
|
||||||
|
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||||
|
|
||||||
|
|
|
@ -1213,6 +1213,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||||
|
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
pSup->hasWindowOrGroup = true;
|
||||||
|
|
||||||
pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
|
pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
|
||||||
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
|
@ -1461,6 +1463,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOperator->exprSupp.hasWindowOrGroup = true;
|
||||||
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
|
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
|
||||||
SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
|
SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
|
||||||
|
|
||||||
|
@ -1558,6 +1561,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOperator->exprSupp.hasWindowOrGroup = true;
|
||||||
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
|
|
||||||
|
@ -1845,6 +1850,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
|
|
||||||
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
|
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
pSup->hasWindowOrGroup = true;
|
||||||
|
|
||||||
int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2147,6 +2153,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
||||||
pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
|
pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
|
||||||
|
|
||||||
SExprSupp* pExprSupp = &pOperator->exprSupp;
|
SExprSupp* pExprSupp = &pOperator->exprSupp;
|
||||||
|
pExprSupp->hasWindowOrGroup = true;
|
||||||
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
|
|
|
@ -67,7 +67,7 @@ typedef struct tMemBucket {
|
||||||
SHashObj *groupPagesMap; // disk page map for different groups;
|
SHashObj *groupPagesMap; // disk page map for different groups;
|
||||||
} tMemBucket;
|
} tMemBucket;
|
||||||
|
|
||||||
tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval);
|
tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup);
|
||||||
|
|
||||||
void tMemBucketDestroy(tMemBucket *pBucket);
|
void tMemBucketDestroy(tMemBucket *pBucket);
|
||||||
|
|
||||||
|
|
|
@ -1902,7 +1902,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
pResInfo->complete = true;
|
pResInfo->complete = true;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval);
|
pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval, pCtx->hasWindowOrGroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -238,14 +238,20 @@ static void resetSlotInfo(tMemBucket *pBucket) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval) {
|
tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup) {
|
||||||
tMemBucket *pBucket = (tMemBucket *)taosMemoryCalloc(1, sizeof(tMemBucket));
|
tMemBucket *pBucket = (tMemBucket *)taosMemoryCalloc(1, sizeof(tMemBucket));
|
||||||
if (pBucket == NULL) {
|
if (pBucket == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT;
|
if (hasWindowOrGroup) {
|
||||||
pBucket->bufPageSize = 16384 * 4; // 16k per page
|
// With window or group by, we need to shrink page size and reduce page num to save memory.
|
||||||
|
pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT / 8 ; // 128 bucket
|
||||||
|
pBucket->bufPageSize = 4096; // 4k per page
|
||||||
|
} else {
|
||||||
|
pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT;
|
||||||
|
pBucket->bufPageSize = 16384 * 4; // 16k per page
|
||||||
|
}
|
||||||
|
|
||||||
pBucket->type = dataType;
|
pBucket->type = dataType;
|
||||||
pBucket->bytes = nElemSize;
|
pBucket->bytes = nElemSize;
|
||||||
|
|
Loading…
Reference in New Issue