From 6d952626882dbdc66a8c4cb1572b1ca69bf9330d Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 19:11:28 +0800 Subject: [PATCH 1/9] enh(query): add first/last function distributed implementation --- include/libs/function/functionMgt.h | 2 + source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 57 +++++++++++++++++++++++++ source/libs/function/src/builtinsimpl.c | 4 ++ 4 files changed, 64 insertions(+) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index e86d643c0d..f01e734fde 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -138,6 +138,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_TOP_MERGE, FUNCTION_TYPE_BOTTOM_PARTIAL, FUNCTION_TYPE_BOTTOM_MERGE, + FUNCTION_TYPE_FIRST_PARTIAL, + FUNCTION_TYPE_FIRST_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index a528041964..f3bcad8ede 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -97,6 +97,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx); int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +int32_t getFirstLastInfoSize(int32_t resBytes); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 23940a415e..a2849d601e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -954,6 +954,41 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { + // first(col_list) will be rewritten as first(col) + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return TSDB_CODE_SUCCESS; + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + uint8_t paraType = ((SExprNode*)pPara)->resType.type; + int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes; + if (isPartial) { + if (QUERY_NODE_COLUMN != nodeType(pPara)) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "The parameters of first/last can only be columns"); + } + + pFunc->node.resType = (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, + .type = TSDB_DATA_TYPE_BINARY}; + } else { + if (TSDB_DATA_TYPE_BINARY != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = ((SExprNode*)pPara)->resType; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t translateFirstLastPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateFirstLastImpl(pFunc, pErrBuf, len, true); +} + +static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateFirstLastImpl(pFunc, pErrBuf, len, false); +} + static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -1706,6 +1741,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = firstLastFinalize, .combineFunc = firstCombine, }, + { + .name = "_first_partial", + .type = FUNCTION_TYPE_FIRST_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateFirstLastPartial, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = firstFunction, + .finalizeFunc = firstLastFinalize, + .combineFunc = firstCombine, + }, + { + .name = "_first_merge", + .type = FUNCTION_TYPE_FIRST_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateFirstLastMerge, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = firstFunction, + .finalizeFunc = firstLastFinalize, + .combineFunc = firstCombine, + }, { .name = "last", .type = FUNCTION_TYPE_LAST, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3d64f82eba..44418685b0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2244,6 +2244,10 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) return TSDB_CODE_SUCCESS; } +int32_t getFirstLastInfoSize(int32_t resBytes) { + return resBytes + sizeof(int64_t); +} + bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); From 45ef3db78abd908abbc57a7bfbc56871381413f2 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 20:53:42 +0800 Subject: [PATCH 2/9] add first function partial/merge --- source/libs/function/inc/builtinsimpl.h | 2 + source/libs/function/src/builtins.c | 4 +- source/libs/function/src/builtinsimpl.c | 82 +++++++++++++++++++++---- 3 files changed, 75 insertions(+), 13 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index f3bcad8ede..af9b8d4a10 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -93,8 +93,10 @@ int32_t diffFunction(SqlFunctionCtx *pCtx); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t firstFunction(SqlFunctionCtx *pCtx); +int32_t firstFunctionMerge(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx); int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getFirstLastInfoSize(int32_t resBytes); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a2849d601e..180ffd1848 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1749,7 +1749,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = firstFunction, - .finalizeFunc = firstLastFinalize, + .finalizeFunc = firstLastPartialFinalize, .combineFunc = firstCombine, }, { @@ -1759,7 +1759,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateFirstLastMerge, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, - .processFunc = firstFunction, + .processFunc = firstFunctionMerge, .finalizeFunc = firstLastFinalize, .combineFunc = firstCombine, }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 44418685b0..1bb27780bc 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -72,6 +72,12 @@ typedef struct STopBotRes { STopBotResItem* pItems; } STopBotRes; +typedef struct SFirstLastRes { + bool hasResult; + int32_t bytes; + char buf[]; +} SFirstLastRes; + typedef struct SStddevRes { double result; int64_t count; @@ -2245,12 +2251,12 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) } int32_t getFirstLastInfoSize(int32_t resBytes) { - return resBytes + sizeof(int64_t); + return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t); } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); - pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); + pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t); return true; } @@ -2274,12 +2280,13 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); + SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; int32_t bytes = pInputCol->info.bytes; + pInfo->bytes = bytes; // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { @@ -2297,7 +2304,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { if (blockDataOrder == TSDB_ORDER_ASC) { // filter according to current result firstly if (pResInfo->numOfRes > 0) { - TSKEY ts = *(TSKEY*)(buf + bytes); + TSKEY ts = *(TSKEY*)(pInfo->buf + bytes); if (ts < startKey) { return TSDB_CODE_SUCCESS; } @@ -2313,9 +2320,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { - memcpy(buf, data, bytes); - *(TSKEY*)(buf + bytes) = cts; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; + pInfo->hasResult = true; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; @@ -2326,7 +2334,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { // in case of descending order time stamp serial, which usually happens as the results of the nest query, // all data needs to be check. if (pResInfo->numOfRes > 0) { - TSKEY ts = *(TSKEY*)(buf + bytes); + TSKEY ts = *(TSKEY*)(pInfo->buf + bytes); if (ts < endKey) { return TSDB_CODE_SUCCESS; } @@ -2342,9 +2350,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { - memcpy(buf, data, bytes); - *(TSKEY*)(buf + bytes) = cts; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; + pInfo->hasResult = true; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; break; @@ -2422,6 +2431,39 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput) { + pOutput->bytes = pInput->bytes; + TSKEY *tsIn = (TSKEY*)(pInput->buf + pInput->bytes); + TSKEY *tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); + if (pOutput->hasResult) { + if (*tsIn > *tsOut) { + return; + } + } + *tsOut = *tsIn; + memcpy(pOutput->buf, pInput->buf, pOutput->bytes); + pOutput->hasResult = true; + return; +} + +int32_t firstFunctionMerge(SqlFunctionCtx *pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SFirstLastRes* pInputInfo = (SFirstLastRes *)varDataVal(data); + + firstLastTransferInfo(pInputInfo, pInfo); + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); @@ -2435,6 +2477,24 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getFirstLastInfoSize(pRes->bytes); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pRes, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return 1; +} + int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); From 4240a38c1783804d4ee7aa0404fca0e305e8ccdc Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 20:56:33 +0800 Subject: [PATCH 3/9] fix bugs --- source/libs/function/src/builtinsimpl.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1bb27780bc..280f0217b9 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2471,8 +2471,8 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; - char* in = GET_ROWCELL_INTERBUF(pResInfo); - colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes); + SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); + colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes); return pResInfo->numOfRes; } From 763e8bd6aef99264dc6608df5881738be11d9790 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 21:17:55 +0800 Subject: [PATCH 4/9] enable first function split --- source/libs/function/src/builtins.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 180ffd1848..3bfb60ec7c 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -956,7 +956,7 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { // first(col_list) will be rewritten as first(col) - if (1 != LIST_LENGTH(pFunc->pParameterList)) { + if (2 != LIST_LENGTH(pFunc->pParameterList)) { //input has two params c0,ts, is this a bug? return TSDB_CODE_SUCCESS; } @@ -1739,6 +1739,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = functionSetup, .processFunc = firstFunction, .finalizeFunc = firstLastFinalize, + .pPartialFunc = "_first_partial", + .pMergeFunc = "_first_merge", .combineFunc = firstCombine, }, { From d4a38245a20d6296716200b253a7b8ec5a51abf3 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 21:21:46 +0800 Subject: [PATCH 5/9] add some check --- source/libs/function/src/builtinsimpl.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 280f0217b9..032333228a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2432,6 +2432,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput) { + if (!pInput->hasResult) { + return; + } pOutput->bytes = pInput->bytes; TSKEY *tsIn = (TSKEY*)(pInput->buf + pInput->bytes); TSKEY *tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); From 589b30422f180cc990a371cc4d0fdd36843d26d4 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 10:51:00 +0800 Subject: [PATCH 6/9] add last function splitting --- include/libs/function/functionMgt.h | 2 ++ source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 22 ++++++++++++++++++++++ 3 files changed, 25 insertions(+) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index e787639897..b5641f2d07 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -140,6 +140,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_BOTTOM_MERGE, FUNCTION_TYPE_FIRST_PARTIAL, FUNCTION_TYPE_FIRST_MERGE, + FUNCTION_TYPE_LAST_PARTIAL, + FUNCTION_TYPE_LAST_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index af9b8d4a10..7db1396603 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -95,6 +95,7 @@ bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t firstFunctionMerge(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx); +int32_t lastFunctionMerge(SqlFunctionCtx *pCtx); int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0a6d99c52d..7a06304487 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1776,6 +1776,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = firstLastFinalize, .combineFunc = lastCombine, }, + { + .name = "_last_partial", + .type = FUNCTION_TYPE_LAST_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateFirstLastPartial, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastFunction, + .finalizeFunc = firstLastPartialFinalize, + .combineFunc = lastCombine, + }, + { + .name = "_last_merge", + .type = FUNCTION_TYPE_LAST_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateFirstLastMerge, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastFunctionMerge, + .finalizeFunc = firstLastFinalize, + .combineFunc = lastCombine, + }, { .name = "twa", .type = FUNCTION_TYPE_TWA, From 6a4fc4577126119b4ffc3d2f0285862a8b656647 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 11:00:22 +0800 Subject: [PATCH 7/9] fix bugs --- source/libs/function/src/builtinsimpl.c | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index f2c5df8fc2..fe52e9c7c5 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2355,12 +2355,13 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* buf = GET_ROWCELL_INTERBUF(pResInfo); + SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; int32_t bytes = pInputCol->info.bytes; + pInfo->bytes = bytes; // All null data column, return directly. if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { @@ -2385,9 +2386,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { - memcpy(buf, data, bytes); - *(TSKEY*)(buf + bytes) = cts; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); pResInfo->numOfRes = 1; } @@ -2403,9 +2404,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { - memcpy(buf, data, bytes); - *(TSKEY*)(buf + bytes) = cts; + if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { + memcpy(pInfo->buf, data, bytes); + *(TSKEY*)(pInfo->buf + bytes) = cts; pResInfo->numOfRes = 1; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); } From 6f27ee975e758cf4af935b0d554c740412f9c2b7 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 11:04:36 +0800 Subject: [PATCH 8/9] enable last splitting --- source/libs/function/src/builtins.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 7a06304487..1c3f274ee9 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1774,6 +1774,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = functionSetup, .processFunc = lastFunction, .finalizeFunc = firstLastFinalize, + .pPartialFunc = "_last_partial", + .pMergeFunc = "_last_merge", .combineFunc = lastCombine, }, { From 0dde9cc9f95c9d6aebabc8a3b4b2feb2be12ffc3 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 14 Jun 2022 11:25:11 +0800 Subject: [PATCH 9/9] fix bugs --- source/libs/function/src/builtinsimpl.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fe52e9c7c5..b5009b7d41 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2390,6 +2390,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); + pInfo->hasResult = true; pResInfo->numOfRes = 1; } break; @@ -2407,6 +2408,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; + pInfo->hasResult = true; pResInfo->numOfRes = 1; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); }