From 4c29b82626bba91e4e8297acce4aac6d6395d6bc Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 11 Jun 2022 15:18:13 +0800 Subject: [PATCH] refactor --- source/libs/function/inc/builtinsimpl.h | 2 ++ source/libs/function/src/builtins.c | 4 ++-- source/libs/function/src/builtinsimpl.c | 30 ++++++++++++++++++++----- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index cca73617b2..0da0081cb0 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -100,8 +100,10 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); int32_t topFunction(SqlFunctionCtx *pCtx); +int32_t topFunctionMerge(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getTopBotInfoSize(); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4ea6b8cdfc..4631c15c21 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1549,7 +1549,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, .processFunc = topFunction, - .finalizeFunc = topBotFinalize, + .finalizeFunc = topBotPartialFinalize, .combineFunc = topCombine, }, { @@ -1559,7 +1559,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateTopBotMerge, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, - .processFunc = topFunction, + .processFunc = topFunctionMerge, .finalizeFunc = topBotFinalize, .combineFunc = topCombine, }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 98b9c82cb2..76a9b79605 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2727,7 +2727,7 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); topTransferInfo(pCtx, pInputInfo); - SET_VAL(GET_RES_INFO(pCtx), 1, 1); + SET_VAL(GET_RES_INFO(pCtx), pInputInfo->maxSize, pInputInfo->maxSize); return TSDB_CODE_SUCCESS; } @@ -2930,6 +2930,24 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pEntryInfo->numOfRes; } +int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getTopBotInfoSize(); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pRes, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return 1; +} + void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); @@ -2990,15 +3008,15 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { return TSDB_CODE_SUCCESS; } +int32_t getSpreadInfoSize() { + return (int32_t)sizeof(SSpreadInfo); +} + bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SSpreadInfo); return true; } -int32_t getSpreadInfoSize() { - return (int32_t)sizeof(SSpreadInfo); -} - bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { if (!functionSetup(pCtx, pResultInfo)) { return false; @@ -3125,7 +3143,7 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = (int32_t)sizeof(SSpreadInfo); + int32_t resultBytes = getSpreadInfoSize(); char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pInfo, resultBytes);