From d2059e8079986c1f334c60c4a2a0cc63fb1c19c9 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Jun 2022 14:42:05 +0800 Subject: [PATCH 1/3] feat(query): add irate function --- source/libs/function/inc/builtinsimpl.h | 5 + source/libs/function/src/builtins.c | 25 ++++ source/libs/function/src/builtinsimpl.c | 160 +++++++++++++++++++++--- 3 files changed, 170 insertions(+), 20 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index e691d562c6..4d2f926c12 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -101,6 +101,11 @@ bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); int32_t derivativeFunction(SqlFunctionCtx *pCtx); +bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); +int32_t irateFunction(SqlFunctionCtx *pCtx); +int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t firstFunctionMerge(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index cfad00f458..6baf45a5b3 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -978,6 +978,21 @@ static int32_t translateDerivative(SFunctionNode* pFunc, char* pErrBuf, int32_t return TSDB_CODE_SUCCESS; } +static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + + if (!IS_NUMERIC_TYPE(colType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // first(col_list) will be rewritten as first(col) if (1 != LIST_LENGTH(pFunc->pParameterList)) { @@ -1796,6 +1811,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = derivativeFunction, .finalizeFunc = functionFinalize }, + { + .name = "irate", + .type = FUNCTION_TYPE_IRATE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateIrate, + .getEnvFunc = getIrateFuncEnv, + .initFunc = irateFuncSetup, + .processFunc = irateFunction, + .finalizeFunc = irateFinalize + }, { .name = "last_row", .type = FUNCTION_TYPE_LAST_ROW, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 53a5815844..aa565067f8 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -59,6 +59,12 @@ typedef struct STuplePos { int32_t offset; } STuplePos; +typedef struct SMinmaxResInfo { + bool assign; // assign the first value or not + int64_t v; + STuplePos tuplePos; +} SMinmaxResInfo; + typedef struct STopBotResItem { SVariant v; uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data @@ -148,6 +154,12 @@ typedef struct SElapsedInfo { int64_t timeUnit; } SElapsedInfo; +typedef struct STwaInfo { + double dOutput; + SPoint1 p; + STimeWindow win; +} STwaInfo; + typedef struct SHistoFuncBin { double lower; double upper; @@ -234,6 +246,22 @@ typedef struct SUniqueInfo { char pItems[]; } SUniqueInfo; +typedef struct SDerivInfo { + double prevValue; // previous value + TSKEY prevTs; // previous timestamp + bool ignoreNegative; // ignore the negative value + int64_t tsWindow; // time window for derivative + bool valueSet; // the value has been set already +} SDerivInfo; + +typedef struct SRateInfo { + double firstValue; + TSKEY firstKey; + double lastValue; + TSKEY lastKey; + int8_t hasResult; // flag to denote has value +} SRateInfo; + #define SET_VAL(_info, numOfElem, res) \ do { \ if ((numOfElem) <= 0) { \ @@ -927,12 +955,6 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin return FUNC_DATA_REQUIRED_STATIS_LOAD; } -typedef struct SMinmaxResInfo { - bool assign; // assign the first value or not - int64_t v; - STuplePos tuplePos; -} SMinmaxResInfo; - bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { if (!functionSetup(pCtx, pResultInfo)) { return false; // not initialized since it has been initialized @@ -4667,12 +4689,6 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } -typedef struct STwaInfo { - double dOutput; - SPoint1 p; - STimeWindow win; -} STwaInfo; - bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(STwaInfo); return true; @@ -5121,14 +5137,6 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return TSDB_CODE_SUCCESS; } -typedef struct SDerivInfo { - double prevValue; // previous value - TSKEY prevTs; // previous timestamp - bool ignoreNegative; // ignore the negative value - int64_t tsWindow; // time window for derivative - bool valueSet; // the value has been set already -} SDerivInfo; - bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SDerivInfo); return true; @@ -5223,6 +5231,118 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { return numOfElems; } +bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SRateInfo); + return true; +} + +bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { + if (!functionSetup(pCtx, pResInfo)) { + return false; // not initialized since it has been initialized + } + + SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + pInfo->firstKey = INT64_MIN; + pInfo->lastKey = INT64_MIN; + pInfo->firstValue = (double)INT64_MIN; + pInfo->lastValue = (double)INT64_MIN; + + pInfo->hasResult = 0; + return true; +} + +int32_t irateFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SRateInfo* pRateInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + int32_t numOfElems = 0; + int32_t type = pInputCol->info.type; + + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + continue; + } + + numOfElems++; + + char* data = colDataGetData(pInputCol, i); + double v = 0; + GET_TYPED_DATA(v, double, type, data); + + if (INT64_MIN == pRateInfo->lastKey) { + pRateInfo->lastValue = v; + pRateInfo->lastKey = tsList[i]; + continue; + } + + if (tsList[i] > pRateInfo->lastKey) { + if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) { + pRateInfo->firstValue = pRateInfo->lastValue; + pRateInfo->firstKey = pRateInfo->lastKey; + } + + pRateInfo->lastValue = v; + pRateInfo->lastKey = tsList[i]; + + continue; + } + + if ((INT64_MIN == pRateInfo->firstKey) || tsList[i] > pRateInfo->firstKey) { + pRateInfo->firstValue = v; + pRateInfo->firstKey = tsList[i]; + break; + } + + } + + SET_VAL(pResInfo, numOfElems, 1); + return TSDB_CODE_SUCCESS; +} + +static double doCalcRate(const SRateInfo* pRateInfo, double tickPerSec) { + if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) || + (pRateInfo->firstKey >= pRateInfo->lastKey)) { + return 0.0; + } + + double diff = 0; + // If the previous value of the last is greater than the last value, only keep the last point instead of the delta + // value between two values. + diff = pRateInfo->lastValue; + if (diff >= pRateInfo->firstValue) { + diff -= pRateInfo->firstValue; + } + + int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey; + if (duration == 0) { + return 0; + } + + return (duration > 0)? ((double)diff) / (duration/tickPerSec):0.0; +} + +int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; + + SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + double result = doCalcRate(pInfo, 1000); + colDataAppend(pCol, pBlock->info.rows, (const char*)&result, pResInfo->isNullRes); + + return pResInfo->numOfRes; +} + int32_t interpFunction(SqlFunctionCtx* pCtx) { #if 0 int32_t fillType = (int32_t) pCtx->param[2].i64; From 767eefea0cf237216f20ce95ce48a95b0f207c04 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Jun 2022 15:08:41 +0800 Subject: [PATCH 2/3] fix irate first_value bugs --- source/libs/function/src/builtinsimpl.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index aa565067f8..de4cf01cec 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -5298,7 +5298,6 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) { if ((INT64_MIN == pRateInfo->firstKey) || tsList[i] > pRateInfo->firstKey) { pRateInfo->firstValue = v; pRateInfo->firstKey = tsList[i]; - break; } } From bde4e7a09be923a74f61b81e887a3821a7d36abe Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Jun 2022 16:06:36 +0800 Subject: [PATCH 3/3] fix json_tag.py test case --- tests/system-test/2-query/json_tag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/json_tag.py b/tests/system-test/2-query/json_tag.py index 957e916e34..052ff923ac 100644 --- a/tests/system-test/2-query/json_tag.py +++ b/tests/system-test/2-query/json_tag.py @@ -518,7 +518,7 @@ class TDTestCase: tdSql.query("select avg(dataint) from jsons1 where jtag is not null") tdSql.checkData(0, 0, 5.3) #tdSql.error("select twa(dataint) from jsons1 where jtag is not null") - tdSql.error("select irate(dataint) from jsons1 where jtag is not null") + tdSql.query("select irate(dataint) from jsons1 where jtag is not null") #tdSql.query("select sum(dataint) from jsons1 where jtag->'tag1' is not null") #tdSql.checkData(0, 0, 49) tdSql.query("select stddev(dataint) from jsons1 where jtag->'tag1'>1")