From 5f0ac46029a227a029ef4a430fd47e968fcf6877 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 4 Jul 2024 11:04:59 +0800 Subject: [PATCH 1/9] diff --- include/libs/function/function.h | 12 +- include/libs/function/functionMgt.h | 1 + source/libs/executor/src/projectoperator.c | 38 ++- source/libs/function/inc/builtins.h | 1 + source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/inc/functionMgtInt.h | 1 + source/libs/function/src/builtins.c | 13 +- source/libs/function/src/builtinsimpl.c | 268 +++++++++++++++------ source/libs/function/src/functionMgt.c | 3 + 9 files changed, 249 insertions(+), 89 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 87bbe21133..f563d7f5a8 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -41,6 +41,7 @@ typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx); +typedef int32_t (*processFuncByRow)(SArray* pCtx); // array of SqlFunctionCtx typedef struct SScalarFuncExecFuncs { FExecGetEnv getEnv; @@ -48,11 +49,12 @@ typedef struct SScalarFuncExecFuncs { } SScalarFuncExecFuncs; typedef struct SFuncExecFuncs { - FExecGetEnv getEnv; - FExecInit init; - FExecProcess process; - FExecFinalize finalize; - FExecCombine combine; + FExecGetEnv getEnv; + FExecInit init; + FExecProcess process; + FExecFinalize finalize; + FExecCombine combine; + processFuncByRow processFuncByRow; } SFuncExecFuncs; #define MAX_INTERVAL_TIME_WINDOW 10000000 // maximum allowed time windows in final results diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index e77635727b..86db6640c5 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -255,6 +255,7 @@ bool fmIsIgnoreNullFunc(int32_t funcId); bool fmIsConstantResFunc(SFunctionNode* pFunc); bool fmIsSkipScanCheckFunc(int32_t funcId); bool fmIsPrimaryKeyFunc(int32_t funcId); +bool fmIsProcessByRowFunc(int32_t funcId); void getLastCacheDataType(SDataType* pType, int32_t pkBytes); SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 19828d5146..d28ad1c54a 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -724,9 +724,12 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList) { + int32_t code = TSDB_CODE_SUCCESS; setPseudoOutputColInfo(pResult, pCtx, pPseudoList); pResult->info.dataLoad = 1; + SArray* diffFunctionCtx = NULL; + if (pSrcBlock == NULL) { for (int32_t k = 0; k < numOfOutput; ++k) { int32_t outputSlotId = pExpr[k].base.resSchema.slotId; @@ -743,7 +746,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } pResult->info.rows = 1; - return TSDB_CODE_SUCCESS; + goto _exit; } if (pResult != pSrcBlock) { @@ -816,10 +819,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; SScalarParam dest = {.columnData = &idata}; - int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); + code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pBlockList); - return code; + goto _exit; } int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; @@ -852,11 +855,21 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc pfCtx->pDstBlock = pResult; } - int32_t code = pfCtx->fpSet.process(pfCtx); + code = pfCtx->fpSet.process(pfCtx); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _exit; } numOfRows = pResInfo->numOfRes; + if (fmIsProcessByRowFunc(pfCtx->functionId)) { + if (NULL == diffFunctionCtx) { + diffFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*)); + if (!diffFunctionCtx) { + code = terrno; + goto _exit; + } + } + taosArrayPush(diffFunctionCtx, &pfCtx); + } } else if (fmIsAggFunc(pfCtx->functionId)) { // selective value output should be set during corresponding function execution if (fmIsSelectValueFunc(pfCtx->functionId)) { @@ -889,7 +902,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pBlockList); - return code; + goto _exit; } int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; @@ -905,9 +918,18 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } } + if (diffFunctionCtx && taosArrayGetSize(diffFunctionCtx) > 0){ + SqlFunctionCtx** pfCtx = taosArrayGet(diffFunctionCtx, 0); + (*pfCtx)->fpSet.processFuncByRow(diffFunctionCtx); + numOfRows = (*pfCtx)->resultInfo->numOfRes; + } if (!createNewColModel) { pResult->info.rows += numOfRows; } - - return TSDB_CODE_SUCCESS; +_exit: + if(diffFunctionCtx) { + taosArrayDestroy(diffFunctionCtx); + diffFunctionCtx = NULL; + } + return code; } diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index 8c07a9d530..343f5b8367 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -50,6 +50,7 @@ typedef struct SBuiltinFuncDefinition { const char* pStateFunc; FCreateMergeFuncParameters createMergeParaFuc; FEstimateReturnRows estimateReturnRowsFunc; + processFuncByRow processFuncByRow; } SBuiltinFuncDefinition; extern const SBuiltinFuncDefinition funcMgtBuiltins[]; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index ecc64fcd00..7615584f8c 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -133,6 +133,7 @@ int32_t getApercentileMaxSize(); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t diffFunction(SqlFunctionCtx* pCtx); +int32_t diffFunctionByRow(SArray* pCtx); bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index a3f97af5d9..2c5c7725d5 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -57,6 +57,7 @@ extern "C" { #define FUNC_MGT_PRIMARY_KEY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(28) #define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29) #define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found +#define FUNC_MGT_PROCESS_BY_ROW FUNC_MGT_FUNC_CLASSIFICATION_MASK(31) #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index e3e84ac20b..66b3697a24 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1965,9 +1965,9 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } SValueNode* pValue = (SValueNode*)pParamNode1; - if (pValue->datum.i != 0 && pValue->datum.i != 1) { + if (pValue->datum.i < 0 || pValue->datum.i > 3) { return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "Second parameter of DIFF function should be only 0 or 1"); + "Second parameter of DIFF function should be a number between 0 and 3."); } pValue->notReserved = true; @@ -1986,11 +1986,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } static EFuncReturnRows diffEstReturnRows(SFunctionNode* pFunc) { - if (1 == LIST_LENGTH(pFunc->pParameterList)) { - return FUNC_RETURN_ROWS_N_MINUS_1; - } - return 1 == ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i ? FUNC_RETURN_ROWS_INDEFINITE - : FUNC_RETURN_ROWS_N_MINUS_1; + return FUNC_RETURN_ROWS_N_MINUS_1; } static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { @@ -3206,7 +3202,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "diff", .type = FUNCTION_TYPE_DIFF, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_PROCESS_BY_ROW | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, @@ -3215,6 +3211,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = diffScalarFunction, .finalizeFunc = functionFinalize, .estimateReturnRowsFunc = diffEstReturnRows, + .processFuncByRow = diffFunctionByRow, }, { .name = "statecount", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1aa92479b9..e502420543 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -110,10 +110,9 @@ typedef enum { } EAPerctAlgoType; typedef struct SDiffInfo { - bool hasPrev; - bool includeNull; - bool ignoreNegative; // replace the ignore with case when - bool firstOutput; + bool hasPrev; + bool isFirstRow; + int8_t ignoreOption; // replace the ignore with case when union { int64_t i64; double d64; @@ -122,6 +121,12 @@ typedef struct SDiffInfo { int64_t prevTs; } SDiffInfo; +bool ignoreNegative(int8_t ignoreOption){ + return (ignoreOption & 0x1) == 0x1; +} +bool ignoreNull(int8_t ignoreOption){ + return (ignoreOption & 0x2) == 0x2; +} typedef struct SSpreadInfo { double result; bool hasResult; @@ -3100,15 +3105,14 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); pDiffInfo->hasPrev = false; + pDiffInfo->isFirstRow = true; pDiffInfo->prev.i64 = 0; pDiffInfo->prevTs = -1; if (pCtx->numOfParams > 1) { - pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param + pDiffInfo->ignoreOption = pCtx->param[1].param.i; // TODO set correct param } else { - pDiffInfo->ignoreNegative = false; + pDiffInfo->ignoreOption = 0; } - pDiffInfo->includeNull = true; - pDiffInfo->firstOutput = false; return true; } @@ -3148,16 +3152,56 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, return TSDB_CODE_SUCCESS; } +static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { + switch (type) { + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_INT: { + int32_t v = *(int32_t*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_TINYINT: { + int8_t v = *(int8_t*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_SMALLINT: { + int16_t v = *(int16_t*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_UBIGINT: + case TSDB_DATA_TYPE_BIGINT: { + int64_t v = *(int64_t*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_FLOAT: { + float v = *(float*)pv; + return v < pDiffInfo->prev.d64; + } + case TSDB_DATA_TYPE_DOUBLE: { + double v = *(double*)pv; + return v < pDiffInfo->prev.d64; + } + default: + return false; + } + + return false; +} + static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, - int64_t ts) { + int64_t ts) { pDiffInfo->prevTs = ts; switch (type) { case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_INT: { int32_t v = *(int32_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { + if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { colDataSetNull_f_s(pOutput, pos); + pOutput->hasNull = true; } else { colDataSetInt64(pOutput, pos, &delta); } @@ -3170,7 +3214,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_TINYINT: { int8_t v = *(int8_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { + if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { colDataSetNull_f_s(pOutput, pos); } else { colDataSetInt64(pOutput, pos, &delta); @@ -3182,7 +3226,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_SMALLINT: { int16_t v = *(int16_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { + if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { colDataSetNull_f_s(pOutput, pos); } else { colDataSetInt64(pOutput, pos, &delta); @@ -3195,7 +3239,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_BIGINT: { int64_t v = *(int64_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { + if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { colDataSetNull_f_s(pOutput, pos); } else { colDataSetInt64(pOutput, pos, &delta); @@ -3206,7 +3250,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_FLOAT: { float v = *(float*)pv; double delta = v - pDiffInfo->prev.d64; // direct previous may be null - if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow + if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || isnan(delta)) { // check for overflow colDataSetNull_f_s(pOutput, pos); } else { colDataSetDouble(pOutput, pos, &delta); @@ -3217,7 +3261,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_DOUBLE: { double v = *(double*)pv; double delta = v - pDiffInfo->prev.d64; // direct previous may be null - if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow + if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || isnan(delta)) { // check for overflow colDataSetNull_f_s(pOutput, pos); } else { colDataSetDouble(pOutput, pos, &delta); @@ -3271,71 +3315,159 @@ bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool f } } -int32_t diffFunction(SqlFunctionCtx* pCtx) { +int32_t diffResultIsNull(SqlFunctionCtx* pCtx, SFuncInputRow* pRow){ + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); + + if (pRow->isDataNull || !pDiffInfo->hasPrev ) { + return true; + } else if (ignoreNegative(pDiffInfo->ignoreOption)){ + return diffIsNegtive(pDiffInfo, pCtx->input.pData[0]->info.type, pRow->pData); + } + return false; +} + +bool isFirstRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); + return pDiffInfo->isFirstRow; +} + +int32_t setPreVal(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); + pDiffInfo->isFirstRow = false; + if (pRow->isDataNull) { + return TSDB_CODE_SUCCESS; + } + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + int8_t inputType = pInputCol->info.type; + + char* pv = pRow->pData; + int32_t code = doSetPrevVal(pDiffInfo, inputType, pv, pRow->ts); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + pDiffInfo->hasPrev = true; + return TSDB_CODE_SUCCESS; +} + +int32_t doDiff(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, int32_t pos) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pInputCol = pInput->pData[0]; - int8_t inputType = pInputCol->info.type; + SColumnInfoData* pInputCol = pInput->pData[0]; + int8_t inputType = pInputCol->info.type; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + if (pRow->isDataNull) { + colDataSetNull_f_s(pOutput, pos); + pOutput->hasNull = true; - int32_t numOfElems = 0; - int32_t startOffset = pCtx->offset; - - SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - - funcInputUpdate(pCtx); - - SFuncInputRow row = {0}; - while (funcInputGetNextRow(pCtx, &row)) { - int32_t pos = startOffset + numOfElems; - - if (row.isDataNull) { - if (pDiffInfo->includeNull) { - colDataSetNull_f_s(pOutput, pos); - - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); - } - - numOfElems += 1; - } - continue; + // handle selectivity + if (pCtx->subsidiaries.num > 0) { + appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos); } - - char* pv = row.pData; - - if (pDiffInfo->hasPrev) { - if (row.ts == pDiffInfo->prevTs) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, row.ts); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); - } - - numOfElems++; - } else { - int32_t code = doSetPrevVal(pDiffInfo, inputType, pv, row.ts); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - - pDiffInfo->hasPrev = true; + return TSDB_CODE_SUCCESS; } - pResInfo->numOfRes = numOfElems; + char* pv = pRow->pData; + + if (pRow->ts == pDiffInfo->prevTs) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; + } + int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, pRow->ts); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + // handle selectivity + if (pCtx->subsidiaries.num > 0) { + appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos); + } + pDiffInfo->hasPrev = true; + return TSDB_CODE_SUCCESS; } +int32_t diffFunction(SqlFunctionCtx* pCtx) { + return TSDB_CODE_SUCCESS; +} + +int32_t diffFunctionByRow(SArray* pCtxArray) { + int32_t code = TSDB_CODE_SUCCESS; + int diffColNum = pCtxArray->size; + if(diffColNum == 0) { + return TSDB_CODE_SUCCESS; + } + int32_t numOfElems = 0; + + SArray* pRows = taosArrayInit_s(sizeof(SFuncInputRow), diffColNum); + + bool keepNull = false; + for (int i = 0; i < diffColNum; ++i) { + SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i); + funcInputUpdate(pCtx); + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); + if (!ignoreNull(pDiffInfo->ignoreOption)) { + keepNull = true; + } + } + + SqlFunctionCtx* pCtx0 = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, 0); + SFuncInputRow* pRow0 = (SFuncInputRow*)taosArrayGet(pRows, 0); + int32_t startOffset = pCtx0->offset; + while (funcInputGetNextRow(pCtx0, pRow0)) { + bool hasNotNullValue = !diffResultIsNull(pCtx0, pRow0); + for (int i = 1; i < diffColNum; ++i) { + SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i); + SFuncInputRow* pRow = (SFuncInputRow*)taosArrayGet(pRows, i); + if(!funcInputGetNextRow(pCtx, pRow)) { + // rows are not equal + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + goto _exit; + } + hasNotNullValue = !diffResultIsNull(pCtx, pRow); + } + int32_t pos = startOffset + numOfElems; + + bool newRow = false; + for (int i = 0; i < diffColNum; ++i) { + SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i); + SFuncInputRow* pRow = (SFuncInputRow*)taosArrayGet(pRows, i); + if ((keepNull || hasNotNullValue) && !isFirstRow(pCtx, pRow)){ + code = doDiff(pCtx, pRow, pos); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + newRow = true; + } else { + code = setPreVal(pCtx, pRow); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + } + } + if (newRow) ++numOfElems; + } + + for (int i = 0; i < diffColNum; ++i) { + SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i); + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + pResInfo->numOfRes = numOfElems; + } + +_exit: + if (pRows) { + taosArrayDestroy(pRows); + pRows = NULL; + } + return code; +} + int32_t getTopBotInfoSize(int64_t numOfItems) { return sizeof(STopBotRes) + numOfItems * sizeof(STopBotResItem); } bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 1e12595b28..b99e67697c 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -141,6 +141,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { pFpSet->process = funcMgtBuiltins[funcId].processFunc; pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc; pFpSet->combine = funcMgtBuiltins[funcId].combineFunc; + pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow; return TSDB_CODE_SUCCESS; } @@ -274,6 +275,8 @@ bool fmIsBlockDistFunc(int32_t funcId) { return FUNCTION_TYPE_BLOCK_DIST == funcMgtBuiltins[funcId].type; } +bool fmIsProcessByRowFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PROCESS_BY_ROW); } + bool fmIsIgnoreNullFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_IGNORE_NULL_FUNC); } void fmFuncMgtDestroy() { From 593d8adc42780ad8f75575cd10e2405bd0a37527 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 4 Jul 2024 15:56:12 +0800 Subject: [PATCH 2/9] test case --- source/libs/executor/src/projectoperator.c | 5 +- source/libs/function/src/builtinsimpl.c | 30 ++-- tests/system-test/2-query/diff.py | 184 ++++++++++++++++++++- 3 files changed, 200 insertions(+), 19 deletions(-) diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index d28ad1c54a..1674646a7a 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -920,7 +920,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc if (diffFunctionCtx && taosArrayGetSize(diffFunctionCtx) > 0){ SqlFunctionCtx** pfCtx = taosArrayGet(diffFunctionCtx, 0); - (*pfCtx)->fpSet.processFuncByRow(diffFunctionCtx); + code = (*pfCtx)->fpSet.processFuncByRow(diffFunctionCtx); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } numOfRows = (*pfCtx)->resultInfo->numOfRes; } if (!createNewColModel) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e502420543..0feff5657c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3148,7 +3148,7 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, return TSDB_CODE_FUNC_FUNTION_PARA_TYPE; } pDiffInfo->prevTs = ts; - + pDiffInfo->hasPrev = true; return TSDB_CODE_SUCCESS; } @@ -3192,7 +3192,11 @@ static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) } static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, - int64_t ts) { + int64_t ts) { + if (!pDiffInfo->hasPrev) { + colDataSetNull_f_s(pOutput, pos); + return doSetPrevVal(pDiffInfo, type, pv, ts); + } pDiffInfo->prevTs = ts; switch (type) { case TSDB_DATA_TYPE_UINT: @@ -3272,7 +3276,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, default: return TSDB_CODE_FUNC_FUNTION_PARA_TYPE; } - + pDiffInfo->hasPrev = true; return TSDB_CODE_SUCCESS; } @@ -3333,7 +3337,7 @@ bool isFirstRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { return pDiffInfo->isFirstRow; } -int32_t setPreVal(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { +int32_t trySetPreVal(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); pDiffInfo->isFirstRow = false; @@ -3346,15 +3350,10 @@ int32_t setPreVal(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { int8_t inputType = pInputCol->info.type; char* pv = pRow->pData; - int32_t code = doSetPrevVal(pDiffInfo, inputType, pv, pRow->ts); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - pDiffInfo->hasPrev = true; - return TSDB_CODE_SUCCESS; + return doSetPrevVal(pDiffInfo, inputType, pv, pRow->ts); } -int32_t doDiff(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, int32_t pos) { +int32_t setDoDiffResult(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, int32_t pos) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -3387,7 +3386,6 @@ int32_t doDiff(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, int32_t pos) { if (pCtx->subsidiaries.num > 0) { appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos); } - pDiffInfo->hasPrev = true; return TSDB_CODE_SUCCESS; } @@ -3430,7 +3428,9 @@ int32_t diffFunctionByRow(SArray* pCtxArray) { code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; goto _exit; } - hasNotNullValue = !diffResultIsNull(pCtx, pRow); + if (!diffResultIsNull(pCtx, pRow)) { + hasNotNullValue = true; + } } int32_t pos = startOffset + numOfElems; @@ -3439,13 +3439,13 @@ int32_t diffFunctionByRow(SArray* pCtxArray) { SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i); SFuncInputRow* pRow = (SFuncInputRow*)taosArrayGet(pRows, i); if ((keepNull || hasNotNullValue) && !isFirstRow(pCtx, pRow)){ - code = doDiff(pCtx, pRow, pos); + code = setDoDiffResult(pCtx, pRow, pos); if (code != TSDB_CODE_SUCCESS) { goto _exit; } newRow = true; } else { - code = setPreVal(pCtx, pRow); + code = trySetPreVal(pCtx, pRow); if (code != TSDB_CODE_SUCCESS) { goto _exit; } diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py index 15f73344d3..ee71db6d34 100644 --- a/tests/system-test/2-query/diff.py +++ b/tests/system-test/2-query/diff.py @@ -45,12 +45,181 @@ class TDTestCase: else: tdSql.checkData(i, j, 1) + def ignoreTest(self): + dbname = "db" + + ts1 = 1694912400000 + tdSql.execute(f'''create table {dbname}.stb30749(ts timestamp, col1 tinyint, col2 smallint) tags(loc nchar(20))''') + tdSql.execute(f"create table {dbname}.stb30749_1 using {dbname}.stb30749 tags('shanghai')") + + tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, null, 1)" % (ts1 + 1)) + tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, 3, null)" % (ts1 + 2)) + tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, 4, 3)" % (ts1 + 3)) + tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, 1, 1)" % (ts1 + 4)) + tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, 2, null)" % (ts1 + 5)) + tdSql.execute(f"insert into {dbname}.stb30749_1 values(%d, null, null)" % (ts1 + 6)) + + tdSql.query(f"select ts, diff(col1) from {dbname}.stb30749_1") + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(1, 1, 1) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.004') + tdSql.checkData(2, 1, -3) + tdSql.checkData(3, 0, '2023-09-17 09:00:00.005') + tdSql.checkData(3, 1, 1) + tdSql.checkData(4, 0, '2023-09-17 09:00:00.006') + tdSql.checkData(4, 1, None) + + tdSql.query(f"select ts, diff(col1, 1) from {dbname}.stb30749_1") + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(1, 1, 1) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.004') + tdSql.checkData(2, 1, None) + tdSql.checkData(3, 0, '2023-09-17 09:00:00.005') + tdSql.checkData(3, 1, 1) + tdSql.checkData(4, 0, '2023-09-17 09:00:00.006') + tdSql.checkData(4, 1, None) + + tdSql.query(f"select ts, diff(col1, 2) from {dbname}.stb30749_1") + tdSql.checkRows(3) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 0, '2023-09-17 09:00:00.004') + tdSql.checkData(1, 1, -3) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.005') + tdSql.checkData(2, 1, 1) + + tdSql.query(f"select ts, diff(col1, 3) from {dbname}.stb30749_1") + tdSql.checkRows(2) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 0, '2023-09-17 09:00:00.005') + tdSql.checkData(1, 1, 1) + + tdSql.query(f"select ts, diff(col1, 3), diff(col2, 0) from {dbname}.stb30749_1") + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(1, 2, 2) + tdSql.checkData(2, 1, None) + tdSql.checkData(2, 2, -2) + + tdSql.query(f"select ts, diff(col1, 3), diff(col2, 1) from {dbname}.stb30749_1") + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(1, 2, 2) + tdSql.checkData(2, 1, None) + tdSql.checkData(2, 2, None) + + tdSql.query(f"select ts, diff(col1, 2), diff(col2, 2) from {dbname}.stb30749_1") + tdSql.checkRows(3) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(1, 0, '2023-09-17 09:00:00.004') + tdSql.checkData(2, 0, '2023-09-17 09:00:00.005') + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, -3) + tdSql.checkData(2, 1, 1) + tdSql.checkData(0, 2, 2) + tdSql.checkData(1, 2, -2) + tdSql.checkData(2, 2, None) + + tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb30749_1") + tdSql.checkRows(3) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(1, 0, '2023-09-17 09:00:00.004') + tdSql.checkData(2, 0, '2023-09-17 09:00:00.005') + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, None) + tdSql.checkData(2, 1, 1) + tdSql.checkData(0, 2, 2) + tdSql.checkData(1, 2, -2) + tdSql.checkData(2, 2, None) + + tdSql.query(f"select ts, diff(col1, 3), diff(col2, 3) from {dbname}.stb30749_1") + tdSql.checkRows(2) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(1, 0, '2023-09-17 09:00:00.005') + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, 1) + tdSql.checkData(0, 2, 2) + tdSql.checkData(1, 2, None) + + tdSql.execute(f"create table {dbname}.stb30749_2 using {dbname}.stb30749 tags('shanghai')") + + tdSql.execute(f"insert into {dbname}.stb30749_2 values(%d, null, 1)" % (ts1 - 1)) + tdSql.execute(f"insert into {dbname}.stb30749_2 values(%d, 4, 3)" % (ts1 + 0)) + tdSql.execute(f"insert into {dbname}.stb30749_2 values(%d, null, null)" % (ts1 + 10)) + + tdSql.query(f"select ts, diff(col1), diff(col2) from {dbname}.stb30749") + tdSql.checkRows(8) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(3, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(2, 1, -1) + tdSql.checkData(2, 2, None) + tdSql.checkData(3, 1, 1) + tdSql.checkData(3, 2, 2) + + tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb30749") + tdSql.checkRows(5) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(3, 0, '2023-09-17 09:00:00.004') + tdSql.checkData(2, 1, 1) + tdSql.checkData(2, 2, 2) + tdSql.checkData(3, 1, None) + tdSql.checkData(3, 2, -2) + + tdSql.query(f"select ts, diff(col1), diff(col2) from {dbname}.stb30749 partition by tbname") + tdSql.checkRows(7) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(1, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + tdSql.checkData(1, 1, 1) + tdSql.checkData(1, 2, 2) + + tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb30749 partition by tbname") + tdSql.checkRows(4) + tdSql.checkData(3, 0, '2023-09-17 09:00:00.000') + tdSql.checkData(3, 1, None) + tdSql.checkData(3, 2, 2) + + tdSql.execute(f"insert into {dbname}.stb30749_2 values(%d, null, 1)" % (ts1 + 1)) + tdSql.error(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb30749") + + def withPkTest(self): + dbname = "db" + + ts1 = 1694912400000 + tdSql.execute(f'''create table {dbname}.stb5(ts timestamp, col1 int PRIMARY KEY, col2 smallint) tags(loc nchar(20))''') + tdSql.execute(f"create table {dbname}.stb5_1 using {dbname}.stb5 tags('shanghai')") + + tdSql.execute(f"insert into {dbname}.stb5_1 values(%d, 2, 1)" % (ts1 + 1)) + tdSql.execute(f"insert into {dbname}.stb5_1 values(%d, 3, null)" % (ts1 + 2)) + tdSql.execute(f"insert into {dbname}.stb5_1 values(%d, 4, 3)" % (ts1 + 3)) + + tdSql.execute(f"create table {dbname}.stb5_2 using {dbname}.stb5 tags('shanghai')") + + tdSql.execute(f"insert into {dbname}.stb5_2 values(%d, 5, 4)" % (ts1 + 1)) + tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb5") + tdSql.checkRows(2) + + tdSql.execute(f"insert into {dbname}.stb5_2 values(%d, 3, 3)" % (ts1 + 2)) + tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb5") + tdSql.checkRows(2) + def run(self): tdSql.prepare() dbname = "db" # full type test self.full_datatype_test() + + self.ignoreTest() + self.withPkTest() tdSql.execute( f"create table {dbname}.ntb(ts timestamp,c1 int,c2 double,c3 float)") @@ -219,9 +388,18 @@ class TDTestCase: tdSql.error(f"select diff(col1,1.23) from {dbname}.stb_1") tdSql.error(f"select diff(col1,-1) from {dbname}.stb_1") tdSql.query(f"select ts,diff(col1),ts from {dbname}.stb_1") - tdSql.error(f"select diff(col1, 1),diff(col2) from {dbname}.stb_1") - tdSql.error(f"select diff(col1, 1),diff(col2, 0) from {dbname}.stb_1") - tdSql.error(f"select diff(col1, 1),diff(col2, 1) from {dbname}.stb_1") + tdSql.error(f"select diff(col1, -1) from {dbname}.stb_1") + tdSql.error(f"select diff(col1, 4) from {dbname}.stb_1") + tdSql.error(f"select diff(col1, 1),diff(col2, 4) from {dbname}.stb_1") + + tdSql.query(f"select diff(col1, 1),diff(col2) from {dbname}.stb_1") + tdSql.checkRows(self.rowNum) + + tdSql.query(f"select diff(col1, 1),diff(col2, 0) from {dbname}.stb_1") + tdSql.checkRows(self.rowNum) + + tdSql.query(f"select diff(col1, 1),diff(col2, 1) from {dbname}.stb_1") + tdSql.checkRows(self.rowNum) tdSql.query(f"select diff(ts) from {dbname}.stb_1") tdSql.checkRows(10) From 748e34f15e2cd06452e93035e0c9447ea2927d6a Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 4 Jul 2024 19:34:12 +0800 Subject: [PATCH 3/9] fix test case --- tests/system-test/2-query/max_partition.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/max_partition.py b/tests/system-test/2-query/max_partition.py index 7e43597948..c4635dcf50 100644 --- a/tests/system-test/2-query/max_partition.py +++ b/tests/system-test/2-query/max_partition.py @@ -172,7 +172,7 @@ class TDTestCase: tdSql.checkRows(90) tdSql.query(f"select c1 , diff(c1 , 0) from {dbname}.stb partition by c1") - tdSql.checkRows(140) + tdSql.checkRows(139) tdSql.query(f"select c1 , csum(c1) from {dbname}.stb partition by c1") tdSql.checkRows(100) From 99f9c28e77129c4bd912ddf0ef6be460befe0697 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Fri, 5 Jul 2024 08:07:00 +0800 Subject: [PATCH 4/9] fix: return code --- source/libs/executor/src/projectoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 1674646a7a..2a03489547 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -899,7 +899,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; SScalarParam dest = {.columnData = &idata}; - int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); + code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pBlockList); goto _exit; From 28dc3c85ea9342844e1987c3be5a63b13060c936 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Mon, 8 Jul 2024 20:38:23 +0800 Subject: [PATCH 5/9] type overflow --- include/libs/nodes/querynodes.h | 1 + source/libs/executor/src/projectoperator.c | 22 ++-- source/libs/function/src/builtins.c | 8 +- source/libs/function/src/builtinsimpl.c | 110 +++++++++++-------- source/libs/parser/src/parTranslater.c | 12 ++- tests/system-test/2-query/diff.py | 117 +++++++++++++++++++++ 6 files changed, 209 insertions(+), 61 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 457937835d..dfb92861d6 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -415,6 +415,7 @@ typedef struct SSelectStmt { int32_t returnRows; // EFuncReturnRows ETimeLineMode timeLineCurMode; ETimeLineMode timeLineResMode; + bool hasProcessByRowFunc; bool timeLineFromOrderBy; bool isEmptyResult; bool isSubquery; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 2a03489547..73134c55ef 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -728,7 +728,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc setPseudoOutputColInfo(pResult, pCtx, pPseudoList); pResult->info.dataLoad = 1; - SArray* diffFunctionCtx = NULL; + SArray* processByRowFunctionCtx = NULL; if (pSrcBlock == NULL) { for (int32_t k = 0; k < numOfOutput; ++k) { @@ -861,14 +861,14 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } numOfRows = pResInfo->numOfRes; if (fmIsProcessByRowFunc(pfCtx->functionId)) { - if (NULL == diffFunctionCtx) { - diffFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*)); - if (!diffFunctionCtx) { + if (NULL == processByRowFunctionCtx) { + processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*)); + if (!processByRowFunctionCtx) { code = terrno; goto _exit; } } - taosArrayPush(diffFunctionCtx, &pfCtx); + taosArrayPush(processByRowFunctionCtx, &pfCtx); } } else if (fmIsAggFunc(pfCtx->functionId)) { // selective value output should be set during corresponding function execution @@ -918,9 +918,9 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } } - if (diffFunctionCtx && taosArrayGetSize(diffFunctionCtx) > 0){ - SqlFunctionCtx** pfCtx = taosArrayGet(diffFunctionCtx, 0); - code = (*pfCtx)->fpSet.processFuncByRow(diffFunctionCtx); + if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0){ + SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0); + code = (*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx); if (code != TSDB_CODE_SUCCESS) { goto _exit; } @@ -930,9 +930,9 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc pResult->info.rows += numOfRows; } _exit: - if(diffFunctionCtx) { - taosArrayDestroy(diffFunctionCtx); - diffFunctionCtx = NULL; + if(processByRowFunctionCtx) { + taosArrayDestroy(processByRowFunctionCtx); + processByRowFunctionCtx = NULL; } return code; } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 66b3697a24..5058ebb8c3 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1977,7 +1977,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (IS_SIGNED_NUMERIC_TYPE(colType) || IS_TIMESTAMP_TYPE(colType) || TSDB_DATA_TYPE_BOOL == colType) { resType = TSDB_DATA_TYPE_BIGINT; } else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) { - resType = TSDB_DATA_TYPE_UBIGINT; + resType = TSDB_DATA_TYPE_BIGINT; } else { resType = TSDB_DATA_TYPE_DOUBLE; } @@ -1986,7 +1986,11 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } static EFuncReturnRows diffEstReturnRows(SFunctionNode* pFunc) { - return FUNC_RETURN_ROWS_N_MINUS_1; + if (1 == LIST_LENGTH(pFunc->pParameterList)) { + return FUNC_RETURN_ROWS_N_MINUS_1; + } + return 1 < ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i ? FUNC_RETURN_ROWS_INDEFINITE + : FUNC_RETURN_ROWS_N_MINUS_1; } static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 0feff5657c..be6c555e56 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3152,37 +3152,49 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, return TSDB_CODE_SUCCESS; } -static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { +static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { switch (type) { - case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UINT: { + int64_t v = *(uint32_t*)pv; + return v < pDiffInfo->prev.i64; + } case TSDB_DATA_TYPE_INT: { - int32_t v = *(int32_t*)pv; + int64_t v = *(int32_t*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_BOOL: { + int64_t v = *(bool*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_UTINYINT: { + int64_t v = *(int8_t*)pv; return v < pDiffInfo->prev.i64; } - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_TINYINT: { - int8_t v = *(int8_t*)pv; + int64_t v = *(uint8_t*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_USMALLINT: { + int64_t v = *(uint16_t*)pv; return v < pDiffInfo->prev.i64; } - case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_SMALLINT: { - int16_t v = *(int16_t*)pv; + int64_t v = *(int16_t*)pv; return v < pDiffInfo->prev.i64; } case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_BIGINT: { int64_t v = *(int64_t*)pv; - return v < pDiffInfo->prev.i64; + return v - pDiffInfo->prev.i64 < 0; } case TSDB_DATA_TYPE_FLOAT: { - float v = *(float*)pv; - return v < pDiffInfo->prev.d64; + float v = *(float*)pv; + return v < pDiffInfo->prev.d64; } case TSDB_DATA_TYPE_DOUBLE: { double v = *(double*)pv; - return v < pDiffInfo->prev.d64; + return v < pDiffInfo->prev.d64; } default: return false; @@ -3191,6 +3203,17 @@ static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) return false; } +static void tryToSetInt64(SDiffInfo* pDiffInfo, SColumnInfoData* pOutput, int64_t v, int32_t pos) { + int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null + if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { + colDataSetNull_f_s(pOutput, pos); + pOutput->hasNull = true; + } else { + colDataSetInt64(pOutput, pos, &delta); + } + pDiffInfo->prev.i64 = v; +} + static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, int64_t ts) { if (!pDiffInfo->hasPrev) { @@ -3199,43 +3222,38 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, } pDiffInfo->prevTs = ts; switch (type) { - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_INT: { - int32_t v = *(int32_t*)pv; - int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { - colDataSetNull_f_s(pOutput, pos); - pOutput->hasNull = true; - } else { - colDataSetInt64(pOutput, pos, &delta); - } - pDiffInfo->prev.i64 = v; - + case TSDB_DATA_TYPE_UINT: { + int64_t v = *(uint32_t*)pv; + tryToSetInt64(pDiffInfo, pOutput, v, pos); + break; + } + case TSDB_DATA_TYPE_INT: { + int64_t v = *(int32_t*)pv; + tryToSetInt64(pDiffInfo, pOutput, v, pos); + break; + } + case TSDB_DATA_TYPE_BOOL: { + int64_t v = *(bool*)pv; + tryToSetInt64(pDiffInfo, pOutput, v, pos); + break; + } + case TSDB_DATA_TYPE_UTINYINT: { + int64_t v = *(int8_t*)pv; + tryToSetInt64(pDiffInfo, pOutput, v, pos); break; } - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_TINYINT: { - int8_t v = *(int8_t*)pv; - int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { - colDataSetNull_f_s(pOutput, pos); - } else { - colDataSetInt64(pOutput, pos, &delta); - } - pDiffInfo->prev.i64 = v; + int64_t v = *(uint8_t*)pv; + tryToSetInt64(pDiffInfo, pOutput, v, pos); break; } case TSDB_DATA_TYPE_USMALLINT: + int64_t v = *(uint16_t*)pv; + tryToSetInt64(pDiffInfo, pOutput, v, pos); + break; case TSDB_DATA_TYPE_SMALLINT: { - int16_t v = *(int16_t*)pv; - int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { - colDataSetNull_f_s(pOutput, pos); - } else { - colDataSetInt64(pOutput, pos, &delta); - } - pDiffInfo->prev.i64 = v; + int64_t v = *(int16_t*)pv; + tryToSetInt64(pDiffInfo, pOutput, v, pos); break; } case TSDB_DATA_TYPE_TIMESTAMP: @@ -3252,9 +3270,10 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, break; } case TSDB_DATA_TYPE_FLOAT: { - float v = *(float*)pv; + double v = *(float*)pv; double delta = v - pDiffInfo->prev.d64; // direct previous may be null - if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || isnan(delta)) { // check for overflow + if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || + isnan(delta)) { // check for overflow colDataSetNull_f_s(pOutput, pos); } else { colDataSetDouble(pOutput, pos, &delta); @@ -3265,7 +3284,8 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_DOUBLE: { double v = *(double*)pv; double delta = v - pDiffInfo->prev.d64; // direct previous may be null - if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || isnan(delta)) { // check for overflow + if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || + isnan(delta)) { // check for overflow colDataSetNull_f_s(pOutput, pos); } else { colDataSetDouble(pOutput, pos, &delta); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index b741db3ae6..35c42e2ffb 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2194,9 +2194,12 @@ static int32_t translateIndefiniteRowsFunc(STranslateContext* pCxt, SFunctionNod return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC); } SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; - if (pSelect->hasAggFuncs || pSelect->hasMultiRowsFunc || - (pSelect->hasIndefiniteRowsFunc && - (FUNC_RETURN_ROWS_INDEFINITE == pSelect->returnRows || pSelect->returnRows != fmGetFuncReturnRows(pFunc)))) { + if (pSelect->hasAggFuncs || pSelect->hasMultiRowsFunc) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC); + } + if (pSelect->hasIndefiniteRowsFunc && + (FUNC_RETURN_ROWS_INDEFINITE == pSelect->returnRows || pSelect->returnRows != fmGetFuncReturnRows(pFunc)) && + (!pSelect->hasProcessByRowFunc || !fmIsProcessByRowFunc(pFunc->funcId))) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC); } if (NULL != pSelect->pWindow || NULL != pSelect->pGroupByList) { @@ -2462,6 +2465,9 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) { } else if (fmIsInterpFunc(pFunc->funcId)) { pSelect->returnRows = fmGetFuncReturnRows(pFunc); } + if (fmIsProcessByRowFunc(pFunc->funcId)) { + pSelect->hasProcessByRowFunc = true; + } pSelect->hasMultiRowsFunc = pSelect->hasMultiRowsFunc ? true : fmIsMultiRowsFunc(pFunc->funcId); if (fmIsSelectFunc(pFunc->funcId)) { diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py index ee71db6d34..4400e8c26d 100644 --- a/tests/system-test/2-query/diff.py +++ b/tests/system-test/2-query/diff.py @@ -154,6 +154,15 @@ class TDTestCase: tdSql.execute(f"insert into {dbname}.stb30749_2 values(%d, 4, 3)" % (ts1 + 0)) tdSql.execute(f"insert into {dbname}.stb30749_2 values(%d, null, null)" % (ts1 + 10)) + tdSql.query(f"select ts, diff(col1), diff(col2, 1) from {dbname}.stb30749") + tdSql.checkRows(8) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(3, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(2, 1, -1) + tdSql.checkData(2, 2, None) + tdSql.checkData(3, 1, 1) + tdSql.checkData(3, 2, 2) + tdSql.query(f"select ts, diff(col1), diff(col2) from {dbname}.stb30749") tdSql.checkRows(8) tdSql.checkData(2, 0, '2023-09-17 09:00:00.002') @@ -163,6 +172,42 @@ class TDTestCase: tdSql.checkData(3, 1, 1) tdSql.checkData(3, 2, 2) + tdSql.query(f"select ts, diff(col1), diff(col2, 3) from {dbname}.stb30749") + tdSql.checkRows(8) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(3, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(2, 1, -1) + tdSql.checkData(2, 2, None) + tdSql.checkData(3, 1, 1) + tdSql.checkData(3, 2, 2) + + tdSql.query(f"select ts, diff(col1, 1), diff(col2, 2) from {dbname}.stb30749") + tdSql.checkRows(8) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(3, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(2, 1, None) + tdSql.checkData(2, 2, None) + tdSql.checkData(3, 1, 1) + tdSql.checkData(3, 2, 2) + + tdSql.query(f"select ts, diff(col1, 1), diff(col2, 3) from {dbname}.stb30749") + tdSql.checkRows(8) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(3, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(2, 1, None) + tdSql.checkData(2, 2, None) + tdSql.checkData(3, 1, 1) + tdSql.checkData(3, 2, 2) + + tdSql.query(f"select ts, diff(col1, 2), diff(col2, 2) from {dbname}.stb30749") + tdSql.checkRows(6) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(3, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(2, 1, -1) + tdSql.checkData(2, 2, None) + tdSql.checkData(3, 1, 1) + tdSql.checkData(3, 2, 2) + tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb30749") tdSql.checkRows(5) tdSql.checkData(2, 0, '2023-09-17 09:00:00.003') @@ -172,6 +217,24 @@ class TDTestCase: tdSql.checkData(3, 1, None) tdSql.checkData(3, 2, -2) + tdSql.query(f"select ts, diff(col1, 2), diff(col2, 3) from {dbname}.stb30749") + tdSql.checkRows(5) + tdSql.checkData(2, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(3, 0, '2023-09-17 09:00:00.004') + tdSql.checkData(2, 1, 1) + tdSql.checkData(2, 2, 2) + tdSql.checkData(3, 1, -3) + tdSql.checkData(3, 2, None) + + tdSql.query(f"select ts, diff(col1, 3), diff(col2, 3) from {dbname}.stb30749") + tdSql.checkRows(3) + tdSql.checkData(1, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(2, 0, '2023-09-17 09:00:00.005') + tdSql.checkData(1, 1, 1) + tdSql.checkData(1, 2, 2) + tdSql.checkData(2, 1, 1) + tdSql.checkData(2, 2, None) + tdSql.query(f"select ts, diff(col1), diff(col2) from {dbname}.stb30749 partition by tbname") tdSql.checkRows(7) tdSql.checkData(0, 0, '2023-09-17 09:00:00.002') @@ -210,6 +273,59 @@ class TDTestCase: tdSql.execute(f"insert into {dbname}.stb5_2 values(%d, 3, 3)" % (ts1 + 2)) tdSql.query(f"select ts, diff(col1, 3), diff(col2, 2) from {dbname}.stb5") tdSql.checkRows(2) + + + def typeOverflowTest(self): + dbname = "db" + + ts1 = 1694912400000 + tdSql.execute(f'''create table {dbname}.stb6(ts timestamp, c1 int, c2 smallint, c3 int unsigned, c4 BIGINT, c5 BIGINT unsigned) tags(loc nchar(20))''') + tdSql.execute(f"create table {dbname}.stb6_1 using {dbname}.stb6 tags('shanghai')") + + tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -2147483648, -32768, 0, 9223372036854775806, 9223372036854775806)" % (ts1 + 1)) + tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, 2147483647, 32767, 4294967295, 0, 0)" % (ts1 + 2)) + tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -10, -10, 0, -9223372036854775806, 9223372036854775806)" % (ts1 + 3)) + + tdSql.query(f"select ts, diff(c1), diff(c2), diff(c3), diff(c4), diff(c5) from {dbname}.stb6_1") + tdSql.checkRows(2) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.002') + tdSql.checkData(0, 1, 4294967295) + tdSql.checkData(0, 2, 65535) + tdSql.checkData(0, 3, 4294967295) + tdSql.checkData(0, 4, -9223372036854775806) + tdSql.checkData(0, 5, -9223372036854775806) + tdSql.checkData(1, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(1, 1, -2147483657) + tdSql.checkData(1, 2, -32777) + tdSql.checkData(1, 3, -4294967295) + tdSql.checkData(1, 4, -9223372036854775806) + tdSql.checkData(1, 5, 9223372036854775806) + + tdSql.query(f"select ts, diff(c1, 1), diff(c2) from {dbname}.stb6_1") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 4294967295) + tdSql.checkData(0, 2, 65535) + tdSql.checkData(1, 1, None) + tdSql.checkData(1, 2, -32777) + + tdSql.query(f"select ts, diff(c1, 1), diff(c2, 1) from {dbname}.stb6_1") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 4294967295) + tdSql.checkData(0, 2, 65535) + tdSql.checkData(1, 1, None) + tdSql.checkData(1, 2, None) + + tdSql.query(f"select ts, diff(c1, 2), diff(c2, 3) from {dbname}.stb6_1") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 4294967295) + tdSql.checkData(0, 2, 65535) + tdSql.checkData(1, 1, -2147483657) + tdSql.checkData(1, 2, None) + + tdSql.query(f"select ts, diff(c1, 3), diff(c2, 3) from {dbname}.stb6_1") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 4294967295) + tdSql.checkData(0, 2, 65535) def run(self): tdSql.prepare() @@ -220,6 +336,7 @@ class TDTestCase: self.ignoreTest() self.withPkTest() + self.typeOverflowTest() tdSql.execute( f"create table {dbname}.ntb(ts timestamp,c1 int,c2 double,c3 float)") From 3e320ebdf11639c9e184e885dbd88c8637df9e26 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Mon, 8 Jul 2024 23:22:41 +0800 Subject: [PATCH 6/9] build failed --- source/libs/function/src/builtinsimpl.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index be6c555e56..05989e7b41 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3247,10 +3247,11 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, tryToSetInt64(pDiffInfo, pOutput, v, pos); break; } - case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_USMALLINT:{ int64_t v = *(uint16_t*)pv; tryToSetInt64(pDiffInfo, pOutput, v, pos); break; + } case TSDB_DATA_TYPE_SMALLINT: { int64_t v = *(int16_t*)pv; tryToSetInt64(pDiffInfo, pOutput, v, pos); From 20f4eda56d22115e5d9e0958249b496f4e68322a Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 9 Jul 2024 07:05:40 +0800 Subject: [PATCH 7/9] fix: tinyint --- source/libs/function/src/builtinsimpl.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 05989e7b41..c730aa334b 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3167,11 +3167,11 @@ static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) return v < pDiffInfo->prev.i64; } case TSDB_DATA_TYPE_UTINYINT: { - int64_t v = *(int8_t*)pv; + int64_t v = *(uint8_t*)pv; return v < pDiffInfo->prev.i64; } case TSDB_DATA_TYPE_TINYINT: { - int64_t v = *(uint8_t*)pv; + int64_t v = *(int8_t*)pv; return v < pDiffInfo->prev.i64; } case TSDB_DATA_TYPE_USMALLINT: { @@ -3238,12 +3238,12 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, break; } case TSDB_DATA_TYPE_UTINYINT: { - int64_t v = *(int8_t*)pv; + int64_t v = *(uint8_t*)pv; tryToSetInt64(pDiffInfo, pOutput, v, pos); break; } case TSDB_DATA_TYPE_TINYINT: { - int64_t v = *(uint8_t*)pv; + int64_t v = *(int8_t*)pv; tryToSetInt64(pDiffInfo, pOutput, v, pos); break; } From 163b31a4ee0f18d651c4c3f3ee76a3cf67f53867 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 9 Jul 2024 14:48:21 +0800 Subject: [PATCH 8/9] fix: type overflow --- include/libs/nodes/querynodes.h | 2 +- include/util/taoserror.h | 1 + source/libs/function/src/builtinsimpl.c | 67 ++++++++++++------------- source/libs/parser/src/parTranslater.c | 8 ++- source/libs/parser/src/parUtil.c | 3 ++ source/util/src/terror.c | 1 + tests/system-test/2-query/diff.py | 67 +++++++++++++++++++++++-- 7 files changed, 107 insertions(+), 42 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index dfb92861d6..34b42fd9e1 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -415,7 +415,7 @@ typedef struct SSelectStmt { int32_t returnRows; // EFuncReturnRows ETimeLineMode timeLineCurMode; ETimeLineMode timeLineResMode; - bool hasProcessByRowFunc; + int32_t lastProcessByRowFuncId; bool timeLineFromOrderBy; bool isEmptyResult; bool isSubquery; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2de336d036..38197c8504 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -831,6 +831,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_PAR_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0x267D) #define TSDB_CODE_PAR_TBNAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267E) #define TSDB_CODE_PAR_TAG_NAME_DUPLICATED TAOS_DEF_ERROR_CODE(0, 0x267F) +#define TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC TAOS_DEF_ERROR_CODE(0, 0x2680) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index c730aa334b..7d40aacdf7 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3183,10 +3183,13 @@ static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) return v < pDiffInfo->prev.i64; } case TSDB_DATA_TYPE_TIMESTAMP: - case TSDB_DATA_TYPE_UBIGINT: + case TSDB_DATA_TYPE_UBIGINT:{ + uint64_t v = *(uint64_t*)pv; + return v < (uint64_t)pDiffInfo->prev.i64; + } case TSDB_DATA_TYPE_BIGINT: { int64_t v = *(int64_t*)pv; - return v - pDiffInfo->prev.i64 < 0; + return v < pDiffInfo->prev.i64; } case TSDB_DATA_TYPE_FLOAT: { float v = *(float*)pv; @@ -3203,9 +3206,13 @@ static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) return false; } -static void tryToSetInt64(SDiffInfo* pDiffInfo, SColumnInfoData* pOutput, int64_t v, int32_t pos) { - int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { +static void tryToSetInt64(SDiffInfo* pDiffInfo, int32_t type, SColumnInfoData* pOutput, int64_t v, int32_t pos) { + bool isNegative = v < pDiffInfo->prev.i64; + if(type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP){ + isNegative = (uint64_t)v < (uint64_t)pDiffInfo->prev.i64; + } + int64_t delta = v - pDiffInfo->prev.i64; + if (isNegative && ignoreNegative(pDiffInfo->ignoreOption)) { colDataSetNull_f_s(pOutput, pos); pOutput->hasNull = true; } else { @@ -3214,6 +3221,16 @@ static void tryToSetInt64(SDiffInfo* pDiffInfo, SColumnInfoData* pOutput, int64_ pDiffInfo->prev.i64 = v; } +static void tryToSetDouble(SDiffInfo* pDiffInfo, SColumnInfoData* pOutput, double v, int32_t pos) { + double delta = v - pDiffInfo->prev.d64; + if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { + colDataSetNull_f_s(pOutput, pos); + } else { + colDataSetDouble(pOutput, pos, &delta); + } + pDiffInfo->prev.d64 = v; +} + static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, int64_t ts) { if (!pDiffInfo->hasPrev) { @@ -3224,74 +3241,54 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, switch (type) { case TSDB_DATA_TYPE_UINT: { int64_t v = *(uint32_t*)pv; - tryToSetInt64(pDiffInfo, pOutput, v, pos); + tryToSetInt64(pDiffInfo, type, pOutput, v, pos); break; } case TSDB_DATA_TYPE_INT: { int64_t v = *(int32_t*)pv; - tryToSetInt64(pDiffInfo, pOutput, v, pos); + tryToSetInt64(pDiffInfo, type, pOutput, v, pos); break; } case TSDB_DATA_TYPE_BOOL: { int64_t v = *(bool*)pv; - tryToSetInt64(pDiffInfo, pOutput, v, pos); + tryToSetInt64(pDiffInfo, type, pOutput, v, pos); break; } case TSDB_DATA_TYPE_UTINYINT: { int64_t v = *(uint8_t*)pv; - tryToSetInt64(pDiffInfo, pOutput, v, pos); + tryToSetInt64(pDiffInfo, type, pOutput, v, pos); break; } case TSDB_DATA_TYPE_TINYINT: { int64_t v = *(int8_t*)pv; - tryToSetInt64(pDiffInfo, pOutput, v, pos); + tryToSetInt64(pDiffInfo, type, pOutput, v, pos); break; } case TSDB_DATA_TYPE_USMALLINT:{ int64_t v = *(uint16_t*)pv; - tryToSetInt64(pDiffInfo, pOutput, v, pos); + tryToSetInt64(pDiffInfo, type, pOutput, v, pos); break; } case TSDB_DATA_TYPE_SMALLINT: { int64_t v = *(int16_t*)pv; - tryToSetInt64(pDiffInfo, pOutput, v, pos); + tryToSetInt64(pDiffInfo, type, pOutput, v, pos); break; } case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_BIGINT: { int64_t v = *(int64_t*)pv; - int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { - colDataSetNull_f_s(pOutput, pos); - } else { - colDataSetInt64(pOutput, pos, &delta); - } - pDiffInfo->prev.i64 = v; + tryToSetInt64(pDiffInfo, type, pOutput, v, pos); break; } case TSDB_DATA_TYPE_FLOAT: { double v = *(float*)pv; - double delta = v - pDiffInfo->prev.d64; // direct previous may be null - if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || - isnan(delta)) { // check for overflow - colDataSetNull_f_s(pOutput, pos); - } else { - colDataSetDouble(pOutput, pos, &delta); - } - pDiffInfo->prev.d64 = v; + tryToSetDouble(pDiffInfo, pOutput, v, pos); break; } case TSDB_DATA_TYPE_DOUBLE: { double v = *(double*)pv; - double delta = v - pDiffInfo->prev.d64; // direct previous may be null - if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || - isnan(delta)) { // check for overflow - colDataSetNull_f_s(pOutput, pos); - } else { - colDataSetDouble(pOutput, pos, &delta); - } - pDiffInfo->prev.d64 = v; + tryToSetDouble(pDiffInfo, pOutput, v, pos); break; } default: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 35c42e2ffb..85e148fece 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2199,9 +2199,12 @@ static int32_t translateIndefiniteRowsFunc(STranslateContext* pCxt, SFunctionNod } if (pSelect->hasIndefiniteRowsFunc && (FUNC_RETURN_ROWS_INDEFINITE == pSelect->returnRows || pSelect->returnRows != fmGetFuncReturnRows(pFunc)) && - (!pSelect->hasProcessByRowFunc || !fmIsProcessByRowFunc(pFunc->funcId))) { + (pSelect->lastProcessByRowFuncId == -1 || !fmIsProcessByRowFunc(pFunc->funcId))) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC); } + if (pSelect->lastProcessByRowFuncId != -1 && pSelect->lastProcessByRowFuncId != pFunc->funcId) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC); + } if (NULL != pSelect->pWindow || NULL != pSelect->pGroupByList) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, "%s function is not supported in window query or group query", pFunc->functionName); @@ -2466,7 +2469,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) { pSelect->returnRows = fmGetFuncReturnRows(pFunc); } if (fmIsProcessByRowFunc(pFunc->funcId)) { - pSelect->hasProcessByRowFunc = true; + pSelect->lastProcessByRowFuncId = pFunc->funcId; } pSelect->hasMultiRowsFunc = pSelect->hasMultiRowsFunc ? true : fmIsMultiRowsFunc(pFunc->funcId); @@ -3404,6 +3407,7 @@ static int32_t checkIsEmptyResult(STranslateContext* pCxt, SSelectStmt* pSelect) static int32_t resetSelectFuncNumWithoutDup(SSelectStmt* pSelect) { if (pSelect->selectFuncNum <= 1) return TSDB_CODE_SUCCESS; pSelect->selectFuncNum = 0; + pSelect->lastProcessByRowFuncId = -1; SNodeList* pNodeList = nodesMakeList(); int32_t code = nodesCollectSelectFuncs(pSelect, SQL_CLAUSE_FROM, NULL, fmIsSelectFunc, pNodeList); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 416faafe35..d67c7d306f 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -221,6 +221,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Table name:%s duplicated"; case TSDB_CODE_PAR_TAG_NAME_DUPLICATED: return "Tag name:%s duplicated"; + case TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC: + return "Some functions cannot appear in the select list at the same time"; default: return "Unknown error"; } @@ -772,6 +774,7 @@ SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* select->onlyHasKeepOrderFunc = true; select->timeRange = TSWINDOW_INITIALIZER; select->pHint = pHint; + select->lastProcessByRowFuncId = -1; return (SNode*)select; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c5bba6fa53..dd1aeff5a2 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -678,6 +678,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE, "Primary key column TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_ERROR, "Pseudo tag tbname not set") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TBNAME_DUPLICATED, "Table name duplicated") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_NAME_DUPLICATED, "Tag name duplicated") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_DIFFERENT_BY_ROW_FUNC, "Some functions cannot appear in the select list at the same time") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py index 4400e8c26d..fd7ef00680 100644 --- a/tests/system-test/2-query/diff.py +++ b/tests/system-test/2-query/diff.py @@ -275,7 +275,7 @@ class TDTestCase: tdSql.checkRows(2) - def typeOverflowTest(self): + def intOverflowTest(self): dbname = "db" ts1 = 1694912400000 @@ -284,7 +284,7 @@ class TDTestCase: tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -2147483648, -32768, 0, 9223372036854775806, 9223372036854775806)" % (ts1 + 1)) tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, 2147483647, 32767, 4294967295, 0, 0)" % (ts1 + 2)) - tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -10, -10, 0, -9223372036854775806, 9223372036854775806)" % (ts1 + 3)) + tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -10, -10, 0, -9223372036854775806, 16223372036854775806)" % (ts1 + 3)) tdSql.query(f"select ts, diff(c1), diff(c2), diff(c3), diff(c4), diff(c5) from {dbname}.stb6_1") tdSql.checkRows(2) @@ -299,7 +299,6 @@ class TDTestCase: tdSql.checkData(1, 2, -32777) tdSql.checkData(1, 3, -4294967295) tdSql.checkData(1, 4, -9223372036854775806) - tdSql.checkData(1, 5, 9223372036854775806) tdSql.query(f"select ts, diff(c1, 1), diff(c2) from {dbname}.stb6_1") tdSql.checkRows(2) @@ -326,6 +325,65 @@ class TDTestCase: tdSql.checkRows(1) tdSql.checkData(0, 1, 4294967295) tdSql.checkData(0, 2, 65535) + + tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -10, -10, 0, 9223372036854775800, 0)" % (ts1 + 4)) + tdSql.execute(f"insert into {dbname}.stb6_1 values(%d, -10, -10, 0, 9223372036854775800, 16223372036854775806)" % (ts1 + 5)) + + tdSql.query(f"select ts, diff(c4, 0) from {dbname}.stb6_1") + tdSql.checkRows(4) + + tdSql.query(f"select ts, diff(c4, 1) from {dbname}.stb6_1") + tdSql.checkRows(4) + tdSql.checkData(2, 1, -10) + + tdSql.query(f"select ts, diff(c4, 2) from {dbname}.stb6_1") + tdSql.checkRows(4) + + tdSql.query(f"select ts, diff(c4, 3) from {dbname}.stb6_1") + tdSql.checkRows(2) + tdSql.checkData(0, 1, -10) + tdSql.checkData(1, 1, 0) + + tdSql.query(f"select ts, diff(c5, 0) from {dbname}.stb6_1") + tdSql.checkRows(4) + + tdSql.query(f"select ts, diff(c5, 1) from {dbname}.stb6_1") + tdSql.checkRows(4) + tdSql.checkData(0, 1, None) + tdSql.checkData(1, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(2, 1, None) + tdSql.checkData(3, 0, '2023-09-17 09:00:00.005') + + tdSql.query(f"select ts, diff(c5, 2) from {dbname}.stb6_1") + tdSql.checkRows(4) + + tdSql.query(f"select ts, diff(c5, 3) from {dbname}.stb6_1") + tdSql.checkRows(2) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.003') + tdSql.checkData(1, 0, '2023-09-17 09:00:00.005') + + def doubleOverflowTest(self): + dbname = "db" + + ts1 = 1694912400000 + tdSql.execute(f'''create table {dbname}.stb7(ts timestamp, c1 float, c2 double) tags(loc nchar(20))''') + tdSql.execute(f"create table {dbname}.stb7_1 using {dbname}.stb7 tags('shanghai')") + + tdSql.execute(f"insert into {dbname}.stb7_1 values(%d, 334567777777777777777343434343333333733, 334567777777777777777343434343333333733)" % (ts1 + 1)) + tdSql.execute(f"insert into {dbname}.stb7_1 values(%d, -334567777777777777777343434343333333733, -334567777777777777777343434343333333733)" % (ts1 + 2)) + tdSql.execute(f"insert into {dbname}.stb7_1 values(%d, 334567777777777777777343434343333333733, 334567777777777777777343434343333333733)" % (ts1 + 3)) + + tdSql.query(f"select ts, diff(c1), diff(c2) from {dbname}.stb7_1") + tdSql.checkRows(2) + + tdSql.query(f"select ts, diff(c1, 1), diff(c2, 1) from {dbname}.stb7_1") + tdSql.checkRows(2) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + + tdSql.query(f"select ts, diff(c1, 3), diff(c2, 3) from {dbname}.stb7_1") + tdSql.checkRows(1) + tdSql.checkData(0, 0, '2023-09-17 09:00:00.003') def run(self): tdSql.prepare() @@ -336,7 +394,8 @@ class TDTestCase: self.ignoreTest() self.withPkTest() - self.typeOverflowTest() + self.intOverflowTest() + self.doubleOverflowTest() tdSql.execute( f"create table {dbname}.ntb(ts timestamp,c1 int,c2 double,c3 float)") From 81b62f4a155f4149094ed905882daebf598ec8de Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 9 Jul 2024 18:47:33 +0800 Subject: [PATCH 9/9] fix: use int64 for timestamp --- source/libs/function/src/builtinsimpl.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 7d40aacdf7..3c7edad873 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3182,11 +3182,11 @@ static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) int64_t v = *(int16_t*)pv; return v < pDiffInfo->prev.i64; } - case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_UBIGINT:{ uint64_t v = *(uint64_t*)pv; return v < (uint64_t)pDiffInfo->prev.i64; } + case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: { int64_t v = *(int64_t*)pv; return v < pDiffInfo->prev.i64; @@ -3208,7 +3208,7 @@ static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) static void tryToSetInt64(SDiffInfo* pDiffInfo, int32_t type, SColumnInfoData* pOutput, int64_t v, int32_t pos) { bool isNegative = v < pDiffInfo->prev.i64; - if(type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP){ + if(type == TSDB_DATA_TYPE_UBIGINT){ isNegative = (uint64_t)v < (uint64_t)pDiffInfo->prev.i64; } int64_t delta = v - pDiffInfo->prev.i64;