From 1cba56860434f9053026523b132aeaa236b7afe6 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 11 Nov 2022 15:22:00 +0800 Subject: [PATCH] fix(query): report error if certain function query stable has duplicate timestamps TD-19892 --- source/libs/executor/src/executorimpl.c | 6 +++- source/libs/function/src/builtins.c | 4 +-- source/libs/function/src/builtinsimpl.c | 41 +++++++++++++++++++------ 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7d662f8784..5d4986186e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -653,7 +653,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc pfCtx->pDstBlock = pResult; } - numOfRows = pfCtx->fpSet.process(pfCtx); + int32_t code = pfCtx->fpSet.process(pfCtx); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + numOfRows = pResInfo->numOfRes; } else if (fmIsAggFunc(pfCtx->functionId)) { // selective value output should be set during corresponding function execution if (fmIsSelectValueFunc(pfCtx->functionId)) { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d3f03e8e9c..69b0ff1096 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2640,8 +2640,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "diff", .type = FUNCTION_TYPE_DIFF, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | - FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC, .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, .initFunc = diffFunctionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 6eb57c1a18..3f9fe46acd 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3325,6 +3325,7 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); pDiffInfo->hasPrev = false; pDiffInfo->prev.i64 = 0; + pDiffInfo->prevTs = -1; if (pCtx->numOfParams > 1) { pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param } else { @@ -3335,7 +3336,7 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { return true; } -static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { +static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, int64_t ts) { switch (type) { case TSDB_DATA_TYPE_BOOL: pDiffInfo->prev.i64 = *(bool*)pv ? 1 : 0; @@ -3362,11 +3363,13 @@ static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { default: ASSERT(0); } + pDiffInfo->prevTs = ts; } static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, - int32_t order) { + int32_t order, int64_t ts) { int32_t factor = (order == TSDB_ORDER_ASC) ? 1 : -1; + pDiffInfo->prevTs = ts; switch (type) { case TSDB_DATA_TYPE_INT: { int32_t v = *(int32_t*)pv; @@ -3450,6 +3453,8 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pInputCol = pInput->pData[0]; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + int32_t numOfElems = 0; int32_t startOffset = pCtx->offset; @@ -3471,7 +3476,10 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { char* pv = colDataGetData(pInputCol, i); if (pDiffInfo->hasPrev) { - doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order); + if (tsList[i] == pDiffInfo->prevTs) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; + } + doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order, tsList[i]); // handle selectivity if (pCtx->subsidiaries.num > 0) { appendSelectivityValue(pCtx, i, pos); @@ -3479,7 +3487,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { numOfElems++; } else { - doSetPrevVal(pDiffInfo, pInputCol->info.type, pv); + doSetPrevVal(pDiffInfo, pInputCol->info.type, pv, tsList[i]); } pDiffInfo->hasPrev = true; @@ -3501,7 +3509,10 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { // there is a row of previous data block to be handled in the first place. if (pDiffInfo->hasPrev) { - doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order); + if (tsList[i] == pDiffInfo->prevTs) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; + } + doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order, tsList[i]); // handle selectivity if (pCtx->subsidiaries.num > 0) { appendSelectivityValue(pCtx, i, pos); @@ -3509,15 +3520,15 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { numOfElems++; } else { - doSetPrevVal(pDiffInfo, pInputCol->info.type, pv); + doSetPrevVal(pDiffInfo, pInputCol->info.type, pv, tsList[i]); } pDiffInfo->hasPrev = true; } } - // initial value is not set yet - return numOfElems; + pResInfo->numOfRes = numOfElems; + return TSDB_CODE_SUCCESS; } int32_t getTopBotInfoSize(int64_t numOfItems) { return sizeof(STopBotRes) + numOfItems * sizeof(STopBotResItem); } @@ -6137,6 +6148,9 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { if (!pDerivInfo->valueSet) { // initial value is not set yet pDerivInfo->valueSet = true; } else { + if (tsList[i] == pDerivInfo->prevTs) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; + } double r = ((v - pDerivInfo->prevValue) * pDerivInfo->tsWindow) / (tsList[i] - pDerivInfo->prevTs); if (pDerivInfo->ignoreNegative && r < 0) { } else { @@ -6175,6 +6189,9 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { if (!pDerivInfo->valueSet) { // initial value is not set yet pDerivInfo->valueSet = true; } else { + if (tsList[i] == pDerivInfo->prevTs) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; + } double r = ((pDerivInfo->prevValue - v) * pDerivInfo->tsWindow) / (pDerivInfo->prevTs - tsList[i]); if (pDerivInfo->ignoreNegative && r < 0) { } else { @@ -6202,7 +6219,9 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { } } - return numOfElems; + pResInfo->numOfRes = numOfElems; + + return TSDB_CODE_SUCCESS; } bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -6267,11 +6286,15 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) { pRateInfo->lastKey = tsList[i]; continue; + } else if (tsList[i] == pRateInfo->lastKey) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; } if ((INT64_MIN == pRateInfo->firstKey) || tsList[i] > pRateInfo->firstKey) { pRateInfo->firstValue = v; pRateInfo->firstKey = tsList[i]; + } else if (tsList[i] == pRateInfo->firstKey) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; } }