diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index e1b2fa0b05..7a24b8fb21 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -507,6 +507,35 @@ static int32_t translateStddevMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t return TSDB_CODE_SUCCESS; } +static int32_t translateStddevState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type; + if (!IS_NUMERIC_TYPE(paraType) && !IS_NULL_TYPE(paraType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = getStddevInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateStddevStateMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type; + if (TSDB_DATA_TYPE_BINARY != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = getStddevInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + + return TSDB_CODE_SUCCESS; +} + static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT, @@ -1361,6 +1390,24 @@ static int32_t translateHLLMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t le return translateHLLImpl(pFunc, pErrBuf, len, false); } +static int32_t translateHLLState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateHLLPartial(pFunc, pErrBuf, len); +} + +static int32_t translateHLLStateMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + if (getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type != TSDB_DATA_TYPE_BINARY) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = + (SDataType){.bytes = getHistogramInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + return TSDB_CODE_SUCCESS; +} + static bool validateStateOper(const SValueNode* pVal) { if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) { return false; @@ -1865,8 +1912,7 @@ static int32_t translateFirstLastStateMerge(SFunctionNode* pFunc, char* pErrBuf, return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - pFunc->node.resType = - (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + pFunc->node.resType = (SDataType){.bytes = paraBytes, .type = TSDB_DATA_TYPE_BINARY}; return TSDB_CODE_SUCCESS; } @@ -2535,7 +2581,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "stddev", .type = FUNCTION_TYPE_STDDEV, - .classification = FUNC_MGT_AGG_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, .translateFunc = translateInNumOutDou, .getEnvFunc = getStddevFuncEnv, .initFunc = stddevFunctionSetup, @@ -2547,6 +2593,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { #endif .combineFunc = stddevCombine, .pPartialFunc = "_stddev_partial", + .pStateFunc = "_stddev_state", .pMergeFunc = "_stddev_merge" }, { @@ -2576,6 +2623,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .invertFunc = stddevInvertFunction, #endif .combineFunc = stddevCombine, + .pPartialFunc = "_stddev_state_merge", + .pMergeFunc = "_stddev_merge", }, { .name = "leastsquares", @@ -3105,7 +3154,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "hyperloglog", .type = FUNCTION_TYPE_HYPERLOGLOG, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC | FUNC_MGT_TSMA_FUNC, .translateFunc = translateHLL, .getEnvFunc = getHLLFuncEnv, .initFunc = functionSetup, @@ -3117,6 +3166,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { #endif .combineFunc = hllCombine, .pPartialFunc = "_hyperloglog_partial", + .pStateFunc = "_hyperloglog_state", .pMergeFunc = "_hyperloglog_merge" }, { @@ -3146,6 +3196,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .invertFunc = NULL, #endif .combineFunc = hllCombine, + .pPartialFunc = "_hyperloglog_state_merge", + .pMergeFunc = "_hyperloglog_merge", }, { .name = "diff", @@ -3906,6 +3958,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = qPseudoTagFunction, .finalizeFunc = NULL }, + { + .name = "_stddev_state", + .type = FUNCTION_TYPE_STDDEV_STATE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateStddevState, + .getEnvFunc = getStddevFuncEnv, + .initFunc = stddevFunctionSetup, + .processFunc = stddevFunction, + .finalizeFunc = stddevPartialFinalize, + .pPartialFunc = "_stddev_partial", + .pMergeFunc = "_stddev_state_merge", + }, + { + .name = "_stddev_state_merge", + .type = FUNCTION_TYPE_STDDEV_STATE_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateStddevStateMerge, + .getEnvFunc = getStddevFuncEnv, + .initFunc = stddevFunctionSetup, + .processFunc = stddevFunctionMerge, + .finalizeFunc = stddevPartialFinalize, + }, { //TODO for outer use not only internal .name = "_avg_state", @@ -3931,6 +4005,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = avgFunctionMerge, .finalizeFunc = avgPartialFinalize, }, + { + .name = "_spread_state", + .type = FUNCTION_TYPE_SPREAD_STATE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateSpreadState, + .getEnvFunc = getSpreadFuncEnv, + .initFunc = spreadFunctionSetup, + .processFunc = spreadFunction, + .finalizeFunc = spreadPartialFinalize, + .pPartialFunc = "_spread_partial", + .pMergeFunc = "_spread_state_merge" + }, + { + .name = "_spread_state_merge", + .type = FUNCTION_TYPE_SPREAD_STATE_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateSpreadStateMerge, + .getEnvFunc = getSpreadFuncEnv, + .initFunc = spreadFunctionSetup, + .processFunc = spreadFunctionMerge, + .finalizeFunc = spreadPartialFinalize, + }, { .name = "_first_state", .type = FUNCTION_TYPE_FIRST_STATE, @@ -3980,26 +4076,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = firstLastPartialFinalize, }, { - .name = "_spread_state", - .type = FUNCTION_TYPE_SPREAD_STATE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_TSMA_FUNC, - .translateFunc = translateSpreadState, - .getEnvFunc = getSpreadFuncEnv, - .initFunc = spreadFunctionSetup, - .processFunc = spreadFunction, - .finalizeFunc = spreadPartialFinalize, - .pPartialFunc = "_spread_partial", - .pMergeFunc = "_spread_state_merge" + .name = "_hyperloglog_state", + .type = FUNCTION_TYPE_HYPERLOGLOG_STATE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateHLLState, + .getEnvFunc = getHLLFuncEnv, + .initFunc = functionSetup, + .processFunc = hllFunction, + .finalizeFunc = hllPartialFinalize, + .pPartialFunc = "_hyperloglog_partial", + .pMergeFunc = "_hyperloglog_state_merge", }, { - .name = "_spread_state_merge", - .type = FUNCTION_TYPE_SPREAD_STATE_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC, - .translateFunc = translateSpreadStateMerge, - .getEnvFunc = getSpreadFuncEnv, - .initFunc = spreadFunctionSetup, - .processFunc = spreadFunctionMerge, - .finalizeFunc = spreadPartialFinalize, + .name = "_hyperloglog_state_merge", + .type = FUNCTION_TYPE_HYPERLOGLOG_STATE_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_COUNT_LIKE_FUNC | FUNC_MGT_TSMA_FUNC, + .translateFunc = translateHLLStateMerge, + .getEnvFunc = getHLLFuncEnv, + .initFunc = functionSetup, + .processFunc = hllFunctionMerge, + .finalizeFunc = hllPartialFinalize, }, }; // clang-format on diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index d2db33573b..1c329a83fb 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1454,6 +1454,12 @@ static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) { int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; + + if (IS_NULL_TYPE(pCol->info.type)) { + SET_VAL(GET_RES_INFO(pCtx), 0, 1); + return TSDB_CODE_SUCCESS; + } + if (pCol->info.type != TSDB_DATA_TYPE_BINARY) { return TSDB_CODE_FUNC_FUNTION_PARA_TYPE; } @@ -1461,6 +1467,7 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) { SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + if (colDataIsNull_s(pCol, i)) continue; char* data = colDataGetData(pCol, i); SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data); stddevTransferInfo(pInputInfo, pInfo); @@ -3866,6 +3873,7 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) { SColumnInfoData* pCol = pInput->pData[0]; if (IS_NULL_TYPE(pCol->info.type)) { + SET_VAL(GET_RES_INFO(pCtx), 0, 1); return TSDB_CODE_SUCCESS; } @@ -4606,6 +4614,11 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; + if (IS_NULL_TYPE(pCol->info.type)) { + SET_VAL(GET_RES_INFO(pCtx), 0, 1); + return TSDB_CODE_SUCCESS; + } + if (pCol->info.type != TSDB_DATA_TYPE_BINARY) { return TSDB_CODE_SUCCESS; } @@ -4615,6 +4628,7 @@ int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) { int32_t start = pInput->startRowIndex; for (int32_t i = start; i < start + pInput->numOfRows; ++i) { + if (colDataIsNull_s(pCol, i)) continue; char* data = colDataGetData(pCol, i); SHLLInfo* pInputInfo = (SHLLInfo*)varDataVal(data); hllTransferInfo(pInputInfo, pInfo); diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 9fa4d819d1..1e43fb6ec5 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -771,7 +771,7 @@ class TDTestCase: def test_recursive_tsma(self): tdSql.execute('drop tsma tsma2') - tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)'] + tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'hyperloglog(c5)'] select_func_list: List[str] = tsma_func_list.copy() select_func_list.append('count(*)') self.create_tsma('tsma3', 'test', 'meters', tsma_func_list, '5m') @@ -858,7 +858,7 @@ class TDTestCase: sql = "SELECT avg(c1), avg(c2),_wstart, _wend,t3,t4,t5,t2 FROM meters WHERE ts >= '2018-09-17 8:00:00' AND ts < '2018-09-17 09:03:18.334' PARTITION BY t3,t4,t5,t2 INTERVAL(1d);" ctxs.append(TSMAQCBuilder().with_sql(sql) .should_query_with_table('meters', '2018-09-17 8:00:00', '2018-09-17 09:03:18.333').get_qc()) - #ctxs.extend(self.test_query_tsma_all()) + ctxs.extend(self.test_query_tsma_all()) return ctxs def test_query_with_tsma_interval_partition_by_col(self): @@ -1000,7 +1000,7 @@ class TDTestCase: def run(self): self.init_data() # time.sleep(999999) - self.test_ddl() + #self.test_ddl() self.test_query_with_tsma() # time.sleep(999999)