From cd94eabc73440242b7d81ae4f5ab239b7409c146 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 16:42:05 +0800 Subject: [PATCH 1/3] add stddev function translate partial/merge --- include/libs/function/functionMgt.h | 2 + source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 53 +++++++++++++++++++++++++ source/libs/function/src/builtinsimpl.c | 2 + 4 files changed, 58 insertions(+) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index f324d8fed1..a73b9b47f4 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -144,6 +144,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_LAST_MERGE, FUNCTION_TYPE_AVG_PARTIAL, FUNCTION_TYPE_AVG_MERGE, + FUNCTION_TYPE_STDDEV_PARTIAL, + FUNCTION_TYPE_STDDEV_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 1bb30934c6..0d08f97c77 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -68,6 +68,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx); int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t stddevInvertFunction(SqlFunctionCtx* pCtx); int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +int32_t getStddevInfoSize(); bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4dae296f05..0099f9e8c4 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -184,6 +184,35 @@ static int32_t translateAvgMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t le return TSDB_CODE_SUCCESS; } +static int32_t translateStddevPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (!IS_NUMERIC_TYPE(paraType) && !IS_NULL_TYPE(paraType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = getStddevInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateStddevMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (TSDB_DATA_TYPE_BINARY != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + + return TSDB_CODE_SUCCESS; +} + static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; @@ -1523,6 +1552,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .invertFunc = stddevInvertFunction, .combineFunc = stddevCombine, }, + { + .name = "_stddev_partial", + .type = FUNCTION_TYPE_STDDEV_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateStddevPartial, + .getEnvFunc = getStddevFuncEnv, + .initFunc = stddevFunctionSetup, + .processFunc = stddevFunction, + .finalizeFunc = stddevFinalize, + .invertFunc = stddevInvertFunction, + .combineFunc = stddevCombine, + }, + { + .name = "_stddev_merge", + .type = FUNCTION_TYPE_STDDEV_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateStddevMerge, + .getEnvFunc = getStddevFuncEnv, + .initFunc = stddevFunctionSetup, + .processFunc = stddevFunction, + .finalizeFunc = stddevFinalize, + .invertFunc = stddevInvertFunction, + .combineFunc = stddevCombine, + }, { .name = "leastsquares", .type = FUNCTION_TYPE_LEASTSQUARES, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5ee9dbdee6..648a60c299 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1488,6 +1488,8 @@ int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { return minMaxCombine(pDestCtx, pSourceCtx, 0); } +int32_t getStddevInfoSize() { return (int32_t)sizeof(SStddevRes); } + bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SStddevRes); return true; From 68d33b1656abd0f3267f791634da99ba31298e96 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 16:53:59 +0800 Subject: [PATCH 2/3] add stddev function partial/merge --- source/libs/function/src/builtinsimpl.c | 56 ++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 648a60c299..c6ab9f859d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -89,6 +89,7 @@ typedef struct SStddevRes { double dsum; int64_t isum; }; + int16_t type; } SStddevRes; typedef struct SLeastSQRInfo { @@ -1513,6 +1514,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { int32_t type = pInput->pData[0]->info.type; SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pStddevRes->type = type; // computing based on the true data block SColumnInfoData* pCol = pInput->pData[0]; @@ -1629,6 +1631,39 @@ _stddev_over: return TSDB_CODE_SUCCESS; } +static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) { + pOutput->type = pInput->type; + if (IS_INTEGER_TYPE(pOutput->type)) { + pOutput->quadraticISum += pInput->quadraticISum; + pOutput->isum += pInput->isum; + } else { + pOutput->quadraticDSum += pInput->quadraticDSum; + pOutput->dsum += pInput->dsum; + } + + pOutput->count += pInput->count; + + return; +} + +int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data); + + stddevTransferInfo(pInputInfo, pInfo); + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + #define LIST_STDDEV_SUB_N(sumT, T) \ do { \ T* plist = (T*)pCol->pData; \ @@ -1694,9 +1729,10 @@ int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) { int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; - int32_t type = pInput->pData[0]->info.type; SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t type = pStddevRes->type; double avg; + if (IS_INTEGER_TYPE(type)) { avg = pStddevRes->isum / ((double)pStddevRes->count); pStddevRes->result = sqrt(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg); @@ -1708,6 +1744,24 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getStddevInfoSize(); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return pResInfo->numOfRes; +} + int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SStddevRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); From 7ed7fd0fd681689f86515ed7f4f34f8d1ce61fcf Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 17:00:32 +0800 Subject: [PATCH 3/3] enable stddev splitting --- source/libs/function/inc/builtinsimpl.h | 2 ++ source/libs/function/src/builtins.c | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 0d08f97c77..f3060243ed 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -65,7 +65,9 @@ int32_t getAvgInfoSize(); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stddevFunction(SqlFunctionCtx* pCtx); +int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx); int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t stddevInvertFunction(SqlFunctionCtx* pCtx); int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getStddevInfoSize(); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0099f9e8c4..4ffdf44cd6 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1551,6 +1551,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = stddevFinalize, .invertFunc = stddevInvertFunction, .combineFunc = stddevCombine, + .pPartialFunc = "_stddev_partial", + .pMergeFunc = "_stddev_merge" }, { .name = "_stddev_partial", @@ -1560,7 +1562,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getStddevFuncEnv, .initFunc = stddevFunctionSetup, .processFunc = stddevFunction, - .finalizeFunc = stddevFinalize, + .finalizeFunc = stddevPartialFinalize, .invertFunc = stddevInvertFunction, .combineFunc = stddevCombine, }, @@ -1571,7 +1573,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateStddevMerge, .getEnvFunc = getStddevFuncEnv, .initFunc = stddevFunctionSetup, - .processFunc = stddevFunction, + .processFunc = stddevFunctionMerge, .finalizeFunc = stddevFinalize, .invertFunc = stddevInvertFunction, .combineFunc = stddevCombine,