From fa85d1caa79a8e1f93d521f76082f49f9c44839b Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Mon, 11 Mar 2024 11:29:08 +0000 Subject: [PATCH] support other tsma funcs --- include/libs/function/functionMgt.h | 7 ++ include/libs/nodes/plannodes.h | 1 + include/libs/nodes/querynodes.h | 2 + source/libs/executor/src/aggregateoperator.c | 14 +-- source/libs/function/inc/functionMgtInt.h | 1 + source/libs/function/src/builtins.c | 100 ++++++++++++++++-- source/libs/function/src/builtinsimpl.c | 17 +++ .../libs/function/src/detail/tavgfunction.c | 6 ++ source/libs/function/src/functionMgt.c | 8 ++ source/libs/nodes/src/nodesCloneFuncs.c | 2 + source/libs/nodes/src/nodesCodeFuncs.c | 23 ++++ source/libs/nodes/src/nodesMsgFuncs.c | 26 ++++- source/libs/parser/src/parTranslater.c | 4 + source/libs/planner/src/planOptimizer.c | 3 +- source/libs/planner/src/planPhysiCreater.c | 12 +++ tests/system-test/2-query/tsma.py | 9 +- 16 files changed, 210 insertions(+), 25 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index c7a5d2de65..cff5f3f790 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -170,6 +170,12 @@ typedef enum EFunctionType { FUNCTION_TYPE_FIRST_STATE_MERGE, FUNCTION_TYPE_LAST_STATE, FUNCTION_TYPE_LAST_STATE_MERGE, + FUNCTION_TYPE_SPREAD_STATE, + FUNCTION_TYPE_SPREAD_STATE_MERGE, + FUNCTION_TYPE_STDDEV_STATE, + FUNCTION_TYPE_STDDEV_STATE_MERGE, + FUNCTION_TYPE_HYPERLOGLOG_STATE, + FUNCTION_TYPE_HYPERLOGLOG_STATE_MERGE, // geometry functions FUNCTION_TYPE_GEOM_FROM_TEXT = 4250, @@ -282,6 +288,7 @@ int32_t fmCreateStateFuncs(SNodeList* pFuncs); int32_t fmCreateStateMergeFuncs(SNodeList* pFuncs); int32_t fmGetFuncId(const char* name); bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId); +bool fmIsCountLikeFunc(int32_t funcId); #ifdef __cplusplus } diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 7df13846c2..770b163293 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -583,6 +583,7 @@ typedef struct SAggPhysiNode { SNodeList* pAggFuncs; bool mergeDataBlock; bool groupKeyOptimized; + bool hasCountLikeFunc; } SAggPhysiNode; typedef struct SDownstreamSourceNode { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 602bfdde39..06eca9c105 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -177,6 +177,8 @@ typedef struct SFunctionNode { int32_t udfBufSize; bool hasPk; int32_t pkBytes; + bool hasOriginalFunc; + int32_t originalFuncId; } SFunctionNode; typedef struct STableNode { diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 715c354873..2429fcff79 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -47,6 +47,7 @@ typedef struct SAggOperatorInfo { bool groupKeyOptimized; bool hasValidBlock; SSDataBlock* pNewGroupBlock; + bool hasCountFunc; } SAggOperatorInfo; static void destroyAggOperatorInfo(void* param); @@ -111,6 +112,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN pInfo->groupId = UINT64_MAX; pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; + pInfo->hasCountFunc = pAggNode->hasCountLikeFunc; setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -317,18 +319,8 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc } SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; - bool hasCountFunc = false; - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; - if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) || - (strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) { - hasCountFunc = true; - break; - } - } - - if (!hasCountFunc) { + if (!pAggInfo->hasCountFunc) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index 9c01d9320a..a3f97af5d9 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -56,6 +56,7 @@ extern "C" { #define FUNC_MGT_IGNORE_NULL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(27) #define FUNC_MGT_PRIMARY_KEY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(28) #define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29) +#define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index de8d674d20..4dd88eef2c 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1823,6 +1823,28 @@ static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int3 return translateFirstLastImpl(pFunc, pErrBuf, len, false); } +static int32_t translateFirstLastState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + int32_t paraBytes = getSDataTypeFromNode(pPara)->bytes; + + pFunc->node.resType = + (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateFirstLastStateMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + int32_t paraBytes = getSDataTypeFromNode(pPara)->bytes; + uint8_t paraType = getSDataTypeFromNode(pPara)->type; + if (paraType != TSDB_DATA_TYPE_BINARY) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = + (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateUniqueMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isUnique) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -2418,7 +2440,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "count", .type = FUNCTION_TYPE_COUNT, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_IGNORE_NULL_FUNC | FUNC_MGT_TSMA_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_IGNORE_NULL_FUNC | FUNC_MGT_TSMA_FUNC | FUNC_MGT_COUNT_LIKE_FUNC, .translateFunc = translateCount, .dataRequiredFunc = countDataRequired, .getEnvFunc = getCountFuncEnv, @@ -2693,7 +2715,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "spread", .type = FUNCTION_TYPE_SPREAD, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_TSMA_FUNC, .translateFunc = translateSpread, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getSpreadFuncEnv, @@ -2706,6 +2728,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { #endif .combineFunc = spreadCombine, .pPartialFunc = "_spread_partial", + .pStateFunc = "_spread_state", .pMergeFunc = "_spread_merge" }, { @@ -2737,6 +2760,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .invertFunc = NULL, #endif .combineFunc = spreadCombine, + .pPartialFunc = "_spread_state_merge", + .pMergeFunc = "_spread_merge", }, { .name = "elapsed", @@ -2919,6 +2944,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = firstLastScalarFunction, .finalizeFunc = firstLastFinalize, .pPartialFunc = "_first_partial", + .pStateFunc = "_first_state", .pMergeFunc = "_first_merge", .combineFunc = firstCombine, }, @@ -2946,12 +2972,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = firstFunctionMerge, .finalizeFunc = firstLastFinalize, .combineFunc = firstCombine, + .pPartialFunc = "_first_state_merge", + .pMergeFunc = "_first_merge", }, { .name = "last", .type = FUNCTION_TYPE_LAST, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_IGNORE_NULL_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_IGNORE_NULL_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC | FUNC_MGT_TSMA_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2960,6 +2988,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = firstLastScalarFunction, .finalizeFunc = firstLastFinalize, .pPartialFunc = "_last_partial", + .pStateFunc = "_last_state", .pMergeFunc = "_last_merge", .combineFunc = lastCombine, }, @@ -2987,6 +3016,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = lastFunctionMerge, .finalizeFunc = firstLastFinalize, .combineFunc = lastCombine, + .pPartialFunc = "_last_state_merge", + .pMergeFunc = "_last_merge", }, { .name = "twa", @@ -3049,7 +3080,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "hyperloglog", .type = FUNCTION_TYPE_HYPERLOGLOG, - .classification = FUNC_MGT_AGG_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC, .translateFunc = translateHLL, .getEnvFunc = getHLLFuncEnv, .initFunc = functionSetup, @@ -3880,13 +3911,70 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_FIRST_STATE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC, - .translateFunc = translateAvgState, + .translateFunc = translateFirstLastState, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = firstFunction, .finalizeFunc = firstLastPartialFinalize, .pPartialFunc = "_first_partial", - .pMergeFunc = "_avg_state_merge" + .pMergeFunc = "_first_state_merge" + }, + { + .name = "_first_state_merge", + .type = FUNCTION_TYPE_FIRST_STATE_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateFirstLastStateMerge, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = firstFunctionMerge, + .finalizeFunc = firstLastPartialFinalize, + }, + { + .name = "_last_state", + .type = FUNCTION_TYPE_LAST_STATE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateFirstLastState, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastFunction, + .finalizeFunc = firstLastPartialFinalize, + .pPartialFunc = "_last_partial", + .pMergeFunc = "_last_state_merge" + }, + { + .name = "_last_state_merge", + .type = FUNCTION_TYPE_LAST_STATE_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateFirstLastStateMerge, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastFunctionMerge, + .finalizeFunc = firstLastPartialFinalize, + }, + { + .name = "_spread_state", + .type = FUNCTION_TYPE_SPREAD_STATE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateSpreadState, + .getEnvFunc = getSpreadFuncEnv, + .initFunc = spreadFunctionSetup, + .processFunc = spreadFunction, + .finalizeFunc = spreadPartialFinalize, + .pPartialFunc = "_spread_partial", + .pMergeFunc = "_last_state_merge" + }, + { + .name = "_spread_state_merge", + .type = FUNCTION_TYPE_SPREAD_STATE_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateSpreadStateMerge, + .getEnvFunc = getSpreadFuncEnv, + .initFunc = spreadFunctionSetup, + .processFunc = spreadFunctionMerge, + .finalizeFunc = spreadPartialFinalize, }, }; // clang-format on diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 36365abafb..d7ef445f9d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -867,6 +867,12 @@ int32_t sumFunction(SqlFunctionCtx* pCtx) { } _sum_over: + if (numOfElem == 0) { + if (pCtx->pExpr->pExpr->_function.pFunctNode->hasOriginalFunc && + fmIsCountLikeFunc(pCtx->pExpr->pExpr->_function.pFunctNode->originalFuncId)) { + numOfElem = 1; + } + } // data in the check operation are all null, not output SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); return TSDB_CODE_SUCCESS; @@ -2846,6 +2852,12 @@ static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; + + if (IS_NULL_TYPE(pCol->info.type)) { + SET_VAL(GET_RES_INFO(pCtx), 0, 1); + return TSDB_CODE_SUCCESS; + } + if (pCol->info.type != TSDB_DATA_TYPE_BINARY) { return TSDB_CODE_FUNC_FUNTION_PARA_TYPE; } @@ -3852,6 +3864,11 @@ static void spreadTransferInfo(SSpreadInfo* pInput, SSpreadInfo* pOutput) { int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; + + if (IS_NULL_TYPE(pCol->info.type)) { + return TSDB_CODE_SUCCESS; + } + if (pCol->info.type != TSDB_DATA_TYPE_BINARY) { return TSDB_CODE_FUNC_FUNTION_PARA_TYPE; } diff --git a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index 3f3c4a1c99..66ed092f76 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -705,6 +705,12 @@ static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) { int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; + + if (IS_NULL_TYPE(pCol->info.type)) { + SET_VAL(GET_RES_INFO(pCtx), 0, 1); + return TSDB_CODE_SUCCESS; + } + if (pCol->info.type != TSDB_DATA_TYPE_BINARY) { return TSDB_CODE_FUNC_FUNTION_PARA_TYPE; } diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 8d5342620f..1e12595b28 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -413,6 +413,8 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod nodesDestroyList(pParameterList); return TSDB_CODE_OUT_OF_MEMORY; } + (*pPartialFunc)->hasOriginalFunc = true; + (*pPartialFunc)->originalFuncId = pSrcFunc->hasOriginalFunc ? pSrcFunc->originalFuncId : pSrcFunc->funcId; char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0}; int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", (*pPartialFunc)->functionName, pSrcFunc); taosCreateMD5Hash(name, len); @@ -475,6 +477,8 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio } } if (TSDB_CODE_SUCCESS == code) { + pFunc->hasOriginalFunc = true; + pFunc->originalFuncId = pSrcFunc->hasOriginalFunc ? pSrcFunc->originalFuncId : pSrcFunc->funcId; // overwrite function restype set by translate function if (fmIsSameInOutType(pSrcFunc->funcId)) { pFunc->node.resType = pSrcFunc->node.resType; @@ -635,3 +639,7 @@ bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) { const SBuiltinFuncDefinition* pStateMergeFunc = &funcMgtBuiltins[stateMergeFuncId]; return strcmp(pStateFunc->name, pStateMergeFunc->pMergeFunc) == 0; } + +bool fmIsCountLikeFunc(int32_t funcId) { + return isSpecificClassifyFunc(funcId, FUNC_MGT_COUNT_LIKE_FUNC); +} diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 4c25f60294..961c62160b 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -224,6 +224,8 @@ static int32_t functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) COPY_SCALAR_FIELD(udfBufSize); COPY_SCALAR_FIELD(hasPk); COPY_SCALAR_FIELD(pkBytes); + COPY_SCALAR_FIELD(hasOriginalFunc); + COPY_SCALAR_FIELD(originalFuncId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 6d208f258b..7bd8ec4234 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2468,6 +2468,7 @@ static const char* jkAggPhysiPlanGroupKeys = "GroupKeys"; static const char* jkAggPhysiPlanAggFuncs = "AggFuncs"; static const char* jkAggPhysiPlanMergeDataBlock = "MergeDataBlock"; static const char* jkAggPhysiPlanGroupKeyOptimized = "GroupKeyOptimized"; +static const char* jkAggPhysiPlanHasCountLikeFunc = "HasCountFunc"; static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) { const SAggPhysiNode* pNode = (const SAggPhysiNode*)pObj; @@ -2488,6 +2489,9 @@ static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkAggPhysiPlanGroupKeyOptimized, pNode->groupKeyOptimized); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkAggPhysiPlanHasCountLikeFunc, pNode->hasCountLikeFunc); + } return code; } @@ -2511,6 +2515,9 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkAggPhysiPlanGroupKeyOptimized, &pNode->groupKeyOptimized); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkAggPhysiPlanHasCountLikeFunc, &pNode->hasCountLikeFunc); + } return code; } @@ -4238,6 +4245,8 @@ static const char* jkFunctionParameter = "Parameters"; static const char* jkFunctionUdfBufSize = "UdfBufSize"; static const char* jkFunctionHasPk = "HasPk"; static const char* jkFunctionPkBytes = "PkBytes"; +static const char* jkFunctionIsMergeFunc = "IsMergeFunc"; +static const char* jkFunctionMergeFuncOf = "MergeFuncOf"; static int32_t functionNodeToJson(const void* pObj, SJson* pJson) { const SFunctionNode* pNode = (const SFunctionNode*)pObj; @@ -4264,6 +4273,13 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkFunctionPkBytes, pNode->pkBytes); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkFunctionIsMergeFunc, pNode->hasOriginalFunc); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkFunctionMergeFuncOf, pNode->originalFuncId); + } + return code; } @@ -4292,6 +4308,13 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkFunctionPkBytes, &pNode->pkBytes); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkFunctionIsMergeFunc, &pNode->hasOriginalFunc); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkFunctionMergeFuncOf, &pNode->originalFuncId); + } + return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 0c6adce23c..54cf685235 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -1109,7 +1109,9 @@ enum { FUNCTION_CODE_PARAMETERS, FUNCTION_CODE_UDF_BUF_SIZE, FUNCTION_NODE_HAS_PK, - FUNCTION_NODE_PK_BYTES + FUNCTION_NODE_PK_BYTES, + FUNCTION_CODE_IS_MERGE_FUNC, + FUNCTION_CODE_MERGE_FUNC_OF, }; static int32_t functionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -1137,6 +1139,13 @@ static int32_t functionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeI32(pEncoder, FUNCTION_NODE_PK_BYTES, pNode->pkBytes); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, FUNCTION_CODE_IS_MERGE_FUNC, pNode->hasOriginalFunc); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, FUNCTION_CODE_MERGE_FUNC_OF, pNode->originalFuncId); + } + return code; } @@ -1171,6 +1180,12 @@ static int32_t msgToFunctionNode(STlvDecoder* pDecoder, void* pObj) { case FUNCTION_NODE_PK_BYTES: code = tlvDecodeI32(pTlv, &pNode->pkBytes); break; + case FUNCTION_CODE_IS_MERGE_FUNC: + code = tlvDecodeBool(pTlv, &pNode->hasOriginalFunc); + break; + case FUNCTION_CODE_MERGE_FUNC_OF: + code = tlvDecodeI32(pTlv, &pNode->originalFuncId); + break; default: break; } @@ -2839,7 +2854,8 @@ enum { PHY_AGG_CODE_GROUP_KEYS, PHY_AGG_CODE_AGG_FUNCS, PHY_AGG_CODE_MERGE_DATA_BLOCK, - PHY_AGG_CODE_GROUP_KEY_OPTIMIZE + PHY_AGG_CODE_GROUP_KEY_OPTIMIZE, + PHY_AGG_CODE_HAS_COUNT_LIKE_FUNCS, }; static int32_t physiAggNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2861,6 +2877,9 @@ static int32_t physiAggNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_AGG_CODE_GROUP_KEY_OPTIMIZE, pNode->groupKeyOptimized); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_AGG_CODE_HAS_COUNT_LIKE_FUNCS, pNode->hasCountLikeFunc); + } return code; } @@ -2890,6 +2909,9 @@ static int32_t msgToPhysiAggNode(STlvDecoder* pDecoder, void* pObj) { case PHY_AGG_CODE_GROUP_KEY_OPTIMIZE: code = tlvDecodeBool(pTlv, &pNode->groupKeyOptimized); break; + case PHY_AGG_CODE_HAS_COUNT_LIKE_FUNCS: + code = tlvDecodeBool(pTlv, &pNode->hasCountLikeFunc); + break; default: break; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 98c1d740dc..4f1ff56879 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10686,6 +10686,10 @@ static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, nodesListErase(pFunc->pParameterList, pFunc->pParameterList->pHead); nodesListPushFront(pFunc->pParameterList, (SNode*)pCol); snprintf(pFunc->node.userAlias, TSDB_COL_NAME_LEN, "%s", pSchema->name); + // for first or last, the second param will be pk ts col, here we should remove it + if (fmIsImplicitTsFunc(pFunc->funcId) && LIST_LENGTH(pFunc->pParameterList) == 2) { + nodesListErase(pFunc->pParameterList, pFunc->pParameterList->pTail); + } ++i; } // recursive tsma, create func list from base tsma diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 4882c8517c..d5f7f51e81 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -5895,8 +5895,7 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ SFunctionNode* pQueryFunc = (SFunctionNode*)pNode; // TODO handle _wstart if (fmIsPseudoColumnFunc(pQueryFunc->funcId) || fmIsGroupKeyFunc(pQueryFunc->funcId)) continue; - if (1 != pQueryFunc->pParameterList->length || - nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) { + if (nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) { failed = true; break; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index fe23025146..e7e0be6d57 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1556,6 +1556,17 @@ static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeLi return code; } +static EDealRes hasCountLikeFunc(SNode* pNode, void* res) { + if (QUERY_NODE_FUNCTION == nodeType(pNode)) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + if (fmIsCountLikeFunc(pFunc->funcId) || (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId))) { + *(bool*)res = true; + return DEAL_RES_END; + } + } + return DEAL_RES_CONTINUE; +} + static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode, SPhysiNode** pPhyNode, SSubplan* pSubPlan) { SAggPhysiNode* pAgg = @@ -1579,6 +1590,7 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, if (TSDB_CODE_SUCCESS == code) { code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs); } + nodesWalkExprs(pAggFuncs, hasCountLikeFunc, &pAgg->hasCountLikeFunc); SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); // push down expression to pOutputDataBlockDesc of child node diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index e9933ea3a1..4fcefb5cb0 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -229,12 +229,12 @@ class TSMATester: key=lambda x: [v is None for v in x] + list(x)) tsma_res.sort(key=lambda x: [v is None for v in x] + list(x)) except Exception as e: - tdLog.exit("comparing tsma res for: %s got different data: \nno tsma res: %s \n tsma res: %s err: %s" % ( + tdLog.exit("comparing tsma res for: %s got different data: \nno tsma res: %s \n tsma res: %s err: %s" % ( sql, str(no_tsma_res), str(tsma_res), str(e))) for row_no_tsma, row_tsma in zip(no_tsma_res, tsma_res): if row_no_tsma != row_tsma: - tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s \nno tsma res: %s \n tsma res: %s" % ( + tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s \nno tsma res: %s \n tsma res: %s" % ( sql, str(row_no_tsma), str(row_tsma), str(no_tsma_res), str(tsma_res))) tdLog.info('result check succeed for sql: %s. \n tsma-res: %s. \nno_tsma-res: %s' % (sql, str(tsma_res), str(no_tsma_res))) @@ -771,7 +771,7 @@ class TDTestCase: def test_recursive_tsma(self): tdSql.execute('drop tsma tsma2') - tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)'] + tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)'] select_func_list: List[str] = tsma_func_list.copy() select_func_list.append('count(*)') self.create_tsma('tsma3', 'test', 'meters', tsma_func_list, '5m') @@ -781,6 +781,7 @@ class TDTestCase: sql = 'select avg(c2), "recursive tsma4" from meters' ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma( 'tsma4', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc() + #time.sleep(999999) self.tsma_tester.check_sql(sql, ctx) self.check(self.test_query_tsma_all(select_func_list)) self.create_recursive_tsma( @@ -857,7 +858,7 @@ class TDTestCase: sql = "SELECT avg(c1), avg(c2),_wstart, _wend,t3,t4,t5,t2 FROM meters WHERE ts >= '2018-09-17 8:00:00' AND ts < '2018-09-17 09:03:18.334' PARTITION BY t3,t4,t5,t2 INTERVAL(1d);" ctxs.append(TSMAQCBuilder().with_sql(sql) .should_query_with_table('meters', '2018-09-17 8:00:00', '2018-09-17 09:03:18.333').get_qc()) - ctxs.extend(self.test_query_tsma_all()) + #ctxs.extend(self.test_query_tsma_all()) return ctxs def test_query_with_tsma_interval_partition_by_col(self):