From d49f1dbc152997d73d2201c5a3a33002cf8b9e35 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 17:38:51 +0800 Subject: [PATCH 1/6] enh(query): add histogram function distributed splitting TD-16321 --- include/libs/function/functionMgt.h | 2 ++ source/libs/function/src/builtins.c | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index e6512b5bac..77376a05d9 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -127,6 +127,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_APERCENTILE_MERGE, FUNCTION_TYPE_SPREAD_PARTIAL, FUNCTION_TYPE_SPREAD_MERGE, + FUNCTION_TYPE_HISTOGRAM_PARTIAL, + FUNCTION_TYPE_HISTOGRAM_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 946c37bfd2..49dbd2ead9 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1393,6 +1393,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = histogramFunction, .finalizeFunc = histogramFinalize }, + { + .name = "_histogram_partial", + .type = FUNCTION_TYPE_HISTOGRAM_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateHistogram, + .getEnvFunc = getHistogramFuncEnv, + .initFunc = histogramFunctionSetup, + .processFunc = histogramFunction, + .finalizeFunc = histogramFinalize + }, + { + .name = "_histogram_merge", + .type = FUNCTION_TYPE_HISTOGRAM_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateHistogram, + .getEnvFunc = getHistogramFuncEnv, + .initFunc = histogramFunctionSetup, + .processFunc = histogramFunction, + .finalizeFunc = histogramFinalize + }, { .name = "hyperloglog", .type = FUNCTION_TYPE_HYPERLOGLOG, From 081a8a62b2a8453d7ef5adc89dd54cead7bbac1f Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 17:59:32 +0800 Subject: [PATCH 2/6] add histogram partial/merge translate functions --- source/libs/function/inc/builtinsimpl.h | 5 ++- source/libs/function/src/builtins.c | 56 ++++++++++++++++++++++++- source/libs/function/src/builtinsimpl.c | 4 ++ 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 981b1eec88..c41949bf20 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -78,13 +78,13 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI int32_t percentileFunction(SqlFunctionCtx *pCtx); int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -int32_t getApercentileMaxSize(); bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t apercentileFunction(SqlFunctionCtx *pCtx); int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx); int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t getApercentileMaxSize(); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); @@ -102,13 +102,13 @@ int32_t topFunction(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -int32_t getSpreadInfoSize(); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t spreadFunction(SqlFunctionCtx* pCtx); int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx); int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t getSpreadInfoSize(); bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); @@ -119,6 +119,7 @@ bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t histogramFunction(SqlFunctionCtx* pCtx); int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t getHistogramInfoSize(); bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t hllFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 49dbd2ead9..bd13cdcb47 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -503,6 +503,58 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateHistogramImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + if (isPartial) { + if (4 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (!IS_NUMERIC_TYPE(colType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + // param1 ~ param3 + for (int32_t i = 1; i < numOfParams; ++i) { + SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, i); + if (QUERY_NODE_VALUE != nodeType(pParamNode)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + SValueNode* pValue = (SValueNode*)pParamNode; + + pValue->notReserved = true; + } + + if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY || + ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BINARY || + ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 3))->resType.type != TSDB_DATA_TYPE_BIGINT) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = getHistogramInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + } else { + if (1 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type != TSDB_DATA_TYPE_BINARY) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = 512, .type = TSDB_DATA_TYPE_BINARY}; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t translateHistogramPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateHistogramImpl(pFunc, pErrBuf, len, true); +} +static int32_t translateHistogramMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateHistogramImpl(pFunc, pErrBuf, len, false); +} + static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -1397,7 +1449,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_histogram_partial", .type = FUNCTION_TYPE_HISTOGRAM_PARTIAL, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateHistogram, + .translateFunc = translateHistogramPartial, .getEnvFunc = getHistogramFuncEnv, .initFunc = histogramFunctionSetup, .processFunc = histogramFunction, @@ -1407,7 +1459,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_histogram_merge", .type = FUNCTION_TYPE_HISTOGRAM_MERGE, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateHistogram, + .translateFunc = translateHistogramMerge, .getEnvFunc = getHistogramFuncEnv, .initFunc = histogramFunctionSetup, .processFunc = histogramFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 34adf11032..fc72a05edb 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3118,6 +3118,10 @@ int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t getHistogramInfoSize() { + return (int32_t)sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin); +} + bool getHistogramFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin); return true; From c07bea7ff5ab4083c78c8359b20b2eb4a33ea7bb Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 18:40:16 +0800 Subject: [PATCH 3/6] add histogram partial/merge function --- source/libs/function/src/builtinsimpl.c | 45 +++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fc72a05edb..41414efba5 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2187,9 +2187,7 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); - int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); - int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION)); - int32_t resultBytes = TMAX(bytesHist, bytesDigest); + int32_t resultBytes = getApercentileMaxSize(); char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); if (pInfo->algo == APERCT_ALGO_TDIGEST) { @@ -3332,6 +3330,29 @@ int32_t histogramFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } +int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SHistoFuncInfo* pInputInfo = (SHistoFuncInfo *)varDataVal(data); + + pInfo->normalized = pInputInfo->normalized; + pInfo->numOfBins = pInputInfo->numOfBins; + pInfo->totalCount += pInputInfo->totalCount; + for (int32_t k = 0; k < pInfo->numOfBins; ++k) { + pInfo->bins[k].count += pInputInfo->bins[k].count; + } + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); @@ -3368,6 +3389,24 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getHistogramInfoSize(); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return pResInfo->numOfRes; +} + bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SHLLInfo); return true; From fc1ad1aeac47656a10fcdf2e3f4ad380eae00ae2 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 18:40:16 +0800 Subject: [PATCH 4/6] add histogram partial/merge function --- source/libs/function/inc/builtinsimpl.h | 2 ++ source/libs/function/src/builtins.c | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c41949bf20..3c272d3933 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -118,7 +118,9 @@ int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t histogramFunction(SqlFunctionCtx* pCtx); +int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx); int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t getHistogramInfoSize(); bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index bd13cdcb47..b4c2bb980e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1453,7 +1453,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getHistogramFuncEnv, .initFunc = histogramFunctionSetup, .processFunc = histogramFunction, - .finalizeFunc = histogramFinalize + .finalizeFunc = histogramPartialFinalize }, { .name = "_histogram_merge", @@ -1462,7 +1462,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateHistogramMerge, .getEnvFunc = getHistogramFuncEnv, .initFunc = histogramFunctionSetup, - .processFunc = histogramFunction, + .processFunc = histogramFunctionMerge, .finalizeFunc = histogramFinalize }, { From 9ccc3fb85354fd6bc81be310caa0d8dec0831572 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 19:12:57 +0800 Subject: [PATCH 5/6] fix: enable histogram splitting --- source/libs/function/src/builtins.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b4c2bb980e..f6414a6e20 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1443,7 +1443,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getHistogramFuncEnv, .initFunc = histogramFunctionSetup, .processFunc = histogramFunction, - .finalizeFunc = histogramFinalize + .finalizeFunc = histogramFinalize, + .pPartialFunc = "_histogram_partial", + .pMergeFunc = "_histogram_merge" }, { .name = "_histogram_partial", From ea05f8af12142b547b9afe4b45e52593aadc5869 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 20:30:43 +0800 Subject: [PATCH 6/6] fix bugs --- source/libs/function/src/builtins.c | 2 +- source/libs/function/src/builtinsimpl.c | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index f6414a6e20..e900d2d49a 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1463,7 +1463,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateHistogramMerge, .getEnvFunc = getHistogramFuncEnv, - .initFunc = histogramFunctionSetup, + .initFunc = functionSetup, .processFunc = histogramFunctionMerge, .finalizeFunc = histogramFinalize }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 41414efba5..c279ceb01e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3345,11 +3345,12 @@ int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) { pInfo->numOfBins = pInputInfo->numOfBins; pInfo->totalCount += pInputInfo->totalCount; for (int32_t k = 0; k < pInfo->numOfBins; ++k) { + pInfo->bins[k].lower = pInputInfo->bins[k].lower; + pInfo->bins[k].upper = pInputInfo->bins[k].upper; pInfo->bins[k].count += pInputInfo->bins[k].count; } - SET_VAL(GET_RES_INFO(pCtx), 1, 1); - + SET_VAL(GET_RES_INFO(pCtx), pInfo->numOfBins, pInfo->numOfBins); return TSDB_CODE_SUCCESS; } @@ -3404,7 +3405,7 @@ int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { colDataAppend(pCol, pBlock->info.rows, res, false); taosMemoryFree(res); - return pResInfo->numOfRes; + return 1; } bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {