From eac86f72b3538322569bb5337f6e95db02a93e44 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 22 Dec 2023 18:30:22 +0800 Subject: [PATCH] fix:partical func parameters for middle interval[checkStreamSTable1.sim] --- include/libs/function/functionMgt.h | 2 +- source/libs/function/inc/builtins.h | 1 + source/libs/function/src/builtins.c | 28 ++++++++++++ source/libs/function/src/functionMgt.c | 35 ++++++++++++++- source/libs/planner/src/planSpliter.c | 60 +++++++++++--------------- 5 files changed, 89 insertions(+), 37 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 865f1b2295..878990425b 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -243,7 +243,7 @@ bool fmIsSkipScanCheckFunc(int32_t funcId); void getLastCacheDataType(SDataType* pType); SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList); -int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); +int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc); typedef enum EFuncDataRequired { FUNC_DATA_REQUIRED_DATA_LOAD = 1, diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index e7fcc38818..b13d481254 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -43,6 +43,7 @@ typedef struct SBuiltinFuncDefinition { FExecProcess invertFunc; FExecCombine combineFunc; const char* pPartialFunc; + const char* pMiddleFunc; const char* pMergeFunc; FCreateMergeFuncParameters createMergeParaFuc; FEstimateReturnRows estimateReturnRowsFunc; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 98fda024fa..9546733c7e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -422,6 +422,20 @@ static int32_t translateAvgPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t return TSDB_CODE_SUCCESS; } +static int32_t translateAvgMiddle(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (TSDB_DATA_TYPE_BINARY != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = getAvgInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateAvgMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -2485,6 +2499,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .invertFunc = avgInvertFunction, .combineFunc = avgCombine, .pPartialFunc = "_avg_partial", + .pMiddleFunc = "_avg_middle", .pMergeFunc = "_avg_merge" }, { @@ -2500,6 +2515,19 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .invertFunc = avgInvertFunction, .combineFunc = avgCombine, }, + { + .name = "_avg_middle", + .type = FUNCTION_TYPE_AVG_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateAvgMiddle, + .dataRequiredFunc = statisDataRequired, + .getEnvFunc = getAvgFuncEnv, + .initFunc = avgFunctionSetup, + .processFunc = avgFunctionMerge, + .finalizeFunc = avgPartialFinalize, + .invertFunc = avgInvertFunction, + .combineFunc = avgCombine, + }, { .name = "_avg_merge", .type = FUNCTION_TYPE_AVG_MERGE, diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 036e4238d4..1f3fb2e943 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -422,6 +422,35 @@ static int32_t createMergeFuncPara(const SFunctionNode* pSrcFunc, const SFunctio } } +static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc, + SFunctionNode** pMidFunc) { + SNodeList* pParameterList = NULL; + SFunctionNode* pFunc = NULL; + + int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList); + if (TSDB_CODE_SUCCESS == code) { + if(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc != NULL){ + pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc, pParameterList); + }else{ + pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList); + } + if (NULL == pFunc) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + strcpy(pFunc->node.aliasName, pPartialFunc->node.aliasName); + } + + if (TSDB_CODE_SUCCESS == code) { + *pMidFunc = pFunc; + } else { + nodesDestroyList(pParameterList); + } + + return code; +} + static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc, SFunctionNode** pMergeFunc) { SNodeList* pParameterList = NULL; @@ -451,18 +480,22 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio return code; } -int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) { +int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc) { if (!fmIsDistExecFunc(pFunc->funcId)) { return TSDB_CODE_FAILED; } int32_t code = createPartialFunction(pFunc, pPartialFunc); + if (TSDB_CODE_SUCCESS == code) { + code = createMidFunction(pFunc, *pPartialFunc, pMidFunc); + } if (TSDB_CODE_SUCCESS == code) { code = createMergeFunction(pFunc, *pPartialFunc, pMergeFunc); } if (TSDB_CODE_SUCCESS != code) { nodesDestroyNode((SNode*)*pPartialFunc); + nodesDestroyNode((SNode*)*pMidFunc); nodesDestroyNode((SNode*)*pMergeFunc); } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index d1f9901b30..987a1dc051 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -344,11 +344,12 @@ static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SL return false; } -static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) { +static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMidFuncs, SNodeList** pMergeFuncs) { SNode* pNode = NULL; FOREACH(pNode, pFuncs) { SFunctionNode* pFunc = (SFunctionNode*)pNode; SFunctionNode* pPartFunc = NULL; + SFunctionNode* pMidFunc = NULL; SFunctionNode* pMergeFunc = NULL; int32_t code = TSDB_CODE_SUCCESS; if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) { @@ -359,18 +360,33 @@ static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFu nodesDestroyNode((SNode*)pMergeFunc); code = TSDB_CODE_OUT_OF_MEMORY; } + if(pMidFuncs != NULL){ + pMidFunc = (SFunctionNode*)nodesCloneNode(pNode); + if (NULL == pMidFunc) { + nodesDestroyNode((SNode*)pMidFunc); + code = TSDB_CODE_OUT_OF_MEMORY; + } + } } else { - code = fmGetDistMethod(pFunc, &pPartFunc, &pMergeFunc); + code = fmGetDistMethod(pFunc, &pPartFunc, &pMidFunc, &pMergeFunc); } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc); } + if (TSDB_CODE_SUCCESS == code) { + if(pMidFuncs != NULL){ + code = nodesListMakeStrictAppend(pMidFuncs, (SNode*)pMidFunc); + }else{ + nodesDestroyNode((SNode*)pMidFunc); + } + } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc); } if (TSDB_CODE_SUCCESS != code) { - nodesDestroyList(*pPartialFuncs); - nodesDestroyList(*pMergeFuncs); + nodesDestroyNode((SNode*)pPartFunc); + nodesDestroyNode((SNode*)pMidFunc); + nodesDestroyNode((SNode*)pMergeFunc); return code; } } @@ -463,7 +479,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic splSetParent((SLogicNode*)pPartWin); int32_t index = 0; - int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs); + int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, NULL, &pMergeWindow->pFuncs); if (TSDB_CODE_SUCCESS == code) { code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision); } @@ -488,16 +504,6 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic return code; } -static SNode* createColumnByFunc(const SFunctionNode* pFunc) { - SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); - if (NULL == pCol) { - return NULL; - } - strcpy(pCol->colName, pFunc->node.aliasName); - pCol->node.resType = pFunc->node.resType; - return (SNode*)pCol; -} - static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) { SNodeList* pFunc = pMergeWindow->pFuncs; pMergeWindow->pFuncs = NULL; @@ -527,11 +533,12 @@ static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLo splSetParent((SLogicNode*)pPartWin); SNodeList* pFuncPart = NULL; + SNodeList* pFuncMid = NULL; SNodeList* pFuncMerge = NULL; - int32_t code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMerge); + int32_t code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMid, &pFuncMerge); pPartWin->pFuncs = pFuncPart; + pMidWin->pFuncs = pFuncMid; pMergeWindow->pFuncs = pFuncMerge; - pMidWin->pFuncs = nodesCloneList(pFuncPart); int32_t index = 0; if (TSDB_CODE_SUCCESS == code) { @@ -549,23 +556,6 @@ static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLo } } - if(TSDB_CODE_SUCCESS == code){ - for (int32_t i = 0; i < LIST_LENGTH(pMidWin->pFuncs); ++i) { - SFunctionNode* pFunc1 = (SFunctionNode*)nodesListGetNode(pPartWin->pFuncs, i); - SFunctionNode* pFunc2 = (SFunctionNode*)nodesListGetNode(pMidWin->pFuncs, i); - NODES_DESTORY_LIST(pFunc2->pParameterList); - - SNodeList* pParameterList = NULL; - SNode* pRes = createColumnByFunc(pFunc1); - code = nodesListMakeStrictAppend(&pParameterList, pRes); - if(code == TSDB_CODE_SUCCESS){ - pFunc2->pParameterList = pParameterList; - }else{ - nodesDestroyNode(pRes); - } - } - } - if (TSDB_CODE_SUCCESS == code) { code = stbSplAppendWStart(pMidWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision); } @@ -978,7 +968,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO pPartAgg->node.pChildren = pChildren; splSetParent((SLogicNode*)pPartAgg); - code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs); + code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, NULL, &pMergeAgg->pAggFuncs); } if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);