From 5fe03fe3f4985ba7b9fdbd5fe48f6ea6a8212b26 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 26 Mar 2024 09:18:45 +0800 Subject: [PATCH] feat: irate support pk --- source/libs/function/inc/builtinsimpl.h | 2 +- source/libs/function/src/builtins.c | 3 +- source/libs/function/src/builtinsimpl.c | 86 ++++++++++++++++++------- 3 files changed, 64 insertions(+), 27 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 54bcd26bc3..ecc64fcd00 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -144,7 +144,7 @@ int32_t irateFunction(SqlFunctionCtx* pCtx); int32_t irateFunctionMerge(SqlFunctionCtx* pCtx); int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -int32_t getIrateInfoSize(); +int32_t getIrateInfoSize(int32_t pkBytes); int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index cb2a979909..0f88572dbc 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1638,7 +1638,8 @@ static int32_t translateIrateImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t l if (!IS_NUMERIC_TYPE(colType)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - pFunc->node.resType = (SDataType){.bytes = getIrateInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0; + pFunc->node.resType = (SDataType){.bytes = getIrateInfoSize(pkBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; } else { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 2907c096fe..70fe2b19df 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -269,6 +269,12 @@ typedef struct SRateInfo { double lastValue; TSKEY lastKey; int8_t hasResult; // flag to denote has value + + char* firstPk; + char* lastPk; + int8_t pkType; + int32_t pkBytes; + char pkData[]; } SRateInfo; typedef struct SGroupKeyInfo { @@ -6165,10 +6171,11 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t getIrateInfoSize() { return (int32_t)sizeof(SRateInfo); } +int32_t getIrateInfoSize(int32_t pkBytes) { return (int32_t)sizeof(SRateInfo) + 2 * pkBytes; } bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - pEnv->calcMemSize = sizeof(SRateInfo); + int32_t pkBytes = (pFunc->hasPk) ? pFunc->pkBytes : 0; + pEnv->calcMemSize = getIrateInfoSize(pkBytes); return true; } @@ -6188,6 +6195,36 @@ bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { return true; } +static void doSaveRateInfo(SRateInfo* pRateInfo, bool isFirst, int64_t ts, char* pk, double v) { + if (isFirst) { + pRateInfo->firstValue = v; + pRateInfo->firstKey = ts; + if (pRateInfo->firstPk) { + int32_t pkBytes = IS_VAR_DATA_TYPE(pRateInfo->pkType) ? varDataTLen(pk) : pRateInfo->pkBytes; + memcpy(pRateInfo->firstPk, pk, pkBytes); + } + } else { + pRateInfo->lastValue = v; + pRateInfo->lastKey = ts; + if (pRateInfo->lastPk) { + int32_t pkBytes = IS_VAR_DATA_TYPE(pRateInfo->pkType) ? varDataTLen(pk) : pRateInfo->pkBytes; + memcpy(pRateInfo->lastPk, pk, pkBytes); + } + } +} + +static void initializeRateInfo(SqlFunctionCtx* pCtx, SRateInfo* pRateInfo) { + if (pCtx->hasPrimaryKey) { + pRateInfo->pkType = pCtx->input.pPrimaryKey->info.type; + pRateInfo->pkBytes = pCtx->input.pPrimaryKey->info.bytes; + pRateInfo->firstPk = pRateInfo->pkData; + pRateInfo->lastPk = pRateInfo->pkData + pRateInfo->pkBytes; + } else { + pRateInfo->firstPk = NULL; + pRateInfo->lastPk = NULL; + } +} + int32_t irateFunction(SqlFunctionCtx* pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SRateInfo* pRateInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -6198,6 +6235,8 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; funcInputUpdate(pCtx); + + initializeRateInfo(pCtx, pRateInfo); int32_t numOfElems = 0; int32_t type = pInputCol->info.type; @@ -6212,21 +6251,16 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) { GET_TYPED_DATA(v, double, type, data); if (INT64_MIN == pRateInfo->lastKey) { - pRateInfo->lastValue = v; - pRateInfo->lastKey = row.ts; + doSaveRateInfo(pRateInfo, false, row.ts, row.pPk, v); pRateInfo->hasResult = 1; continue; } if (row.ts > pRateInfo->lastKey) { if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) { - pRateInfo->firstValue = pRateInfo->lastValue; - pRateInfo->firstKey = pRateInfo->lastKey; + doSaveRateInfo(pRateInfo, true, pRateInfo->lastKey, pRateInfo->lastPk, pRateInfo->lastValue); } - - pRateInfo->lastValue = v; - pRateInfo->lastKey = row.ts; - + doSaveRateInfo(pRateInfo, false, row.ts, row.pPk, v); continue; } else if (row.ts == pRateInfo->lastKey) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; @@ -6234,8 +6268,7 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) { if ((INT64_MIN == pRateInfo->firstKey) || row.ts > pRateInfo->firstKey) { - pRateInfo->firstValue = v; - pRateInfo->firstKey = row.ts; + doSaveRateInfo(pRateInfo, true, row.ts, row.pPk, v); } else if (row.ts == pRateInfo->firstKey) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } @@ -6271,25 +6304,26 @@ static double doCalcRate(const SRateInfo* pRateInfo, double tickPerSec) { static void irateTransferInfoImpl(TSKEY inputKey, SRateInfo* pInput, SRateInfo* pOutput, bool isFirstKey) { if (inputKey > pOutput->lastKey) { - pOutput->firstKey = pOutput->lastKey; - pOutput->firstValue = pOutput->lastValue; - - pOutput->lastKey = isFirstKey ? pInput->firstKey : pInput->lastKey; - pOutput->lastValue = isFirstKey ? pInput->firstValue : pInput->lastValue; + doSaveRateInfo(pOutput, true, pOutput->lastKey, pOutput->lastPk, pOutput->lastValue); + if (isFirstKey) { + doSaveRateInfo(pOutput, false, pInput->firstKey, pInput->firstPk, pInput->firstValue); + } else { + doSaveRateInfo(pOutput, false, pInput->lastKey, pInput->lastPk, pInput->lastValue); + } } else if ((inputKey < pOutput->lastKey) && (inputKey > pOutput->firstKey)) { - pOutput->firstKey = isFirstKey ? pInput->firstKey : pInput->lastKey; - pOutput->firstValue = isFirstKey ? pInput->firstValue : pInput->lastValue; + if (isFirstKey) { + doSaveRateInfo(pOutput, true, pInput->firstKey, pInput->firstPk, pInput->firstValue); + } else { + doSaveRateInfo(pOutput, true, pInput->lastKey, pInput->lastPk, pInput->lastValue); + } } else { // inputKey < pOutput->firstKey } } static void irateCopyInfo(SRateInfo* pInput, SRateInfo* pOutput) { - pOutput->firstKey = pInput->firstKey; - pOutput->lastKey = pInput->lastKey; - - pOutput->firstValue = pInput->firstValue; - pOutput->lastValue = pInput->lastValue; + doSaveRateInfo(pOutput, true, pInput->firstKey, pInput->firstPk, pInput->firstValue); + doSaveRateInfo(pOutput, false, pInput->lastKey, pInput->lastPk, pInput->lastValue); } static int32_t irateTransferInfo(SRateInfo* pInput, SRateInfo* pOutput) { @@ -6324,11 +6358,13 @@ int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) { } SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + initializeRateInfo(pCtx, pInfo); int32_t start = pInput->startRowIndex; for (int32_t i = start; i < start + pInput->numOfRows; ++i) { char* data = colDataGetData(pCol, i); SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data); + initializeRateInfo(pCtx, pInfo); if (pInputInfo->hasResult) { int32_t code = irateTransferInfo(pInputInfo, pInfo); if (code != TSDB_CODE_SUCCESS) { @@ -6347,7 +6383,7 @@ int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) { int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getIrateInfoSize(); + int32_t resultBytes = getIrateInfoSize(pInfo->pkBytes); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pInfo, resultBytes);