From 40f5ff71f8f3c9b79be8222b8cda11bc23b03ee0 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 10 Jun 2022 18:32:34 +0800 Subject: [PATCH 01/18] add top/bot splitting --- include/libs/function/functionMgt.h | 4 +- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 86 +++++++++++++++++++++++-- source/libs/function/src/builtinsimpl.c | 4 ++ 4 files changed, 89 insertions(+), 6 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index c8e803c811..eeb36b04ce 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -124,7 +124,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function // distributed splitting functions - FUNCTION_TYPE_APERCENTILE_PARTIAL, + FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000, FUNCTION_TYPE_APERCENTILE_MERGE, FUNCTION_TYPE_SPREAD_PARTIAL, FUNCTION_TYPE_SPREAD_MERGE, @@ -134,6 +134,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_HYPERLOGLOG_MERGE, FUNCTION_TYPE_ELAPSED_PARTIAL, FUNCTION_TYPE_ELAPSED_MERGE, + FUNCTION_TYPE_TOP_PARTIAL, + FUNCTION_TYPE_TOP_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index b623c77110..92952d6cbc 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -104,6 +104,7 @@ int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +int32_t getTopBotInfoSize(); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a8e9cac65e..aad871f311 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -325,7 +325,7 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_ return TSDB_CODE_SUCCESS; } -static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { +static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (2 != numOfParams) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -360,8 +360,62 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return TSDB_CODE_SUCCESS; } -static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - return translateTop(pFunc, pErrBuf, len); +static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + + if (isPartial) { + if (2 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; + if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + // param1 + SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); + if (nodeType(pParamNode1) != QUERY_NODE_VALUE) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + SValueNode* pValue = (SValueNode*)pParamNode1; + if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + if (pValue->datum.i < 1 || pValue->datum.i > 100) { + return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); + } + + pValue->notReserved = true; + + // set result type + pFunc->node.resType = (SDataType){.bytes = getTopBotInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + } else { + if (1 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (TSDB_DATA_TYPE_BINARY == para1Type) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + // set result type + SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; + pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateTopBotImpl(pFunc, pErrBuf, len, true); +} + +static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateTopBotImpl(pFunc, pErrBuf, len, false); } static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { @@ -1475,7 +1529,29 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "top", .type = FUNCTION_TYPE_TOP, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTop, + .translateFunc = translateTopBot, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = functionSetup, + .processFunc = topFunction, + .finalizeFunc = topBotFinalize, + .combineFunc = topCombine, + }, + { + .name = "_top_partial", + .type = FUNCTION_TYPE_TOP_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .translateFunc = translateTopBotPartial, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = functionSetup, + .processFunc = topFunction, + .finalizeFunc = topBotFinalize, + .combineFunc = topCombine, + }, + { + .name = "_top_merge", + .type = FUNCTION_TYPE_TOP_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .translateFunc = translateTopBotMerge, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, .processFunc = topFunction, @@ -1486,7 +1562,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "bottom", .type = FUNCTION_TYPE_BOTTOM, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateBottom, + .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, .processFunc = bottomFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1eafd3c649..36c14f1be1 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2655,6 +2655,10 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { return numOfElems; } +int32_t getTopBotInfoSize() { + return (int32_t)sizeof(STopBotRes); +} + bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SValueNode* pkNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem); From 741d0c5c69fdb9323ef02cd74bbdca06908e06bf Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 10 Jun 2022 21:10:55 +0800 Subject: [PATCH 02/18] support merge function resType same as original input type --- source/libs/function/src/builtins.c | 6 +++--- source/libs/function/src/functionMgt.c | 25 +++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index aad871f311..b1b98daa84 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -403,9 +403,9 @@ static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - // set result type - SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; - pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; + // Do nothing. We can only access output of partial functions as input, + // so original input type cannot be obtained, resType will be set same + // as original function input type after merge function created. } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index f2514f54f1..f4e216d55c 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -200,6 +200,27 @@ bool fmIsInvertible(int32_t funcId) { return res; } +//function has same input/output type +bool fmIsSameInOutType(int32_t funcId) { + bool res = false; + switch (funcMgtBuiltins[funcId].type) { + case FUNCTION_TYPE_MAX: + case FUNCTION_TYPE_MIN: + case FUNCTION_TYPE_TOP: + case FUNCTION_TYPE_BOTTOM: + case FUNCTION_TYPE_FIRST: + case FUNCTION_TYPE_LAST: + case FUNCTION_TYPE_SAMPLE: + case FUNCTION_TYPE_TAIL: + case FUNCTION_TYPE_UNIQUE: + res = true; + break; + default: + break; + } + return res; +} + static int32_t getFuncInfo(SFunctionNode* pFunc) { char msg[64] = {0}; if (NULL != gFunMgtService.pFuncNameHashTable) { @@ -274,6 +295,10 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio nodesDestroyList(pParameterList); return TSDB_CODE_OUT_OF_MEMORY; } + //overwrite function restype set by translate function + if (fmIsSameInOutType(funcMgtBuiltins[pSrcFunc->funcId].type)) { + (*pMergeFunc)->node.resType = pSrcFunc->node.resType; + } strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName); return TSDB_CODE_SUCCESS; } From a94abd5dae6ee935b0142c9a007b1d59428230ed Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 10 Jun 2022 21:10:55 +0800 Subject: [PATCH 03/18] support merge function resType same as original input type --- source/libs/function/src/builtinsimpl.c | 30 ++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 36c14f1be1..c2e91c13dc 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2445,7 +2445,7 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - if (pSResInfo->numOfRes != 0 && + if (pSResInfo->numOfRes != 0 && (pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) < *(TSKEY*)(pSBuf + bytes)) ) { memcpy(pDBuf, pSBuf, bytes); *(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes); @@ -2699,6 +2699,34 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +static void topBotTransfer(STopBotRes* pInput, STopBotRes* pOutput) { +} + +int32_t topBotFunctionMerge(SqlFunctionCtx *pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); + + pInfo->hasResult = pInputInfo->hasResult; + if (pInputInfo->max > pInfo->max) { + pInfo->max = pInputInfo->max; + } + + if (pInputInfo->min < pInfo->min) { + pInfo->min = pInputInfo->min; + } + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + int32_t bottomFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); From 33b455df850b3e039d6f01c900c73a9b9fdbd404 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 13:32:51 +0800 Subject: [PATCH 04/18] add top merge function --- source/libs/function/src/builtinsimpl.c | 38 ++++++++++++++----------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index ad68a1a8f7..b2a3624a55 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -66,6 +66,7 @@ typedef struct STopBotResItem { } STopBotResItem; typedef struct STopBotRes { + int32_t maxSize; STopBotResItem* pItems; } STopBotRes; @@ -2659,6 +2660,16 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } +bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { + if (!functionSetup(pCtx, pResInfo)) { + return false; + } + + STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); + pRes->maxSize= pCtx->param[1].param.i; + return true; +} + static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); @@ -2670,6 +2681,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); +static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery); + int32_t topFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -2693,29 +2706,22 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void topBotTransfer(STopBotRes* pInput, STopBotRes* pOutput) { +static void topTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, int16_t type) { + for (int32_t i = 0; i < pInput->maxSize; i++) { + addResult(pCtx, &pInput->pItems[i], type, true); + } } -int32_t topBotFunctionMerge(SqlFunctionCtx *pCtx) { +int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); - STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t start = pInput->startRowIndex; char* data = colDataGetData(pCol, start); STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); - pInfo->hasResult = pInputInfo->hasResult; - if (pInputInfo->max > pInfo->max) { - pInfo->max = pInputInfo->max; - } - - if (pInputInfo->min < pInfo->min) { - pInfo->min = pInputInfo->min; - } - + topTransferInfo(pCtx, pInputInfo, pCol->info.type); SET_VAL(GET_RES_INFO(pCtx), 1, 1); return TSDB_CODE_SUCCESS; @@ -2774,7 +2780,6 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { STopBotRes* pRes = getTopBotOutputInfo(pCtx); - int32_t maxSize = pCtx->param[1].param.i; SVariant val = {0}; taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); @@ -2783,7 +2788,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData assert(pItems != NULL); // not full yet - if (pEntryInfo->numOfRes < maxSize) { + if (pEntryInfo->numOfRes < pRes->maxSize) { STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes]; pItem->v = val; pItem->uid = uid; @@ -2924,12 +2929,11 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx); - int32_t maxSize = pCtx->param[1].param.i; STopBotResItem* pItems = pRes->pItems; assert(pItems != NULL); // not full yet - if (pEntryInfo->numOfRes < maxSize) { + if (pEntryInfo->numOfRes < pRes->maxSize) { STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes]; pItem->v = pSourceItem->v; pItem->uid = pSourceItem->uid; From ab9b76e1f9d3d205acc3c654b8d2357b35aee792 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 14:35:28 +0800 Subject: [PATCH 05/18] store original data type in STopBotRes for merge function --- source/libs/function/src/builtinsimpl.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b2a3624a55..98b9c82cb2 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -67,6 +67,7 @@ typedef struct STopBotResItem { typedef struct STopBotRes { int32_t maxSize; + int16_t type; //store the original input type, used in merge function STopBotResItem* pItems; } STopBotRes; @@ -2666,7 +2667,11 @@ bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { } STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); - pRes->maxSize= pCtx->param[1].param.i; + SInputColumnInfoData* pInput = &pCtx->input; + + pRes->maxSize = pCtx->param[1].param.i; + pRes->type = pInput->pData[0]->info.type; + return true; } @@ -2690,7 +2695,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; - int32_t type = pInput->pData[0]->info.type; + int16_t type = pInput->pData[0]->info.type; int32_t start = pInput->startRowIndex; for (int32_t i = start; i < pInput->numOfRows + start; ++i) { @@ -2706,9 +2711,9 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void topTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, int16_t type) { +static void topTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput) { for (int32_t i = 0; i < pInput->maxSize; i++) { - addResult(pCtx, &pInput->pItems[i], type, true); + addResult(pCtx, &pInput->pItems[i], pInput->type, true); } } @@ -2721,7 +2726,7 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pCol, start); STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); - topTransferInfo(pCtx, pInputInfo, pCol->info.type); + topTransferInfo(pCtx, pInputInfo); SET_VAL(GET_RES_INFO(pCtx), 1, 1); return TSDB_CODE_SUCCESS; @@ -2902,7 +2907,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); - int32_t type = pCtx->input.pData[0]->info.type; + int16_t type = pCtx->input.pData[0]->info.type; int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); From 4c29b82626bba91e4e8297acce4aac6d6395d6bc Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 15:18:13 +0800 Subject: [PATCH 06/18] refactor --- source/libs/function/inc/builtinsimpl.h | 2 ++ source/libs/function/src/builtins.c | 4 ++-- source/libs/function/src/builtinsimpl.c | 30 ++++++++++++++++++++----- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index cca73617b2..0da0081cb0 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -100,8 +100,10 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); int32_t topFunction(SqlFunctionCtx *pCtx); +int32_t topFunctionMerge(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getTopBotInfoSize(); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4ea6b8cdfc..4631c15c21 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1549,7 +1549,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, .processFunc = topFunction, - .finalizeFunc = topBotFinalize, + .finalizeFunc = topBotPartialFinalize, .combineFunc = topCombine, }, { @@ -1559,7 +1559,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateTopBotMerge, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, - .processFunc = topFunction, + .processFunc = topFunctionMerge, .finalizeFunc = topBotFinalize, .combineFunc = topCombine, }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 98b9c82cb2..76a9b79605 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2727,7 +2727,7 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); topTransferInfo(pCtx, pInputInfo); - SET_VAL(GET_RES_INFO(pCtx), 1, 1); + SET_VAL(GET_RES_INFO(pCtx), pInputInfo->maxSize, pInputInfo->maxSize); return TSDB_CODE_SUCCESS; } @@ -2930,6 +2930,24 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pEntryInfo->numOfRes; } +int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getTopBotInfoSize(); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pRes, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return 1; +} + void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); @@ -2990,15 +3008,15 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { return TSDB_CODE_SUCCESS; } +int32_t getSpreadInfoSize() { + return (int32_t)sizeof(SSpreadInfo); +} + bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SSpreadInfo); return true; } -int32_t getSpreadInfoSize() { - return (int32_t)sizeof(SSpreadInfo); -} - bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { if (!functionSetup(pCtx, pResultInfo)) { return false; @@ -3125,7 +3143,7 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = (int32_t)sizeof(SSpreadInfo); + int32_t resultBytes = getSpreadInfoSize(); char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pInfo, resultBytes); From 56705a995f2f82db4f48f1cc04b345efda32c576 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 15:29:26 +0800 Subject: [PATCH 07/18] enable top function split --- source/libs/function/src/builtins.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4631c15c21..74efe6a06c 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1540,6 +1540,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = topFunction, .finalizeFunc = topBotFinalize, .combineFunc = topCombine, + .pPartialFunc = "_top_partial", + .pMergeFunc = "_top_merge" }, { .name = "_top_partial", From e9a8217f97a8e75a2bf205a19a50715ec58b5691 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 15:44:55 +0800 Subject: [PATCH 08/18] fix getTopBotInfoSize issue --- source/libs/function/inc/builtinsimpl.h | 2 +- source/libs/function/src/builtins.c | 4 ++-- source/libs/function/src/builtinsimpl.c | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 0da0081cb0..acf4a5e4af 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -106,7 +106,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -int32_t getTopBotInfoSize(); +int32_t getTopBotInfoSize(int64_t numOfItems); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 74efe6a06c..d1110d7da5 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -392,14 +392,14 @@ static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t pValue->notReserved = true; // set result type - pFunc->node.resType = (SDataType){.bytes = getTopBotInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + pFunc->node.resType = (SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; } else { if (1 != numOfParams) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - if (TSDB_DATA_TYPE_BINARY == para1Type) { + if (TSDB_DATA_TYPE_BINARY != para1Type) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 76a9b79605..8662f89457 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2651,8 +2651,8 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { return numOfElems; } -int32_t getTopBotInfoSize() { - return (int32_t)sizeof(STopBotRes); +int32_t getTopBotInfoSize(int64_t numOfItems) { + return sizeof(STopBotRes) + numOfItems * sizeof(STopBotResItem); } bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -2933,7 +2933,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getTopBotInfoSize(); + int32_t resultBytes = getTopBotInfoSize(pRes->maxSize); char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pRes, resultBytes); From 6473c85e13e33f6c1d5a4ae3f9597e6d186bf790 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 16:14:08 +0800 Subject: [PATCH 09/18] fix bugs --- source/libs/function/src/builtins.c | 4 ++-- source/libs/function/src/functionMgt.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d1110d7da5..eab3ec8c8e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1536,7 +1536,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, - .initFunc = functionSetup, + .initFunc = topBotFunctionSetup, .processFunc = topFunction, .finalizeFunc = topBotFinalize, .combineFunc = topCombine, @@ -1549,7 +1549,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateTopBotPartial, .getEnvFunc = getTopBotFuncEnv, - .initFunc = functionSetup, + .initFunc = topBotfunctionSetup, .processFunc = topFunction, .finalizeFunc = topBotPartialFinalize, .combineFunc = topCombine, diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index f4e216d55c..c635542d74 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -296,7 +296,7 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio return TSDB_CODE_OUT_OF_MEMORY; } //overwrite function restype set by translate function - if (fmIsSameInOutType(funcMgtBuiltins[pSrcFunc->funcId].type)) { + if (fmIsSameInOutType(pSrcFunc->funcId)) { (*pMergeFunc)->node.resType = pSrcFunc->node.resType; } strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName); From 40f6fad49fd3fbbe1be88c08258ee3b76bceff50 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 16:21:45 +0800 Subject: [PATCH 10/18] fix bug --- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index acf4a5e4af..c488abc198 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -99,6 +99,7 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); +bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t topFunction(SqlFunctionCtx *pCtx); int32_t topFunctionMerge(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index eab3ec8c8e..701f5a1a26 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1549,7 +1549,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateTopBotPartial, .getEnvFunc = getTopBotFuncEnv, - .initFunc = topBotfunctionSetup, + .initFunc = topBotFunctionSetup, .processFunc = topFunction, .finalizeFunc = topBotPartialFinalize, .combineFunc = topCombine, From 15ba5769e0694839ed4160a50e51fc2082015c76 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 16:29:02 +0800 Subject: [PATCH 11/18] fix crash --- source/libs/function/src/builtinsimpl.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 8662f89457..b37b668aac 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2670,7 +2670,6 @@ bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { SInputColumnInfoData* pInput = &pCtx->input; pRes->maxSize = pCtx->param[1].param.i; - pRes->type = pInput->pData[0]->info.type; return true; } @@ -2695,7 +2694,8 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; - int16_t type = pInput->pData[0]->info.type; + STopBotRes* pRes = getTopBotOutputInfo(pCtx); + pRes->type = pInput->pData[0]->info.type; int32_t start = pInput->startRowIndex; for (int32_t i = start; i < pInput->numOfRows + start; ++i) { @@ -2705,7 +2705,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pCol, i); - doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, true); + doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true); } return TSDB_CODE_SUCCESS; From b9cc1e6d09197c1d2cb8df91000d37d40ba08977 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 16:39:26 +0800 Subject: [PATCH 12/18] add top getenv function for merge function --- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 2 +- source/libs/function/src/builtinsimpl.c | 6 ++++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c488abc198..0bc6076a88 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -99,6 +99,7 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); +bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t topFunction(SqlFunctionCtx *pCtx); int32_t topFunctionMerge(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 701f5a1a26..be879b08c0 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1559,7 +1559,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_TOP_MERGE, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateTopBotMerge, - .getEnvFunc = getTopBotFuncEnv, + .getEnvFunc = getTopBotMergeFuncEnv, .initFunc = functionSetup, .processFunc = topFunctionMerge, .finalizeFunc = topBotFinalize, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b37b668aac..25d9284314 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2661,6 +2661,12 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } +bool getTopBotFuncMergeEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + //intermediate result is binary and length contains VAR header size + pEnv->calcMemSize = pFunc->node.resType.bytes - VARSTR_HEADER_SIZE; + return true; +} + bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { if (!functionSetup(pCtx, pResInfo)) { return false; From b53c65ea27f6a4c0086a1ee40d47795d6d74a930 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 19:22:57 +0800 Subject: [PATCH 13/18] disable selectivity function for top split for now --- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 2 +- source/libs/function/src/builtinsimpl.c | 19 ++++++++++++++++--- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 0bc6076a88..bea9a6a1b8 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -106,6 +106,7 @@ int32_t topFunctionMerge(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getTopBotInfoSize(int64_t numOfItems); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index be879b08c0..1fbaa6b37e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1562,7 +1562,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getTopBotMergeFuncEnv, .initFunc = functionSetup, .processFunc = topFunctionMerge, - .finalizeFunc = topBotFinalize, + .finalizeFunc = topBotMergeFinalize, .combineFunc = topCombine, }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 25d9284314..147407122a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2661,7 +2661,7 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool getTopBotFuncMergeEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { +bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { //intermediate result is binary and length contains VAR header size pEnv->calcMemSize = pFunc->node.resType.bytes - VARSTR_HEADER_SIZE; return true; @@ -2731,7 +2731,10 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { int32_t start = pInput->startRowIndex; char* data = colDataGetData(pCol, start); STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); + STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pInfo->maxSize = pInputInfo->maxSize; + pInfo->type = pInputInfo->type; topTransferInfo(pCtx, pInputInfo); SET_VAL(GET_RES_INFO(pCtx), pInputInfo->maxSize, pInputInfo->maxSize); @@ -2909,7 +2912,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS releaseBufPage(pCtx->pBuf, pPage); } -int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { +int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); @@ -2929,13 +2932,23 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false); } - setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); + if (!isMerge) { + setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); + } currentRow += 1; } return pEntryInfo->numOfRes; } +int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + return topBotFinalizeImpl(pCtx, pBlock, false); +} + +int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + return topBotFinalizeImpl(pCtx, pBlock, true); +} + int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); From 8fe95c982cc9d767dc15c8aa4cf8dbafc63ab79a Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 12:39:25 +0800 Subject: [PATCH 14/18] fix bugs --- source/libs/function/src/builtinsimpl.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 4a593854d5..4c81f82269 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -68,6 +68,7 @@ typedef struct STopBotResItem { typedef struct STopBotRes { int32_t maxSize; int16_t type; //store the original input type, used in merge function + int32_t numOfItems; STopBotResItem* pItems; } STopBotRes; @@ -2718,12 +2719,13 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { } static void topTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput) { - for (int32_t i = 0; i < pInput->maxSize; i++) { + for (int32_t i = 0; i < pInput->numOfItems; i++) { addResult(pCtx, &pInput->pItems[i], pInput->type, true); } } int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); @@ -2736,7 +2738,7 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { pInfo->maxSize = pInputInfo->maxSize; pInfo->type = pInputInfo->type; topTransferInfo(pCtx, pInputInfo); - SET_VAL(GET_RES_INFO(pCtx), pInputInfo->maxSize, pInputInfo->maxSize); + SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); return TSDB_CODE_SUCCESS; } @@ -2812,6 +2814,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; + // accumulate number of items for each vgroup, this info is needed for merge + pRes->numOfItems++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, !isTopQuery); } else { // replace the minimum value in the result @@ -2983,6 +2987,8 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, pItem->tuplePos.pageId = -1; replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos); pEntryInfo->numOfRes++; + // accumulate number of items for each vgroup, this info is needed for merge + pRes->numOfItems++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, !isTopQuery); } else { // replace the minimum value in the result From 4036d9714ea60afa8b80d3811f62dcd651000658 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 13:38:21 +0800 Subject: [PATCH 15/18] add bottom function distribution splitting --- include/libs/function/functionMgt.h | 2 + source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 26 +++++++++++- source/libs/function/src/builtinsimpl.c | 55 +++++++++++++++++-------- 4 files changed, 66 insertions(+), 18 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 41f5990c95..e86d643c0d 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -136,6 +136,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_ELAPSED_MERGE, FUNCTION_TYPE_TOP_PARTIAL, FUNCTION_TYPE_TOP_MERGE, + FUNCTION_TYPE_BOTTOM_PARTIAL, + FUNCTION_TYPE_BOTTOM_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index bea9a6a1b8..710b1255f7 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -104,6 +104,7 @@ bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) int32_t topFunction(SqlFunctionCtx *pCtx); int32_t topFunctionMerge(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); +int32_t bottomFunctionMerge(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0dd809524f..e71dcba500 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1574,10 +1574,34 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, - .initFunc = functionSetup, + .initFunc = topBotFunctionSetup, .processFunc = bottomFunction, .finalizeFunc = topBotFinalize, .combineFunc = bottomCombine, + .pPartialFunc = "_bottom_partial", + .pMergeFunc = "_bottom_merge" + }, + { + .name = "_bottom_partial", + .type = FUNCTION_TYPE_BOTTOM_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .translateFunc = translateTopBotPartial, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = topBotFunctionSetup, + .processFunc = bottomFunction, + .finalizeFunc = topBotPartialFinalize, + .combineFunc = bottomCombine, + }, + { + .name = "_bottom_merge", + .type = FUNCTION_TYPE_BOTTOM_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .translateFunc = translateTopBotMerge, + .getEnvFunc = getTopBotMergeFuncEnv, + .initFunc = functionSetup, + .processFunc = bottomFunctionMerge, + .finalizeFunc = topBotMergeFinalize, + .combineFunc = bottomCombine, }, { .name = "spread", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 30821f04cc..580df07cf8 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2716,9 +2716,33 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void topTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput) { +int32_t bottomFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElems = 0; + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + + STopBotRes* pRes = getTopBotOutputInfo(pCtx); + pRes->type = pInput->pData[0]->info.type; + + int32_t start = pInput->startRowIndex; + for (int32_t i = start; i < pInput->numOfRows + start; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElems++; + char* data = colDataGetData(pCol, i); + doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false); + } + + return TSDB_CODE_SUCCESS; +} + +static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) { for (int32_t i = 0; i < pInput->numOfItems; i++) { - addResult(pCtx, &pInput->pItems[i], pInput->type, true); + addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery); } } @@ -2735,31 +2759,28 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { pInfo->maxSize = pInputInfo->maxSize; pInfo->type = pInputInfo->type; - topTransferInfo(pCtx, pInputInfo); + topBotTransferInfo(pCtx, pInputInfo, true); SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); return TSDB_CODE_SUCCESS; } -int32_t bottomFunction(SqlFunctionCtx* pCtx) { - int32_t numOfElems = 0; - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); +int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pCol = pInput->pData[0]; - - int32_t type = pInput->pData[0]->info.type; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); int32_t start = pInput->startRowIndex; - for (int32_t i = start; i < pInput->numOfRows + start; ++i) { - if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { - continue; - } + char* data = colDataGetData(pCol, start); + STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); + STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - numOfElems++; - char* data = colDataGetData(pCol, i); - doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false); - } + pInfo->maxSize = pInputInfo->maxSize; + pInfo->type = pInputInfo->type; + topBotTransferInfo(pCtx, pInputInfo, false); + SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); return TSDB_CODE_SUCCESS; } From f7c20f6c0de0b80aaf88c57118d1302b729a31fc Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 16:14:16 +0800 Subject: [PATCH 16/18] fix invalid read-write caused by getFuncEnv --- source/libs/function/src/builtinsimpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 580df07cf8..2a1974ede9 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2662,7 +2662,7 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { //intermediate result is binary and length contains VAR header size - pEnv->calcMemSize = pFunc->node.resType.bytes - VARSTR_HEADER_SIZE; + pEnv->calcMemSize = pFunc->node.resType.bytes; return true; } From dc3169b92d78ca1c1cf77f1dc6c37facb1ef1026 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 16:15:37 +0800 Subject: [PATCH 17/18] fix case format --- tests/system-test/2-query/top.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/system-test/2-query/top.py b/tests/system-test/2-query/top.py index fbbbb2c99a..146bb34937 100644 --- a/tests/system-test/2-query/top.py +++ b/tests/system-test/2-query/top.py @@ -23,11 +23,11 @@ class TDTestCase: self.rowNum = 10 self.ts = 1537146000000 - + def run(self): tdSql.prepare() - + tdSql.execute('''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''') @@ -35,9 +35,9 @@ class TDTestCase: for i in range(self.rowNum): tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" % (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) - - # top verifacation + + # top verifacation tdSql.error("select top(ts, 10) from test") tdSql.error("select top(col1, 0) from test") tdSql.error("select top(col1, 101) from test") @@ -50,8 +50,8 @@ class TDTestCase: tdSql.error("select top(col5, 0) from test") tdSql.error("select top(col5, 101) from test") tdSql.error("select top(col6, 0) from test") - tdSql.error("select top(col6, 101) from test") - tdSql.error("select top(col7, 10) from test") + tdSql.error("select top(col6, 101) from test") + tdSql.error("select top(col7, 10) from test") tdSql.error("select top(col8, 10) from test") tdSql.error("select top(col9, 10) from test") tdSql.error("select top(col11, 0) from test") @@ -95,7 +95,7 @@ class TDTestCase: tdSql.checkRows(2) tdSql.query('select top(col2,1) from test interval(1y) order by col2') tdSql.checkData(0,0,10) - + tdSql.error("select * from test where bottom(col2,1)=1") tdSql.error("select top(col14, 0) from test;") def stop(self): From 8f2cb921115c5736a996f3de4aa581f9d843dafe Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 16:15:37 +0800 Subject: [PATCH 18/18] fix case format --- tests/system-test/2-query/bottom.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/system-test/2-query/bottom.py b/tests/system-test/2-query/bottom.py index a4390372df..008f59aa6a 100644 --- a/tests/system-test/2-query/bottom.py +++ b/tests/system-test/2-query/bottom.py @@ -24,18 +24,18 @@ class TDTestCase: self.rowNum = 10 self.ts = 1537146000000 - - def run(self): - tdSql.prepare() - tdSql.execute('''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, + def run(self): + tdSql.prepare() + + tdSql.execute('''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''') tdSql.execute("create table test1 using test tags('beijing')") for i in range(self.rowNum): - tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" - % (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) + tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" + % (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) - # bottom verifacation + # bottom verifacation tdSql.error("select bottom(ts, 10) from test") tdSql.error("select bottom(col1, 0) from test") tdSql.error("select bottom(col1, 101) from test") @@ -48,8 +48,8 @@ class TDTestCase: tdSql.error("select bottom(col5, 0) from test") tdSql.error("select bottom(col5, 101) from test") tdSql.error("select bottom(col6, 0) from test") - tdSql.error("select bottom(col6, 101) from test") - tdSql.error("select bottom(col7, 10) from test") + tdSql.error("select bottom(col6, 101) from test") + tdSql.error("select bottom(col7, 10) from test") tdSql.error("select bottom(col8, 10) from test") tdSql.error("select bottom(col9, 10) from test") @@ -90,7 +90,7 @@ class TDTestCase: tdSql.checkRows(2) tdSql.query("select ts,bottom(col1, 2),ts from test group by tbname") tdSql.checkRows(2) - + tdSql.query('select bottom(col2,1) from test interval(1y) order by col2') tdSql.checkData(0,0,1)