From ad895c243da6c50f6e8a8790c19a2f5d6638e3f3 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 14:26:45 +0800 Subject: [PATCH 1/6] enh(query): add spread function distributed splitting --- include/libs/function/functionMgt.h | 2 ++ source/libs/function/src/builtins.c | 22 ++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 9d0995a8d8..e6512b5bac 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -125,6 +125,8 @@ typedef enum EFunctionType { // distributed splitting functions FUNCTION_TYPE_APERCENTILE_PARTIAL, FUNCTION_TYPE_APERCENTILE_MERGE, + FUNCTION_TYPE_SPREAD_PARTIAL, + FUNCTION_TYPE_SPREAD_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 88259e2cbd..7897cc10c2 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1279,6 +1279,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = spreadFunction, .finalizeFunc = spreadFinalize }, + { + .name = "_spread_partial", + .type = FUNCTION_TYPE_SPREAD_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateSpread, + .dataRequiredFunc = statisDataRequired, + .getEnvFunc = getSpreadFuncEnv, + .initFunc = spreadFunctionSetup, + .processFunc = spreadFunction, + .finalizeFunc = spreadFinalize + }, + { + .name = "_spread_merge", + .type = FUNCTION_TYPE_SPREAD_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateSpread, + .dataRequiredFunc = statisDataRequired, + .getEnvFunc = getSpreadFuncEnv, + .initFunc = spreadFunctionSetup, + .processFunc = spreadFunction, + .finalizeFunc = spreadFinalize + }, { .name = "elapsed", .type = FUNCTION_TYPE_ELAPSED, From 183fcac01360783359b464fb6625fdff615cde0d Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 15:05:37 +0800 Subject: [PATCH 2/6] add spread splitting translate fucntions --- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 33 ++++++++++++++++++++++--- source/libs/function/src/builtinsimpl.c | 4 +++ 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 2febf86e60..c049eedb21 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -102,6 +102,7 @@ int32_t topFunction(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t getSpreadInfoSize(); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t spreadFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 7897cc10c2..5154a7bd77 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -317,7 +317,6 @@ static int32_t translateApercentileMerge(SFunctionNode* pFunc, char* pErrBuf, in return translateApercentileImpl(pFunc, pErrBuf, len, false); } - static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters pFunc->node.resType = @@ -378,6 +377,34 @@ static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } +static int32_t translateSpreadImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (isPartial) { + if (!IS_NUMERIC_TYPE(paraType) && TSDB_DATA_TYPE_TIMESTAMP != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + pFunc->node.resType = (SDataType){.bytes = getSpreadInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + } else { + if (TSDB_DATA_TYPE_BINARY != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t translateSpreadPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateSpreadImpl(pFunc, pErrBuf, len, true); +} +static int32_t translateSpreadMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateSpreadImpl(pFunc, pErrBuf, len, false); +} + static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (1 != numOfParams && 2 != numOfParams) { @@ -1283,7 +1310,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_spread_partial", .type = FUNCTION_TYPE_SPREAD_PARTIAL, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateSpread, + .translateFunc = translateSpreadPartial, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getSpreadFuncEnv, .initFunc = spreadFunctionSetup, @@ -1294,7 +1321,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_spread_merge", .type = FUNCTION_TYPE_SPREAD_MERGE, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateSpread, + .translateFunc = translateSpreadMerge, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getSpreadFuncEnv, .initFunc = spreadFunctionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e1a43e70f7..da271e28fd 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2868,6 +2868,10 @@ bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } +int32_t getSpreadInfoSize() { + return (int32_t)sizeof(SSpreadInfo); +} + bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { if (!functionSetup(pCtx, pResultInfo)) { return false; From 622b4f8d47ca187c54dbbc197c49579669548755 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 15:35:48 +0800 Subject: [PATCH 3/6] add spread split partial/merge function --- source/libs/function/src/builtinsimpl.c | 57 ++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index da271e28fd..acf2b348b4 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2190,19 +2190,19 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION)); int32_t resultBytes = TMAX(bytesHist, bytesDigest); - char *tmp = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); if (pInfo->algo == APERCT_ALGO_TDIGEST) { if (pInfo->pTDigest->size > 0) { - memcpy(varDataVal(tmp), pInfo, resultBytes); - varDataSetLen(tmp, resultBytes); + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); } else { return TSDB_CODE_SUCCESS; } } else { if (pInfo->pHisto->numOfElems > 0) { - memcpy(varDataVal(tmp), pInfo, resultBytes); - varDataSetLen(tmp, resultBytes); + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); } else { return TSDB_CODE_SUCCESS; } @@ -2211,9 +2211,9 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - colDataAppend(pCol, pBlock->info.rows, tmp, false); + colDataAppend(pCol, pBlock->info.rows, res, false); - taosMemoryFree(tmp); + taosMemoryFree(res); return pResInfo->numOfRes; } @@ -2957,6 +2957,31 @@ _spread_over: return TSDB_CODE_SUCCESS; } +int32_t spreadFunctionMerge(SqlFunctionCtx *pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + SSpreadInfo* pInputInfo; + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + pInputInfo = (SSpreadInfo *)varDataVal(data); + + if (pInputInfo->max > pInfo->max) { + pInfo->max =pInputInfo->max; + } + + if (pInputInfo->min < pInfo->min) { + pInfo->min =pInputInfo->min; + } + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); if (pInfo->hasResult == true) { @@ -2965,6 +2990,24 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = (int32_t)sizeof(SSpreadInfo); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return pResInfo->numOfRes; +} + bool getElapsedFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SElapsedInfo); return true; From dad3dd4f9758b615230e44ab429b059bb02591d2 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 15:45:32 +0800 Subject: [PATCH 4/6] fix bugs --- source/libs/function/inc/builtinsimpl.h | 2 ++ source/libs/function/src/builtins.c | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c049eedb21..981b1eec88 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -106,7 +106,9 @@ int32_t getSpreadInfoSize(); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t spreadFunction(SqlFunctionCtx* pCtx); +int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx); int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 5154a7bd77..de11209b29 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1315,7 +1315,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSpreadFuncEnv, .initFunc = spreadFunctionSetup, .processFunc = spreadFunction, - .finalizeFunc = spreadFinalize + .finalizeFunc = spreadPartialFinalize }, { .name = "_spread_merge", @@ -1325,7 +1325,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .dataRequiredFunc = statisDataRequired, .getEnvFunc = getSpreadFuncEnv, .initFunc = spreadFunctionSetup, - .processFunc = spreadFunction, + .processFunc = spreadFunctionMerge, .finalizeFunc = spreadFinalize }, { From 991365802ba842ee1cfb78f291a7e6765334b6e0 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 15:54:38 +0800 Subject: [PATCH 5/6] enable spread splitting --- source/libs/function/src/builtins.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index de11209b29..946c37bfd2 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1304,7 +1304,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSpreadFuncEnv, .initFunc = spreadFunctionSetup, .processFunc = spreadFunction, - .finalizeFunc = spreadFinalize + .finalizeFunc = spreadFinalize, + .pPartialFunc = "_spread_partial", + .pMergeFunc = "_spread_merge" }, { .name = "_spread_partial", From e4216bbc87d3f4646b8e106738dc4752746d2e71 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 16:29:34 +0800 Subject: [PATCH 6/6] fix bugs --- source/libs/function/src/builtinsimpl.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index acf2b348b4..34adf11032 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2969,12 +2969,13 @@ int32_t spreadFunctionMerge(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pCol, start); pInputInfo = (SSpreadInfo *)varDataVal(data); + pInfo->hasResult = pInputInfo->hasResult; if (pInputInfo->max > pInfo->max) { - pInfo->max =pInputInfo->max; + pInfo->max = pInputInfo->max; } if (pInputInfo->min < pInfo->min) { - pInfo->min =pInputInfo->min; + pInfo->min = pInputInfo->min; } SET_VAL(GET_RES_INFO(pCtx), 1, 1);