support other tsma funcs

This commit is contained in:
wangjiaming0909 2024-03-11 11:29:08 +00:00
parent cc9550f45e
commit fa85d1caa7
16 changed files with 210 additions and 25 deletions

View File

@ -170,6 +170,12 @@ typedef enum EFunctionType {
FUNCTION_TYPE_FIRST_STATE_MERGE,
FUNCTION_TYPE_LAST_STATE,
FUNCTION_TYPE_LAST_STATE_MERGE,
FUNCTION_TYPE_SPREAD_STATE,
FUNCTION_TYPE_SPREAD_STATE_MERGE,
FUNCTION_TYPE_STDDEV_STATE,
FUNCTION_TYPE_STDDEV_STATE_MERGE,
FUNCTION_TYPE_HYPERLOGLOG_STATE,
FUNCTION_TYPE_HYPERLOGLOG_STATE_MERGE,
// geometry functions
FUNCTION_TYPE_GEOM_FROM_TEXT = 4250,
@ -282,6 +288,7 @@ int32_t fmCreateStateFuncs(SNodeList* pFuncs);
int32_t fmCreateStateMergeFuncs(SNodeList* pFuncs);
int32_t fmGetFuncId(const char* name);
bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId);
bool fmIsCountLikeFunc(int32_t funcId);
#ifdef __cplusplus
}

View File

@ -583,6 +583,7 @@ typedef struct SAggPhysiNode {
SNodeList* pAggFuncs;
bool mergeDataBlock;
bool groupKeyOptimized;
bool hasCountLikeFunc;
} SAggPhysiNode;
typedef struct SDownstreamSourceNode {

View File

@ -177,6 +177,8 @@ typedef struct SFunctionNode {
int32_t udfBufSize;
bool hasPk;
int32_t pkBytes;
bool hasOriginalFunc;
int32_t originalFuncId;
} SFunctionNode;
typedef struct STableNode {

View File

@ -47,6 +47,7 @@ typedef struct SAggOperatorInfo {
bool groupKeyOptimized;
bool hasValidBlock;
SSDataBlock* pNewGroupBlock;
bool hasCountFunc;
} SAggOperatorInfo;
static void destroyAggOperatorInfo(void* param);
@ -111,6 +112,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
pInfo->groupId = UINT64_MAX;
pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
!pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
@ -317,18 +319,8 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
}
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
bool hasCountFunc = false;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) ||
(strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) {
hasCountFunc = true;
break;
}
}
if (!hasCountFunc) {
if (!pAggInfo->hasCountFunc) {
return TSDB_CODE_SUCCESS;
}

View File

@ -56,6 +56,7 @@ extern "C" {
#define FUNC_MGT_IGNORE_NULL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(27)
#define FUNC_MGT_PRIMARY_KEY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(28)
#define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29)
#define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)

View File

@ -1823,6 +1823,28 @@ static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int3
return translateFirstLastImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateFirstLastState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
int32_t paraBytes = getSDataTypeFromNode(pPara)->bytes;
pFunc->node.resType =
(SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS;
}
static int32_t translateFirstLastStateMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
int32_t paraBytes = getSDataTypeFromNode(pPara)->bytes;
uint8_t paraType = getSDataTypeFromNode(pPara)->type;
if (paraType != TSDB_DATA_TYPE_BINARY) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType =
(SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS;
}
static int32_t translateUniqueMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isUnique) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
@ -2418,7 +2440,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "count",
.type = FUNCTION_TYPE_COUNT,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_IGNORE_NULL_FUNC | FUNC_MGT_TSMA_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_IGNORE_NULL_FUNC | FUNC_MGT_TSMA_FUNC | FUNC_MGT_COUNT_LIKE_FUNC,
.translateFunc = translateCount,
.dataRequiredFunc = countDataRequired,
.getEnvFunc = getCountFuncEnv,
@ -2693,7 +2715,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "spread",
.type = FUNCTION_TYPE_SPREAD,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateSpread,
.dataRequiredFunc = statisDataRequired,
.getEnvFunc = getSpreadFuncEnv,
@ -2706,6 +2728,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
#endif
.combineFunc = spreadCombine,
.pPartialFunc = "_spread_partial",
.pStateFunc = "_spread_state",
.pMergeFunc = "_spread_merge"
},
{
@ -2737,6 +2760,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.invertFunc = NULL,
#endif
.combineFunc = spreadCombine,
.pPartialFunc = "_spread_state_merge",
.pMergeFunc = "_spread_merge",
},
{
.name = "elapsed",
@ -2919,6 +2944,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = firstLastScalarFunction,
.finalizeFunc = firstLastFinalize,
.pPartialFunc = "_first_partial",
.pStateFunc = "_first_state",
.pMergeFunc = "_first_merge",
.combineFunc = firstCombine,
},
@ -2946,12 +2972,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = firstFunctionMerge,
.finalizeFunc = firstLastFinalize,
.combineFunc = firstCombine,
.pPartialFunc = "_first_state_merge",
.pMergeFunc = "_first_merge",
},
{
.name = "last",
.type = FUNCTION_TYPE_LAST,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_IGNORE_NULL_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_IGNORE_NULL_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateFirstLast,
.dynDataRequiredFunc = lastDynDataReq,
.getEnvFunc = getFirstLastFuncEnv,
@ -2960,6 +2988,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = firstLastScalarFunction,
.finalizeFunc = firstLastFinalize,
.pPartialFunc = "_last_partial",
.pStateFunc = "_last_state",
.pMergeFunc = "_last_merge",
.combineFunc = lastCombine,
},
@ -2987,6 +3016,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = lastFunctionMerge,
.finalizeFunc = firstLastFinalize,
.combineFunc = lastCombine,
.pPartialFunc = "_last_state_merge",
.pMergeFunc = "_last_merge",
},
{
.name = "twa",
@ -3049,7 +3080,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "hyperloglog",
.type = FUNCTION_TYPE_HYPERLOGLOG,
.classification = FUNC_MGT_AGG_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC,
.translateFunc = translateHLL,
.getEnvFunc = getHLLFuncEnv,
.initFunc = functionSetup,
@ -3880,13 +3911,70 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_FIRST_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateAvgState,
.translateFunc = translateFirstLastState,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = firstFunction,
.finalizeFunc = firstLastPartialFinalize,
.pPartialFunc = "_first_partial",
.pMergeFunc = "_avg_state_merge"
.pMergeFunc = "_first_state_merge"
},
{
.name = "_first_state_merge",
.type = FUNCTION_TYPE_FIRST_STATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateFirstLastStateMerge,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = firstFunctionMerge,
.finalizeFunc = firstLastPartialFinalize,
},
{
.name = "_last_state",
.type = FUNCTION_TYPE_LAST_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateFirstLastState,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunction,
.finalizeFunc = firstLastPartialFinalize,
.pPartialFunc = "_last_partial",
.pMergeFunc = "_last_state_merge"
},
{
.name = "_last_state_merge",
.type = FUNCTION_TYPE_LAST_STATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateFirstLastStateMerge,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunctionMerge,
.finalizeFunc = firstLastPartialFinalize,
},
{
.name = "_spread_state",
.type = FUNCTION_TYPE_SPREAD_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateSpreadState,
.getEnvFunc = getSpreadFuncEnv,
.initFunc = spreadFunctionSetup,
.processFunc = spreadFunction,
.finalizeFunc = spreadPartialFinalize,
.pPartialFunc = "_spread_partial",
.pMergeFunc = "_last_state_merge"
},
{
.name = "_spread_state_merge",
.type = FUNCTION_TYPE_SPREAD_STATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateSpreadStateMerge,
.getEnvFunc = getSpreadFuncEnv,
.initFunc = spreadFunctionSetup,
.processFunc = spreadFunctionMerge,
.finalizeFunc = spreadPartialFinalize,
},
};
// clang-format on

View File

@ -867,6 +867,12 @@ int32_t sumFunction(SqlFunctionCtx* pCtx) {
}
_sum_over:
if (numOfElem == 0) {
if (pCtx->pExpr->pExpr->_function.pFunctNode->hasOriginalFunc &&
fmIsCountLikeFunc(pCtx->pExpr->pExpr->_function.pFunctNode->originalFuncId)) {
numOfElem = 1;
}
}
// data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
@ -2846,6 +2852,12 @@ static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
if (IS_NULL_TYPE(pCol->info.type)) {
SET_VAL(GET_RES_INFO(pCtx), 0, 1);
return TSDB_CODE_SUCCESS;
}
if (pCol->info.type != TSDB_DATA_TYPE_BINARY) {
return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
}
@ -3852,6 +3864,11 @@ static void spreadTransferInfo(SSpreadInfo* pInput, SSpreadInfo* pOutput) {
int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
if (IS_NULL_TYPE(pCol->info.type)) {
return TSDB_CODE_SUCCESS;
}
if (pCol->info.type != TSDB_DATA_TYPE_BINARY) {
return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
}

View File

@ -705,6 +705,12 @@ static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) {
int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
if (IS_NULL_TYPE(pCol->info.type)) {
SET_VAL(GET_RES_INFO(pCtx), 0, 1);
return TSDB_CODE_SUCCESS;
}
if (pCol->info.type != TSDB_DATA_TYPE_BINARY) {
return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
}

View File

@ -413,6 +413,8 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pPartialFunc)->hasOriginalFunc = true;
(*pPartialFunc)->originalFuncId = pSrcFunc->hasOriginalFunc ? pSrcFunc->originalFuncId : pSrcFunc->funcId;
char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0};
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", (*pPartialFunc)->functionName, pSrcFunc);
taosCreateMD5Hash(name, len);
@ -475,6 +477,8 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
}
}
if (TSDB_CODE_SUCCESS == code) {
pFunc->hasOriginalFunc = true;
pFunc->originalFuncId = pSrcFunc->hasOriginalFunc ? pSrcFunc->originalFuncId : pSrcFunc->funcId;
// overwrite function restype set by translate function
if (fmIsSameInOutType(pSrcFunc->funcId)) {
pFunc->node.resType = pSrcFunc->node.resType;
@ -635,3 +639,7 @@ bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) {
const SBuiltinFuncDefinition* pStateMergeFunc = &funcMgtBuiltins[stateMergeFuncId];
return strcmp(pStateFunc->name, pStateMergeFunc->pMergeFunc) == 0;
}
bool fmIsCountLikeFunc(int32_t funcId) {
return isSpecificClassifyFunc(funcId, FUNC_MGT_COUNT_LIKE_FUNC);
}

View File

@ -224,6 +224,8 @@ static int32_t functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst)
COPY_SCALAR_FIELD(udfBufSize);
COPY_SCALAR_FIELD(hasPk);
COPY_SCALAR_FIELD(pkBytes);
COPY_SCALAR_FIELD(hasOriginalFunc);
COPY_SCALAR_FIELD(originalFuncId);
return TSDB_CODE_SUCCESS;
}

View File

@ -2468,6 +2468,7 @@ static const char* jkAggPhysiPlanGroupKeys = "GroupKeys";
static const char* jkAggPhysiPlanAggFuncs = "AggFuncs";
static const char* jkAggPhysiPlanMergeDataBlock = "MergeDataBlock";
static const char* jkAggPhysiPlanGroupKeyOptimized = "GroupKeyOptimized";
static const char* jkAggPhysiPlanHasCountLikeFunc = "HasCountFunc";
static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) {
const SAggPhysiNode* pNode = (const SAggPhysiNode*)pObj;
@ -2488,6 +2489,9 @@ static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkAggPhysiPlanGroupKeyOptimized, pNode->groupKeyOptimized);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkAggPhysiPlanHasCountLikeFunc, pNode->hasCountLikeFunc);
}
return code;
}
@ -2511,6 +2515,9 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkAggPhysiPlanGroupKeyOptimized, &pNode->groupKeyOptimized);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkAggPhysiPlanHasCountLikeFunc, &pNode->hasCountLikeFunc);
}
return code;
}
@ -4238,6 +4245,8 @@ static const char* jkFunctionParameter = "Parameters";
static const char* jkFunctionUdfBufSize = "UdfBufSize";
static const char* jkFunctionHasPk = "HasPk";
static const char* jkFunctionPkBytes = "PkBytes";
static const char* jkFunctionIsMergeFunc = "IsMergeFunc";
static const char* jkFunctionMergeFuncOf = "MergeFuncOf";
static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
const SFunctionNode* pNode = (const SFunctionNode*)pObj;
@ -4264,6 +4273,13 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkFunctionPkBytes, pNode->pkBytes);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkFunctionIsMergeFunc, pNode->hasOriginalFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkFunctionMergeFuncOf, pNode->originalFuncId);
}
return code;
}
@ -4292,6 +4308,13 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkFunctionPkBytes, &pNode->pkBytes);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkFunctionIsMergeFunc, &pNode->hasOriginalFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkFunctionMergeFuncOf, &pNode->originalFuncId);
}
return code;
}

View File

@ -1109,7 +1109,9 @@ enum {
FUNCTION_CODE_PARAMETERS,
FUNCTION_CODE_UDF_BUF_SIZE,
FUNCTION_NODE_HAS_PK,
FUNCTION_NODE_PK_BYTES
FUNCTION_NODE_PK_BYTES,
FUNCTION_CODE_IS_MERGE_FUNC,
FUNCTION_CODE_MERGE_FUNC_OF,
};
static int32_t functionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -1137,6 +1139,13 @@ static int32_t functionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, FUNCTION_NODE_PK_BYTES, pNode->pkBytes);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, FUNCTION_CODE_IS_MERGE_FUNC, pNode->hasOriginalFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, FUNCTION_CODE_MERGE_FUNC_OF, pNode->originalFuncId);
}
return code;
}
@ -1171,6 +1180,12 @@ static int32_t msgToFunctionNode(STlvDecoder* pDecoder, void* pObj) {
case FUNCTION_NODE_PK_BYTES:
code = tlvDecodeI32(pTlv, &pNode->pkBytes);
break;
case FUNCTION_CODE_IS_MERGE_FUNC:
code = tlvDecodeBool(pTlv, &pNode->hasOriginalFunc);
break;
case FUNCTION_CODE_MERGE_FUNC_OF:
code = tlvDecodeI32(pTlv, &pNode->originalFuncId);
break;
default:
break;
}
@ -2839,7 +2854,8 @@ enum {
PHY_AGG_CODE_GROUP_KEYS,
PHY_AGG_CODE_AGG_FUNCS,
PHY_AGG_CODE_MERGE_DATA_BLOCK,
PHY_AGG_CODE_GROUP_KEY_OPTIMIZE
PHY_AGG_CODE_GROUP_KEY_OPTIMIZE,
PHY_AGG_CODE_HAS_COUNT_LIKE_FUNCS,
};
static int32_t physiAggNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2861,6 +2877,9 @@ static int32_t physiAggNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_AGG_CODE_GROUP_KEY_OPTIMIZE, pNode->groupKeyOptimized);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_AGG_CODE_HAS_COUNT_LIKE_FUNCS, pNode->hasCountLikeFunc);
}
return code;
}
@ -2890,6 +2909,9 @@ static int32_t msgToPhysiAggNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_AGG_CODE_GROUP_KEY_OPTIMIZE:
code = tlvDecodeBool(pTlv, &pNode->groupKeyOptimized);
break;
case PHY_AGG_CODE_HAS_COUNT_LIKE_FUNCS:
code = tlvDecodeBool(pTlv, &pNode->hasCountLikeFunc);
break;
default:
break;
}

View File

@ -10686,6 +10686,10 @@ static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt,
nodesListErase(pFunc->pParameterList, pFunc->pParameterList->pHead);
nodesListPushFront(pFunc->pParameterList, (SNode*)pCol);
snprintf(pFunc->node.userAlias, TSDB_COL_NAME_LEN, "%s", pSchema->name);
// for first or last, the second param will be pk ts col, here we should remove it
if (fmIsImplicitTsFunc(pFunc->funcId) && LIST_LENGTH(pFunc->pParameterList) == 2) {
nodesListErase(pFunc->pParameterList, pFunc->pParameterList->pTail);
}
++i;
}
// recursive tsma, create func list from base tsma

View File

@ -5895,8 +5895,7 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
SFunctionNode* pQueryFunc = (SFunctionNode*)pNode;
// TODO handle _wstart
if (fmIsPseudoColumnFunc(pQueryFunc->funcId) || fmIsGroupKeyFunc(pQueryFunc->funcId)) continue;
if (1 != pQueryFunc->pParameterList->length ||
nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) {
if (nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) {
failed = true;
break;
}

View File

@ -1556,6 +1556,17 @@ static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeLi
return code;
}
static EDealRes hasCountLikeFunc(SNode* pNode, void* res) {
if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (fmIsCountLikeFunc(pFunc->funcId) || (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId))) {
*(bool*)res = true;
return DEAL_RES_END;
}
}
return DEAL_RES_CONTINUE;
}
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
SPhysiNode** pPhyNode, SSubplan* pSubPlan) {
SAggPhysiNode* pAgg =
@ -1579,6 +1590,7 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
if (TSDB_CODE_SUCCESS == code) {
code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs);
}
nodesWalkExprs(pAggFuncs, hasCountLikeFunc, &pAgg->hasCountLikeFunc);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node

View File

@ -229,12 +229,12 @@ class TSMATester:
key=lambda x: [v is None for v in x] + list(x))
tsma_res.sort(key=lambda x: [v is None for v in x] + list(x))
except Exception as e:
tdLog.exit("comparing tsma res for: %s got different data: \nno tsma res: %s \n tsma res: %s err: %s" % (
tdLog.exit("comparing tsma res for: %s got different data: \nno tsma res: %s \n tsma res: %s err: %s" % (
sql, str(no_tsma_res), str(tsma_res), str(e)))
for row_no_tsma, row_tsma in zip(no_tsma_res, tsma_res):
if row_no_tsma != row_tsma:
tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s \nno tsma res: %s \n tsma res: %s" % (
tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s \nno tsma res: %s \n tsma res: %s" % (
sql, str(row_no_tsma), str(row_tsma), str(no_tsma_res), str(tsma_res)))
tdLog.info('result check succeed for sql: %s. \n tsma-res: %s. \nno_tsma-res: %s' %
(sql, str(tsma_res), str(no_tsma_res)))
@ -771,7 +771,7 @@ class TDTestCase:
def test_recursive_tsma(self):
tdSql.execute('drop tsma tsma2')
tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)']
tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)']
select_func_list: List[str] = tsma_func_list.copy()
select_func_list.append('count(*)')
self.create_tsma('tsma3', 'test', 'meters', tsma_func_list, '5m')
@ -781,6 +781,7 @@ class TDTestCase:
sql = 'select avg(c2), "recursive tsma4" from meters'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(
'tsma4', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
#time.sleep(999999)
self.tsma_tester.check_sql(sql, ctx)
self.check(self.test_query_tsma_all(select_func_list))
self.create_recursive_tsma(
@ -857,7 +858,7 @@ class TDTestCase:
sql = "SELECT avg(c1), avg(c2),_wstart, _wend,t3,t4,t5,t2 FROM meters WHERE ts >= '2018-09-17 8:00:00' AND ts < '2018-09-17 09:03:18.334' PARTITION BY t3,t4,t5,t2 INTERVAL(1d);"
ctxs.append(TSMAQCBuilder().with_sql(sql)
.should_query_with_table('meters', '2018-09-17 8:00:00', '2018-09-17 09:03:18.333').get_qc())
ctxs.extend(self.test_query_tsma_all())
#ctxs.extend(self.test_query_tsma_all())
return ctxs
def test_query_with_tsma_interval_partition_by_col(self):