fix:partical func parameters for middle interval[checkStreamSTable1.sim]
This commit is contained in:
parent
8f8d8c578f
commit
eac86f72b3
|
@ -243,7 +243,7 @@ bool fmIsSkipScanCheckFunc(int32_t funcId);
|
||||||
void getLastCacheDataType(SDataType* pType);
|
void getLastCacheDataType(SDataType* pType);
|
||||||
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList);
|
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList);
|
||||||
|
|
||||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
|
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc);
|
||||||
|
|
||||||
typedef enum EFuncDataRequired {
|
typedef enum EFuncDataRequired {
|
||||||
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
||||||
|
|
|
@ -43,6 +43,7 @@ typedef struct SBuiltinFuncDefinition {
|
||||||
FExecProcess invertFunc;
|
FExecProcess invertFunc;
|
||||||
FExecCombine combineFunc;
|
FExecCombine combineFunc;
|
||||||
const char* pPartialFunc;
|
const char* pPartialFunc;
|
||||||
|
const char* pMiddleFunc;
|
||||||
const char* pMergeFunc;
|
const char* pMergeFunc;
|
||||||
FCreateMergeFuncParameters createMergeParaFuc;
|
FCreateMergeFuncParameters createMergeParaFuc;
|
||||||
FEstimateReturnRows estimateReturnRowsFunc;
|
FEstimateReturnRows estimateReturnRowsFunc;
|
||||||
|
|
|
@ -422,6 +422,20 @@ static int32_t translateAvgPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateAvgMiddle(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
if (TSDB_DATA_TYPE_BINARY != paraType) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = getAvgInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateAvgMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateAvgMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
@ -2485,6 +2499,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.invertFunc = avgInvertFunction,
|
.invertFunc = avgInvertFunction,
|
||||||
.combineFunc = avgCombine,
|
.combineFunc = avgCombine,
|
||||||
.pPartialFunc = "_avg_partial",
|
.pPartialFunc = "_avg_partial",
|
||||||
|
.pMiddleFunc = "_avg_middle",
|
||||||
.pMergeFunc = "_avg_merge"
|
.pMergeFunc = "_avg_merge"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -2500,6 +2515,19 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.invertFunc = avgInvertFunction,
|
.invertFunc = avgInvertFunction,
|
||||||
.combineFunc = avgCombine,
|
.combineFunc = avgCombine,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "_avg_middle",
|
||||||
|
.type = FUNCTION_TYPE_AVG_PARTIAL,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateAvgMiddle,
|
||||||
|
.dataRequiredFunc = statisDataRequired,
|
||||||
|
.getEnvFunc = getAvgFuncEnv,
|
||||||
|
.initFunc = avgFunctionSetup,
|
||||||
|
.processFunc = avgFunctionMerge,
|
||||||
|
.finalizeFunc = avgPartialFinalize,
|
||||||
|
.invertFunc = avgInvertFunction,
|
||||||
|
.combineFunc = avgCombine,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "_avg_merge",
|
.name = "_avg_merge",
|
||||||
.type = FUNCTION_TYPE_AVG_MERGE,
|
.type = FUNCTION_TYPE_AVG_MERGE,
|
||||||
|
|
|
@ -422,6 +422,35 @@ static int32_t createMergeFuncPara(const SFunctionNode* pSrcFunc, const SFunctio
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
|
||||||
|
SFunctionNode** pMidFunc) {
|
||||||
|
SNodeList* pParameterList = NULL;
|
||||||
|
SFunctionNode* pFunc = NULL;
|
||||||
|
|
||||||
|
int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
if(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc != NULL){
|
||||||
|
pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc, pParameterList);
|
||||||
|
}else{
|
||||||
|
pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList);
|
||||||
|
}
|
||||||
|
if (NULL == pFunc) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
strcpy(pFunc->node.aliasName, pPartialFunc->node.aliasName);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
*pMidFunc = pFunc;
|
||||||
|
} else {
|
||||||
|
nodesDestroyList(pParameterList);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
|
static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
|
||||||
SFunctionNode** pMergeFunc) {
|
SFunctionNode** pMergeFunc) {
|
||||||
SNodeList* pParameterList = NULL;
|
SNodeList* pParameterList = NULL;
|
||||||
|
@ -451,18 +480,22 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) {
|
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc) {
|
||||||
if (!fmIsDistExecFunc(pFunc->funcId)) {
|
if (!fmIsDistExecFunc(pFunc->funcId)) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = createPartialFunction(pFunc, pPartialFunc);
|
int32_t code = createPartialFunction(pFunc, pPartialFunc);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = createMidFunction(pFunc, *pPartialFunc, pMidFunc);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = createMergeFunction(pFunc, *pPartialFunc, pMergeFunc);
|
code = createMergeFunction(pFunc, *pPartialFunc, pMergeFunc);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
nodesDestroyNode((SNode*)*pPartialFunc);
|
nodesDestroyNode((SNode*)*pPartialFunc);
|
||||||
|
nodesDestroyNode((SNode*)*pMidFunc);
|
||||||
nodesDestroyNode((SNode*)*pMergeFunc);
|
nodesDestroyNode((SNode*)*pMergeFunc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -344,11 +344,12 @@ static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SL
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) {
|
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMidFuncs, SNodeList** pMergeFuncs) {
|
||||||
SNode* pNode = NULL;
|
SNode* pNode = NULL;
|
||||||
FOREACH(pNode, pFuncs) {
|
FOREACH(pNode, pFuncs) {
|
||||||
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||||
SFunctionNode* pPartFunc = NULL;
|
SFunctionNode* pPartFunc = NULL;
|
||||||
|
SFunctionNode* pMidFunc = NULL;
|
||||||
SFunctionNode* pMergeFunc = NULL;
|
SFunctionNode* pMergeFunc = NULL;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) {
|
if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) {
|
||||||
|
@ -359,18 +360,33 @@ static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFu
|
||||||
nodesDestroyNode((SNode*)pMergeFunc);
|
nodesDestroyNode((SNode*)pMergeFunc);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
if(pMidFuncs != NULL){
|
||||||
|
pMidFunc = (SFunctionNode*)nodesCloneNode(pNode);
|
||||||
|
if (NULL == pMidFunc) {
|
||||||
|
nodesDestroyNode((SNode*)pMidFunc);
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
code = fmGetDistMethod(pFunc, &pPartFunc, &pMergeFunc);
|
code = fmGetDistMethod(pFunc, &pPartFunc, &pMidFunc, &pMergeFunc);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
|
code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
if(pMidFuncs != NULL){
|
||||||
|
code = nodesListMakeStrictAppend(pMidFuncs, (SNode*)pMidFunc);
|
||||||
|
}else{
|
||||||
|
nodesDestroyNode((SNode*)pMidFunc);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
|
code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
nodesDestroyList(*pPartialFuncs);
|
nodesDestroyNode((SNode*)pPartFunc);
|
||||||
nodesDestroyList(*pMergeFuncs);
|
nodesDestroyNode((SNode*)pMidFunc);
|
||||||
|
nodesDestroyNode((SNode*)pMergeFunc);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -463,7 +479,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
|
||||||
splSetParent((SLogicNode*)pPartWin);
|
splSetParent((SLogicNode*)pPartWin);
|
||||||
|
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs);
|
int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, NULL, &pMergeWindow->pFuncs);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
|
code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
|
||||||
}
|
}
|
||||||
|
@ -488,16 +504,6 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SNode* createColumnByFunc(const SFunctionNode* pFunc) {
|
|
||||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
|
||||||
if (NULL == pCol) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
strcpy(pCol->colName, pFunc->node.aliasName);
|
|
||||||
pCol->node.resType = pFunc->node.resType;
|
|
||||||
return (SNode*)pCol;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) {
|
static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) {
|
||||||
SNodeList* pFunc = pMergeWindow->pFuncs;
|
SNodeList* pFunc = pMergeWindow->pFuncs;
|
||||||
pMergeWindow->pFuncs = NULL;
|
pMergeWindow->pFuncs = NULL;
|
||||||
|
@ -527,11 +533,12 @@ static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLo
|
||||||
splSetParent((SLogicNode*)pPartWin);
|
splSetParent((SLogicNode*)pPartWin);
|
||||||
|
|
||||||
SNodeList* pFuncPart = NULL;
|
SNodeList* pFuncPart = NULL;
|
||||||
|
SNodeList* pFuncMid = NULL;
|
||||||
SNodeList* pFuncMerge = NULL;
|
SNodeList* pFuncMerge = NULL;
|
||||||
int32_t code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMerge);
|
int32_t code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMid, &pFuncMerge);
|
||||||
pPartWin->pFuncs = pFuncPart;
|
pPartWin->pFuncs = pFuncPart;
|
||||||
|
pMidWin->pFuncs = pFuncMid;
|
||||||
pMergeWindow->pFuncs = pFuncMerge;
|
pMergeWindow->pFuncs = pFuncMerge;
|
||||||
pMidWin->pFuncs = nodesCloneList(pFuncPart);
|
|
||||||
|
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -549,23 +556,6 @@ static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(TSDB_CODE_SUCCESS == code){
|
|
||||||
for (int32_t i = 0; i < LIST_LENGTH(pMidWin->pFuncs); ++i) {
|
|
||||||
SFunctionNode* pFunc1 = (SFunctionNode*)nodesListGetNode(pPartWin->pFuncs, i);
|
|
||||||
SFunctionNode* pFunc2 = (SFunctionNode*)nodesListGetNode(pMidWin->pFuncs, i);
|
|
||||||
NODES_DESTORY_LIST(pFunc2->pParameterList);
|
|
||||||
|
|
||||||
SNodeList* pParameterList = NULL;
|
|
||||||
SNode* pRes = createColumnByFunc(pFunc1);
|
|
||||||
code = nodesListMakeStrictAppend(&pParameterList, pRes);
|
|
||||||
if(code == TSDB_CODE_SUCCESS){
|
|
||||||
pFunc2->pParameterList = pParameterList;
|
|
||||||
}else{
|
|
||||||
nodesDestroyNode(pRes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = stbSplAppendWStart(pMidWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
|
code = stbSplAppendWStart(pMidWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
|
||||||
}
|
}
|
||||||
|
@ -978,7 +968,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO
|
||||||
pPartAgg->node.pChildren = pChildren;
|
pPartAgg->node.pChildren = pChildren;
|
||||||
splSetParent((SLogicNode*)pPartAgg);
|
splSetParent((SLogicNode*)pPartAgg);
|
||||||
|
|
||||||
code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
|
code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, NULL, &pMergeAgg->pAggFuncs);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
|
code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
|
||||||
|
|
Loading…
Reference in New Issue