diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 87bbe21133..f563d7f5a8 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -41,6 +41,7 @@ typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx); +typedef int32_t (*processFuncByRow)(SArray* pCtx); // array of SqlFunctionCtx typedef struct SScalarFuncExecFuncs { FExecGetEnv getEnv; @@ -48,11 +49,12 @@ typedef struct SScalarFuncExecFuncs { } SScalarFuncExecFuncs; typedef struct SFuncExecFuncs { - FExecGetEnv getEnv; - FExecInit init; - FExecProcess process; - FExecFinalize finalize; - FExecCombine combine; + FExecGetEnv getEnv; + FExecInit init; + FExecProcess process; + FExecFinalize finalize; + FExecCombine combine; + processFuncByRow processFuncByRow; } SFuncExecFuncs; #define MAX_INTERVAL_TIME_WINDOW 10000000 // maximum allowed time windows in final results diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index e77635727b..86db6640c5 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -255,6 +255,7 @@ bool fmIsIgnoreNullFunc(int32_t funcId); bool fmIsConstantResFunc(SFunctionNode* pFunc); bool fmIsSkipScanCheckFunc(int32_t funcId); bool fmIsPrimaryKeyFunc(int32_t funcId); +bool fmIsProcessByRowFunc(int32_t funcId); void getLastCacheDataType(SDataType* pType, int32_t pkBytes); SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 19828d5146..d28ad1c54a 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -724,9 +724,12 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList) { + int32_t code = TSDB_CODE_SUCCESS; setPseudoOutputColInfo(pResult, pCtx, pPseudoList); pResult->info.dataLoad = 1; + SArray* diffFunctionCtx = NULL; + if (pSrcBlock == NULL) { for (int32_t k = 0; k < numOfOutput; ++k) { int32_t outputSlotId = pExpr[k].base.resSchema.slotId; @@ -743,7 +746,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } pResult->info.rows = 1; - return TSDB_CODE_SUCCESS; + goto _exit; } if (pResult != pSrcBlock) { @@ -816,10 +819,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; SScalarParam dest = {.columnData = &idata}; - int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); + code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pBlockList); - return code; + goto _exit; } int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; @@ -852,11 +855,21 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc pfCtx->pDstBlock = pResult; } - int32_t code = pfCtx->fpSet.process(pfCtx); + code = pfCtx->fpSet.process(pfCtx); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _exit; } numOfRows = pResInfo->numOfRes; + if (fmIsProcessByRowFunc(pfCtx->functionId)) { + if (NULL == diffFunctionCtx) { + diffFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*)); + if (!diffFunctionCtx) { + code = terrno; + goto _exit; + } + } + taosArrayPush(diffFunctionCtx, &pfCtx); + } } else if (fmIsAggFunc(pfCtx->functionId)) { // selective value output should be set during corresponding function execution if (fmIsSelectValueFunc(pfCtx->functionId)) { @@ -889,7 +902,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pBlockList); - return code; + goto _exit; } int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; @@ -905,9 +918,18 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } } + if (diffFunctionCtx && taosArrayGetSize(diffFunctionCtx) > 0){ + SqlFunctionCtx** pfCtx = taosArrayGet(diffFunctionCtx, 0); + (*pfCtx)->fpSet.processFuncByRow(diffFunctionCtx); + numOfRows = (*pfCtx)->resultInfo->numOfRes; + } if (!createNewColModel) { pResult->info.rows += numOfRows; } - - return TSDB_CODE_SUCCESS; +_exit: + if(diffFunctionCtx) { + taosArrayDestroy(diffFunctionCtx); + diffFunctionCtx = NULL; + } + return code; } diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index 8c07a9d530..343f5b8367 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -50,6 +50,7 @@ typedef struct SBuiltinFuncDefinition { const char* pStateFunc; FCreateMergeFuncParameters createMergeParaFuc; FEstimateReturnRows estimateReturnRowsFunc; + processFuncByRow processFuncByRow; } SBuiltinFuncDefinition; extern const SBuiltinFuncDefinition funcMgtBuiltins[]; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index ecc64fcd00..7615584f8c 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -133,6 +133,7 @@ int32_t getApercentileMaxSize(); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t diffFunction(SqlFunctionCtx* pCtx); +int32_t diffFunctionByRow(SArray* pCtx); bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index a3f97af5d9..2c5c7725d5 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -57,6 +57,7 @@ extern "C" { #define FUNC_MGT_PRIMARY_KEY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(28) #define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29) #define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found +#define FUNC_MGT_PROCESS_BY_ROW FUNC_MGT_FUNC_CLASSIFICATION_MASK(31) #define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index e3e84ac20b..66b3697a24 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1965,9 +1965,9 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } SValueNode* pValue = (SValueNode*)pParamNode1; - if (pValue->datum.i != 0 && pValue->datum.i != 1) { + if (pValue->datum.i < 0 || pValue->datum.i > 3) { return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "Second parameter of DIFF function should be only 0 or 1"); + "Second parameter of DIFF function should be a number between 0 and 3."); } pValue->notReserved = true; @@ -1986,11 +1986,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } static EFuncReturnRows diffEstReturnRows(SFunctionNode* pFunc) { - if (1 == LIST_LENGTH(pFunc->pParameterList)) { - return FUNC_RETURN_ROWS_N_MINUS_1; - } - return 1 == ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i ? FUNC_RETURN_ROWS_INDEFINITE - : FUNC_RETURN_ROWS_N_MINUS_1; + return FUNC_RETURN_ROWS_N_MINUS_1; } static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { @@ -3206,7 +3202,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "diff", .type = FUNCTION_TYPE_DIFF, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_PROCESS_BY_ROW | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, @@ -3215,6 +3211,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = diffScalarFunction, .finalizeFunc = functionFinalize, .estimateReturnRowsFunc = diffEstReturnRows, + .processFuncByRow = diffFunctionByRow, }, { .name = "statecount", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1aa92479b9..e502420543 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -110,10 +110,9 @@ typedef enum { } EAPerctAlgoType; typedef struct SDiffInfo { - bool hasPrev; - bool includeNull; - bool ignoreNegative; // replace the ignore with case when - bool firstOutput; + bool hasPrev; + bool isFirstRow; + int8_t ignoreOption; // replace the ignore with case when union { int64_t i64; double d64; @@ -122,6 +121,12 @@ typedef struct SDiffInfo { int64_t prevTs; } SDiffInfo; +bool ignoreNegative(int8_t ignoreOption){ + return (ignoreOption & 0x1) == 0x1; +} +bool ignoreNull(int8_t ignoreOption){ + return (ignoreOption & 0x2) == 0x2; +} typedef struct SSpreadInfo { double result; bool hasResult; @@ -3100,15 +3105,14 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); pDiffInfo->hasPrev = false; + pDiffInfo->isFirstRow = true; pDiffInfo->prev.i64 = 0; pDiffInfo->prevTs = -1; if (pCtx->numOfParams > 1) { - pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param + pDiffInfo->ignoreOption = pCtx->param[1].param.i; // TODO set correct param } else { - pDiffInfo->ignoreNegative = false; + pDiffInfo->ignoreOption = 0; } - pDiffInfo->includeNull = true; - pDiffInfo->firstOutput = false; return true; } @@ -3148,16 +3152,56 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, return TSDB_CODE_SUCCESS; } +static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { + switch (type) { + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_INT: { + int32_t v = *(int32_t*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_TINYINT: { + int8_t v = *(int8_t*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_SMALLINT: { + int16_t v = *(int16_t*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_UBIGINT: + case TSDB_DATA_TYPE_BIGINT: { + int64_t v = *(int64_t*)pv; + return v < pDiffInfo->prev.i64; + } + case TSDB_DATA_TYPE_FLOAT: { + float v = *(float*)pv; + return v < pDiffInfo->prev.d64; + } + case TSDB_DATA_TYPE_DOUBLE: { + double v = *(double*)pv; + return v < pDiffInfo->prev.d64; + } + default: + return false; + } + + return false; +} + static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, - int64_t ts) { + int64_t ts) { pDiffInfo->prevTs = ts; switch (type) { case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_INT: { int32_t v = *(int32_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { + if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { colDataSetNull_f_s(pOutput, pos); + pOutput->hasNull = true; } else { colDataSetInt64(pOutput, pos, &delta); } @@ -3170,7 +3214,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_TINYINT: { int8_t v = *(int8_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { + if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { colDataSetNull_f_s(pOutput, pos); } else { colDataSetInt64(pOutput, pos, &delta); @@ -3182,7 +3226,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_SMALLINT: { int16_t v = *(int16_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { + if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { colDataSetNull_f_s(pOutput, pos); } else { colDataSetInt64(pOutput, pos, &delta); @@ -3195,7 +3239,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_BIGINT: { int64_t v = *(int64_t*)pv; int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { + if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) { colDataSetNull_f_s(pOutput, pos); } else { colDataSetInt64(pOutput, pos, &delta); @@ -3206,7 +3250,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_FLOAT: { float v = *(float*)pv; double delta = v - pDiffInfo->prev.d64; // direct previous may be null - if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow + if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || isnan(delta)) { // check for overflow colDataSetNull_f_s(pOutput, pos); } else { colDataSetDouble(pOutput, pos, &delta); @@ -3217,7 +3261,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_DOUBLE: { double v = *(double*)pv; double delta = v - pDiffInfo->prev.d64; // direct previous may be null - if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow + if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || isnan(delta)) { // check for overflow colDataSetNull_f_s(pOutput, pos); } else { colDataSetDouble(pOutput, pos, &delta); @@ -3271,71 +3315,159 @@ bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool f } } -int32_t diffFunction(SqlFunctionCtx* pCtx) { +int32_t diffResultIsNull(SqlFunctionCtx* pCtx, SFuncInputRow* pRow){ + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); + + if (pRow->isDataNull || !pDiffInfo->hasPrev ) { + return true; + } else if (ignoreNegative(pDiffInfo->ignoreOption)){ + return diffIsNegtive(pDiffInfo, pCtx->input.pData[0]->info.type, pRow->pData); + } + return false; +} + +bool isFirstRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); + return pDiffInfo->isFirstRow; +} + +int32_t setPreVal(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); + pDiffInfo->isFirstRow = false; + if (pRow->isDataNull) { + return TSDB_CODE_SUCCESS; + } + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + int8_t inputType = pInputCol->info.type; + + char* pv = pRow->pData; + int32_t code = doSetPrevVal(pDiffInfo, inputType, pv, pRow->ts); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + pDiffInfo->hasPrev = true; + return TSDB_CODE_SUCCESS; +} + +int32_t doDiff(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, int32_t pos) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pInputCol = pInput->pData[0]; - int8_t inputType = pInputCol->info.type; + SColumnInfoData* pInputCol = pInput->pData[0]; + int8_t inputType = pInputCol->info.type; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + if (pRow->isDataNull) { + colDataSetNull_f_s(pOutput, pos); + pOutput->hasNull = true; - int32_t numOfElems = 0; - int32_t startOffset = pCtx->offset; - - SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - - funcInputUpdate(pCtx); - - SFuncInputRow row = {0}; - while (funcInputGetNextRow(pCtx, &row)) { - int32_t pos = startOffset + numOfElems; - - if (row.isDataNull) { - if (pDiffInfo->includeNull) { - colDataSetNull_f_s(pOutput, pos); - - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); - } - - numOfElems += 1; - } - continue; + // handle selectivity + if (pCtx->subsidiaries.num > 0) { + appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos); } - - char* pv = row.pData; - - if (pDiffInfo->hasPrev) { - if (row.ts == pDiffInfo->prevTs) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, row.ts); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); - } - - numOfElems++; - } else { - int32_t code = doSetPrevVal(pDiffInfo, inputType, pv, row.ts); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - - pDiffInfo->hasPrev = true; + return TSDB_CODE_SUCCESS; } - pResInfo->numOfRes = numOfElems; + char* pv = pRow->pData; + + if (pRow->ts == pDiffInfo->prevTs) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; + } + int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, pRow->ts); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + // handle selectivity + if (pCtx->subsidiaries.num > 0) { + appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos); + } + pDiffInfo->hasPrev = true; + return TSDB_CODE_SUCCESS; } +int32_t diffFunction(SqlFunctionCtx* pCtx) { + return TSDB_CODE_SUCCESS; +} + +int32_t diffFunctionByRow(SArray* pCtxArray) { + int32_t code = TSDB_CODE_SUCCESS; + int diffColNum = pCtxArray->size; + if(diffColNum == 0) { + return TSDB_CODE_SUCCESS; + } + int32_t numOfElems = 0; + + SArray* pRows = taosArrayInit_s(sizeof(SFuncInputRow), diffColNum); + + bool keepNull = false; + for (int i = 0; i < diffColNum; ++i) { + SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i); + funcInputUpdate(pCtx); + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); + if (!ignoreNull(pDiffInfo->ignoreOption)) { + keepNull = true; + } + } + + SqlFunctionCtx* pCtx0 = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, 0); + SFuncInputRow* pRow0 = (SFuncInputRow*)taosArrayGet(pRows, 0); + int32_t startOffset = pCtx0->offset; + while (funcInputGetNextRow(pCtx0, pRow0)) { + bool hasNotNullValue = !diffResultIsNull(pCtx0, pRow0); + for (int i = 1; i < diffColNum; ++i) { + SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i); + SFuncInputRow* pRow = (SFuncInputRow*)taosArrayGet(pRows, i); + if(!funcInputGetNextRow(pCtx, pRow)) { + // rows are not equal + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + goto _exit; + } + hasNotNullValue = !diffResultIsNull(pCtx, pRow); + } + int32_t pos = startOffset + numOfElems; + + bool newRow = false; + for (int i = 0; i < diffColNum; ++i) { + SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i); + SFuncInputRow* pRow = (SFuncInputRow*)taosArrayGet(pRows, i); + if ((keepNull || hasNotNullValue) && !isFirstRow(pCtx, pRow)){ + code = doDiff(pCtx, pRow, pos); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + newRow = true; + } else { + code = setPreVal(pCtx, pRow); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + } + } + if (newRow) ++numOfElems; + } + + for (int i = 0; i < diffColNum; ++i) { + SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i); + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + pResInfo->numOfRes = numOfElems; + } + +_exit: + if (pRows) { + taosArrayDestroy(pRows); + pRows = NULL; + } + return code; +} + int32_t getTopBotInfoSize(int64_t numOfItems) { return sizeof(STopBotRes) + numOfItems * sizeof(STopBotResItem); } bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 1e12595b28..b99e67697c 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -141,6 +141,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { pFpSet->process = funcMgtBuiltins[funcId].processFunc; pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc; pFpSet->combine = funcMgtBuiltins[funcId].combineFunc; + pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow; return TSDB_CODE_SUCCESS; } @@ -274,6 +275,8 @@ bool fmIsBlockDistFunc(int32_t funcId) { return FUNCTION_TYPE_BLOCK_DIST == funcMgtBuiltins[funcId].type; } +bool fmIsProcessByRowFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PROCESS_BY_ROW); } + bool fmIsIgnoreNullFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_IGNORE_NULL_FUNC); } void fmFuncMgtDestroy() {