From 6a9441528a366b99307e53c23bf4abfca904f716 Mon Sep 17 00:00:00 2001 From: sima Date: Fri, 19 Jul 2024 18:43:35 +0800 Subject: [PATCH] enh:[TD-31043] Handling return value in builtinsimpl.c --- include/libs/function/function.h | 2 +- include/libs/function/tudf.h | 2 +- include/util/taoserror.h | 2 + source/client/src/clientEnv.c | 6 +- source/libs/executor/src/executorInt.c | 4 +- source/libs/executor/src/projectoperator.c | 33 +- source/libs/executor/src/timewindowoperator.c | 13 +- source/libs/function/inc/builtinsimpl.h | 46 +- source/libs/function/inc/tfunctionInt.h | 2 +- source/libs/function/inc/tpercentile.h | 3 +- source/libs/function/src/builtinsimpl.c | 1102 ++++++++++------- .../libs/function/src/detail/tavgfunction.c | 15 +- source/libs/function/src/thistogram.c | 2 +- source/libs/function/src/tpercentile.c | 74 +- source/libs/function/src/tudf.c | 19 +- source/libs/parser/test/parTestMain.cpp | 3 +- source/libs/planner/test/planTestMain.cpp | 3 +- source/util/src/terror.c | 2 + 18 files changed, 802 insertions(+), 531 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 700cc5ba7f..24fa2898ea 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -36,7 +36,7 @@ typedef struct SFuncExecEnv { } SFuncExecEnv; typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); -typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); +typedef int32_t (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 6b15833917..7a8927ca90 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -77,7 +77,7 @@ void freeUdfInterBuf(SUdfInterBuf *buf); // high level APIs bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); -bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); +int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 7d1ce80b6c..9b49c1908d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -862,6 +862,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_FUNC_TO_CHAR_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x2809) #define TSDB_CODE_FUNC_TIME_UNIT_INVALID TAOS_DEF_ERROR_CODE(0, 0x280A) #define TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL TAOS_DEF_ERROR_CODE(0, 0x280B) +#define TSDB_CODE_FUNC_INVALID_VALUE_RANGE TAOS_DEF_ERROR_CODE(0, 0x280C) +#define TSDB_CODE_FUNC_SETUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x280D) //udf diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 724229af16..f640618897 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -897,7 +897,11 @@ void taos_init_imp(void) { tscError("failed to init task queue"); return; } - fmFuncMgtInit(); + if (fmFuncMgtInit() != TSDB_CODE_SUCCESS) { + tscInitRes = -1; + tscError("failed to init function manager"); + return; + } nodesInitAllocatorSet(); clientConnRefPool = taosOpenRef(200, destroyTscObj); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index a3e3501114..4ee7030d9a 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -521,8 +521,8 @@ int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t n if (!pResInfo->initialized) { if (pCtx[i].functionId != -1) { - bool ini = pCtx[i].fpSet.init(&pCtx[i], pResInfo); - if (!ini && fmIsUserDefinedFunc(pCtx[i].functionId)) { + int32_t code = pCtx[i].fpSet.init(&pCtx[i], pResInfo); + if (code != TSDB_CODE_SUCCESS && fmIsUserDefinedFunc(pCtx[i].functionId)){ pResInfo->initialized = false; return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 6167497c21..93901b6b33 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -44,8 +44,8 @@ static int32_t doGenerateSourceData(SOperatorInfo* pOperator); static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator); static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator); static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols); -static void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, - int32_t numOfExprs); +static int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, + int32_t stage, int32_t numOfExprs); static void destroyProjectOperatorInfo(void* param) { if (NULL == param) { @@ -142,7 +142,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys } initBasicInfo(&pInfo->binfo, pResBlock); - setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); + code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { @@ -447,7 +450,11 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy goto _error; } - setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); + code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -589,7 +596,8 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { return (rows > 0) ? pInfo->pRes : NULL; } -void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { +int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { + int32_t code = TSDB_CODE_SUCCESS; for (int32_t j = 0; j < size; ++j) { struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]); if (isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pCtx[j].functionId) || pCtx[j].functionId == -1 || @@ -597,8 +605,12 @@ void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { continue; } - pCtx[j].fpSet.init(&pCtx[j], pCtx[j].resultInfo); + code = pCtx[j].fpSet.init(&pCtx[j], pCtx[j].resultInfo); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } + return code; } /* @@ -610,7 +622,7 @@ void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { * offset[0] offset[1] offset[2] */ // TODO refactor: some function move away -void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, +int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; @@ -632,7 +644,7 @@ void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SA pCtx[i].scanFlag = stage; } - initCtxOutputBuffer(pCtx, numOfExprs); + return initCtxOutputBuffer(pCtx, numOfExprs); } SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) { @@ -841,7 +853,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc // do nothing } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx); - pfCtx->fpSet.init(pfCtx, pResInfo); + code = pfCtx->fpSet.init(pfCtx, pResInfo); + if (TSDB_CODE_SUCCESS != code) { + goto _exit; + } pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fb3176a2a8..5678400410 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1086,12 +1086,13 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pBlock; } -static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { +static int32_t doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); if (NULL == pResult) { - return; + return TSDB_CODE_SUCCESS; } + int32_t code = TSDB_CODE_SUCCESS; SqlFunctionCtx* pCtx = pSup->pCtx; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset); @@ -1101,15 +1102,19 @@ static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, } pResInfo->initialized = false; if (pCtx[i].functionId != -1) { - pCtx[i].fpSet.init(&pCtx[i], pResInfo); + code = pCtx[i].fpSet.init(&pCtx[i], pResInfo); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } } SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId); if (NULL == bufPage) { - return; + return TSDB_CODE_SUCCESS; } setBufPageDirty(bufPage, true); releaseBufPage(pResultBuf, bufPage); + return TSDB_CODE_SUCCESS; } static void destroyStateWindowOperatorInfo(void* param) { diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index b48a617b9c..274ef61feb 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -49,9 +49,9 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); -const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); +int32_t loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos, char** value); -bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult); int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); @@ -74,7 +74,7 @@ int32_t sumInvertFunction(SqlFunctionCtx* pCtx); int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t minFunction(SqlFunctionCtx* pCtx); int32_t maxFunction(SqlFunctionCtx* pCtx); @@ -83,7 +83,7 @@ int32_t minCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t avgFunction(SqlFunctionCtx* pCtx); int32_t avgFunctionMerge(SqlFunctionCtx* pCtx); int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -97,7 +97,7 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getAvgInfoSize(); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t stddevFunction(SqlFunctionCtx* pCtx); int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx); int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -111,18 +111,18 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getStddevInfoSize(); bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t leastSQRFunction(SqlFunctionCtx* pCtx); int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t percentileFunction(SqlFunctionCtx* pCtx); int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t apercentileFunction(SqlFunctionCtx* pCtx); int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx); int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -131,16 +131,16 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) int32_t getApercentileMaxSize(); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); +int32_t diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t diffFunction(SqlFunctionCtx* pCtx); int32_t diffFunctionByRow(SArray* pCtx); bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); +int32_t derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t derivativeFunction(SqlFunctionCtx* pCtx); bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); +int32_t irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t irateFunction(SqlFunctionCtx* pCtx); int32_t irateFunctionMerge(SqlFunctionCtx* pCtx); int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -165,7 +165,7 @@ EFuncDataRequired lastDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo); int32_t lastRowFunction(SqlFunctionCtx* pCtx); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); -bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t topFunction(SqlFunctionCtx* pCtx); int32_t bottomFunction(SqlFunctionCtx* pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -174,7 +174,7 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getTopBotInfoSize(int64_t numOfItems); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool spreadFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t spreadFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t spreadFunction(SqlFunctionCtx* pCtx); int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx); int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -183,7 +183,7 @@ int32_t getSpreadInfoSize(); int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t elapsedFunction(SqlFunctionCtx* pCtx); int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx); int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -192,7 +192,7 @@ int32_t getElapsedInfoSize(); int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t histogramFunction(SqlFunctionCtx* pCtx); int32_t histogramFunctionPartial(SqlFunctionCtx* pCtx); int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx); @@ -210,7 +210,7 @@ int32_t getHLLInfoSize(); int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool stateFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t stateFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t stateCountFunction(SqlFunctionCtx* pCtx); int32_t stateDurationFunction(SqlFunctionCtx* pCtx); @@ -218,35 +218,35 @@ bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t csumFunction(SqlFunctionCtx* pCtx); bool getMavgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool mavgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t mavgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t mavgFunction(SqlFunctionCtx* pCtx); bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t sampleFunction(SqlFunctionCtx* pCtx); int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool tailFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t tailFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t tailFunction(SqlFunctionCtx* pCtx); bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t uniqueFunction(SqlFunctionCtx* pCtx); bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t modeFunction(SqlFunctionCtx* pCtx); int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t twaFunction(SqlFunctionCtx* pCtx); int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t blockDistFunction(SqlFunctionCtx* pCtx); int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); diff --git a/source/libs/function/inc/tfunctionInt.h b/source/libs/function/inc/tfunctionInt.h index b4c48abf37..8d9e42576d 100644 --- a/source/libs/function/inc/tfunctionInt.h +++ b/source/libs/function/inc/tfunctionInt.h @@ -37,7 +37,7 @@ static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32 pResInfo->complete = false; pResInfo->numOfRes = 0; - memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen); + (void)memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen); } #ifdef __cplusplus diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index 34815a34ad..90fb279259 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -67,7 +67,8 @@ typedef struct tMemBucket { SHashObj *groupPagesMap; // disk page map for different groups; } tMemBucket; -tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup); +int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup, + tMemBucket **pBucket); void tMemBucketDestroy(tMemBucket *pBucket); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5f6565be1f..5c960e2580 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -429,7 +429,7 @@ typedef struct SGroupKeyInfo { (_p).val = (_v); \ } while (0) -int32_t funcInputUpdate(SqlFunctionCtx* pCtx) { +void funcInputUpdate(SqlFunctionCtx* pCtx) { SFuncInputRowIter* pIter = &pCtx->rowIter; if (!pCtx->bInputFinished) { @@ -448,11 +448,9 @@ int32_t funcInputUpdate(SqlFunctionCtx* pCtx) { } else { pIter->finalRow = true; } - - return TSDB_CODE_SUCCESS; } -bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) { +int32_t funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow, bool *res) { if (pIter->finalRow) { if (pIter->hasPrev) { pRow->ts = pIter->prevBlockTsEnd; @@ -462,29 +460,41 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) { pRow->rowIndex = 0; pIter->hasPrev = false; - return true; + *res = true; + return TSDB_CODE_SUCCESS; } else { - return false; + *res = false; + return TSDB_CODE_SUCCESS; } } if (pIter->hasPrev) { if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) { - blockDataDestroy(pIter->pPrevRowBlock); + (void)blockDataDestroy(pIter->pPrevRowBlock); pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); pIter->prevIsDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->inputEndIndex); pIter->pPrevData = taosMemoryMalloc(pIter->pDataCol->info.bytes); + if (NULL == pIter->pPrevData) { + qError("out of memory when function get input row."); + return TSDB_CODE_OUT_OF_MEMORY; + } char* srcData = colDataGetData(pIter->pDataCol, pIter->inputEndIndex); - memcpy(pIter->pPrevData, srcData, pIter->pDataCol->info.bytes); + (void)memcpy(pIter->pPrevData, srcData, pIter->pDataCol->info.bytes); pIter->pPrevPk = taosMemoryMalloc(pIter->pPkCol->info.bytes); + if (NULL == pIter->pPrevPk) { + qError("out of memory when function get input row."); + taosMemoryFree(pIter->pPrevData); + return TSDB_CODE_OUT_OF_MEMORY; + } char* pkData = colDataGetData(pIter->pPkCol, pIter->inputEndIndex); - memcpy(pIter->pPrevPk, pkData, pIter->pPkCol->info.bytes); + (void)memcpy(pIter->pPrevPk, pkData, pIter->pPkCol->info.bytes); pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); pIter->hasPrev = true; - return false; + *res = false; + return TSDB_CODE_SUCCESS; } else { int32_t idx = pIter->rowIndex; while (pIter->tsList[idx] == pIter->prevBlockTsEnd) { @@ -506,7 +516,8 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) { } pIter->hasPrev = false; pIter->rowIndex = idx; - return true; + *res = true; + return TSDB_CODE_SUCCESS; } } else { TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex]; @@ -522,18 +533,29 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) { pRow->block = pIter->pSrcBlock; pIter->rowIndex = idx + 1; - return true; + *res = true; + return TSDB_CODE_SUCCESS; } else { pIter->hasPrev = true; pIter->prevBlockTsEnd = tsEnd; pIter->prevIsDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->inputEndIndex); pIter->pPrevData = taosMemoryMalloc(pIter->pDataCol->info.bytes); - memcpy(pIter->pPrevData, colDataGetData(pIter->pDataCol, pIter->inputEndIndex), pIter->pDataCol->info.bytes); + if (NULL == pIter->pPrevData) { + qError("out of memory when function get input row."); + return TSDB_CODE_OUT_OF_MEMORY; + } + (void)memcpy(pIter->pPrevData, colDataGetData(pIter->pDataCol, pIter->inputEndIndex), pIter->pDataCol->info.bytes); pIter->pPrevPk = taosMemoryMalloc(pIter->pPkCol->info.bytes); - memcpy(pIter->pPrevPk, colDataGetData(pIter->pPkCol, pIter->inputEndIndex), pIter->pPkCol->info.bytes); + if (NULL == pIter->pPrevPk) { + qError("out of memory when function get input row."); + taosMemoryFree(pIter->pPrevData); + return TSDB_CODE_OUT_OF_MEMORY; + } + (void)memcpy(pIter->pPrevPk, colDataGetData(pIter->pPkCol, pIter->inputEndIndex), pIter->pPkCol->info.bytes); pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); - return false; + *res = false; + return TSDB_CODE_SUCCESS; } } } @@ -602,24 +624,27 @@ bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { } } -bool funcInputGetNextRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { +int32_t funcInputGetNextRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, bool *res) { SFuncInputRowIter* pIter = &pCtx->rowIter; if (pCtx->hasPrimaryKey) { if (pCtx->order == TSDB_ORDER_ASC) { - return funcInputGetNextRowAscPk(pIter, pRow); + *res = funcInputGetNextRowAscPk(pIter, pRow); + return TSDB_CODE_SUCCESS; } else { - return funcInputGetNextRowDescPk(pIter, pRow); + return funcInputGetNextRowDescPk(pIter, pRow, res); } } else { - return funcInputGetNextRowNoPk(pIter, pRow); + *res = funcInputGetNextRowNoPk(pIter, pRow); + return TSDB_CODE_SUCCESS; } + return TSDB_CODE_SUCCESS; } // This function append the selectivity to subsidiaries function context directly, without fetching data // from intermediate disk based buf page -void appendSelectivityCols(SqlFunctionCtx* pCtx, SSDataBlock* pSrcBlock, int32_t rowIndex, int32_t pos) { +int32_t appendSelectivityCols(SqlFunctionCtx* pCtx, SSDataBlock* pSrcBlock, int32_t rowIndex, int32_t pos) { if (pCtx->subsidiaries.num <= 0) { - return; + return TSDB_CODE_SUCCESS; } for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { @@ -641,29 +666,34 @@ void appendSelectivityCols(SqlFunctionCtx* pCtx, SSDataBlock* pSrcBlock, int32_t if (colDataIsNull_s(pSrcCol, rowIndex) == true) { colDataSetNULL(pDstCol, pos); } else { - colDataSetVal(pDstCol, pos, pData, false); + int32_t code = colDataSetVal(pDstCol, pos, pData, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } } + return TSDB_CODE_SUCCESS; } bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool firstOccur, int32_t* pRowIndex, int32_t* nextFrom); -static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst); +static bool firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst); -bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { +int32_t functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { if (pResultInfo->initialized) { - return false; + return TSDB_CODE_SUCCESS; // already initialized } if (pCtx->pOutput != NULL) { - memset(pCtx->pOutput, 0, (size_t)pCtx->resDataInfo.bytes); + (void)memset(pCtx->pOutput, 0, (size_t)pCtx->resDataInfo.bytes); } initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize); - return true; + return TSDB_CODE_SUCCESS; } int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); @@ -671,9 +701,9 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; char* in = GET_ROWCELL_INTERBUF(pResInfo); - colDataSetVal(pCol, pBlock->info.rows, in, pResInfo->isNullRes); + code = colDataSetVal(pCol, pBlock->info.rows, in, pResInfo->isNullRes); - return pResInfo->numOfRes; + return code; } int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { @@ -684,9 +714,7 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pSBuf, pDBuf, true)) { - pDBuf->hasResult = true; - } + pDBuf->hasResult = firstLastTransferInfoImpl(pSBuf, pDBuf, true); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); pDResInfo->isNullRes &= pSResInfo->isNullRes; @@ -701,9 +729,9 @@ int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; char* in = finalResult; - colDataSetVal(pCol, pBlock->info.rows, in, pResInfo->isNullRes); + int32_t code = colDataSetVal(pCol, pBlock->info.rows, in, pResInfo->isNullRes); - return pResInfo->numOfRes; + return code; } EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { @@ -972,9 +1000,12 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin return FUNC_DATA_REQUIRED_SMA_LOAD; } -bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; // not initialized since it has been initialized +int32_t minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; // not initialized since it has been initialized } SMinmaxResInfo* buf = GET_ROWCELL_INTERBUF(pResultInfo); @@ -983,7 +1014,7 @@ bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) buf->nullTupleSaved = false; buf->nullTuplePos.pageId = -1; - return true; + return TSDB_CODE_SUCCESS; } bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { @@ -1097,12 +1128,12 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu if ((pCtx->saveHandle.pBuf != NULL && pTuplePos->pageId != -1) || (pCtx->saveHandle.pState && pTuplePos->streamTupleKey.ts > 0)) { int32_t numOfCols = pCtx->subsidiaries.num; - const char* p = loadTupleData(pCtx, pTuplePos); - if (p == NULL) { - terrno = TSDB_CODE_NOT_FOUND; + char* p = NULL; + int32_t code = loadTupleData(pCtx, pTuplePos, &p); + if (p == NULL || TSDB_CODE_SUCCESS != code) { qError("Load tuple data failed since %s, groupId:%" PRIu64 ", ts:%" PRId64, terrstr(), pTuplePos->streamTupleKey.groupId, pTuplePos->streamTupleKey.ts); - return terrno; + return TSDB_CODE_NOT_FOUND; } bool* nullList = (bool*)p; @@ -1123,7 +1154,10 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu if (nullList[j]) { colDataSetNULL(pDstCol, rowIndex); } else { - colDataSetVal(pDstCol, rowIndex, pStart, false); + code = colDataSetVal(pDstCol, rowIndex, pStart, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } pStart += pDstCol->info.bytes; } @@ -1134,11 +1168,12 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu // This function append the selectivity to subsidiaries function context directly, without fetching data // from intermediate disk based buf page -void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) { +int32_t appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) { if (pCtx->subsidiaries.num <= 0) { - return; + return TSDB_CODE_SUCCESS; } + int32_t code = TSDB_CODE_SUCCESS; for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; @@ -1158,9 +1193,13 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) if (colDataIsNull_s(pSrcCol, rowIndex) == true) { colDataSetNULL(pDstCol, pos); } else { - colDataSetVal(pDstCol, pos, pData, false); + code = colDataSetVal(pDstCol, pos, pData, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } } + return code; } void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) { *pDestPos = *pSourcePos; } @@ -1244,14 +1283,17 @@ bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SStddevRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo); - memset(pRes, 0, sizeof(SStddevRes)); - return true; + (void)memset(pRes, 0, sizeof(SStddevRes)); + return TSDB_CODE_SUCCESS; } int32_t stddevFunction(SqlFunctionCtx* pCtx) { @@ -1588,16 +1630,19 @@ int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t resultBytes = getStddevInfoSize(); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); - memcpy(varDataVal(res), pInfo, resultBytes); + if (NULL == res) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (void)memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - colDataSetVal(pCol, pBlock->info.rows, res, false); + int32_t code = colDataSetVal(pCol, pBlock->info.rows, res, false); taosMemoryFree(res); - return pResInfo->numOfRes; + return code; } int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { @@ -1620,16 +1665,19 @@ bool getLeastSQRFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SLeastSQRInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo); GET_TYPED_DATA(pInfo->startVal, double, pCtx->param[1].param.nType, &pCtx->param[1].param.i); GET_TYPED_DATA(pInfo->stepVal, double, pCtx->param[2].param.nType, &pCtx->param[2].param.i); - return true; + return TSDB_CODE_SUCCESS; } int32_t leastSQRFunction(SqlFunctionCtx* pCtx) { @@ -1802,7 +1850,7 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (0 == pInfo->num) { colDataSetNULL(pCol, currentRow); - return 0; + return TSDB_CODE_SUCCESS; } double(*param)[3] = pInfo->matrix; @@ -1815,7 +1863,7 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (0 == param00) { colDataSetNULL(pCol, currentRow); - return 0; + return TSDB_CODE_SUCCESS; } // param[0][1] = 0; @@ -1830,19 +1878,19 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { char interceptBuf[64] = {0}; int n = snprintf(slopBuf, 64, "%.6lf", param02); if (n > LEASTSQUARES_DOUBLE_ITEM_LENGTH) { - snprintf(slopBuf, 64, "%." DOUBLE_PRECISION_DIGITS, param02); + (void)snprintf(slopBuf, 64, "%." DOUBLE_PRECISION_DIGITS, param02); } n = snprintf(interceptBuf, 64, "%.6lf", param12); if (n > LEASTSQUARES_DOUBLE_ITEM_LENGTH) { - snprintf(interceptBuf, 64, "%." DOUBLE_PRECISION_DIGITS, param12); + (void)snprintf(interceptBuf, 64, "%." DOUBLE_PRECISION_DIGITS, param12); } size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%s, intercept:%s}", slopBuf, interceptBuf); varDataSetLen(buf, len); - colDataSetVal(pCol, currentRow, buf, pResInfo->isNullRes); + int32_t code = colDataSetVal(pCol, currentRow, buf, pResInfo->isNullRes); - return pResInfo->numOfRes; + return code; } int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { @@ -1872,9 +1920,12 @@ bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } // in the first round, get the min-max value of all involved data @@ -1883,10 +1934,11 @@ bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultI SET_DOUBLE_VAL(&pInfo->maxval, -DBL_MAX); pInfo->numOfElems = 0; - return true; + return TSDB_CODE_SUCCESS; } int32_t percentileFunction(SqlFunctionCtx* pCtx) { + int32_t code = TSDB_CODE_SUCCESS; int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -1905,7 +1957,10 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { pResInfo->complete = true; return TSDB_CODE_SUCCESS; } else { - pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval, pCtx->hasWindowOrGroup); + code = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval, pCtx->hasWindowOrGroup, &pInfo->pMemBucket); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } } @@ -1966,7 +2021,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pCol, i); numOfElems += 1; - int32_t code = tMemBucketPut(pInfo->pMemBucket, data, 1); + code = tMemBucketPut(pInfo->pMemBucket, data, 1); if (code != TSDB_CODE_SUCCESS) { tMemBucketDestroy(pInfo->pMemBucket); return code; @@ -2014,10 +2069,13 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); varDataSetLen(buf, len); - colDataSetVal(pCol, pBlock->info.rows, buf, false); + code = colDataSetVal(pCol, pBlock->info.rows, buf, false); + if (code != TSDB_CODE_SUCCESS) { + goto _fin_error; + } tMemBucketDestroy(pMemBucket); - return pResInfo->numOfRes; + return TSDB_CODE_SUCCESS; } else { SVariant* pVal = &pCtx->param[1].param; @@ -2076,9 +2134,12 @@ static void buildTDigestInfo(SAPercentileInfo* pInfo) { pInfo->pTDigest = (TDigest*)((char*)pInfo + sizeof(SAPercentileInfo)); } -bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo); @@ -2092,7 +2153,7 @@ bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResult } else if (pCtx->numOfParams == 3) { pInfo->algo = getApercentileAlgo(varDataVal(pCtx->param[2].param.pz)); if (pInfo->algo == APERCT_ALGO_UNKNOWN) { - return false; + return TSDB_CODE_FUNC_FUNTION_PARA_VALUE; } } @@ -2106,7 +2167,7 @@ bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResult pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto, pInfo->pHisto->elems); } - return true; + return TSDB_CODE_SUCCESS; } int32_t apercentileFunction(SqlFunctionCtx* pCtx) { @@ -2186,7 +2247,7 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo* tdigestAutoFill(pTDigest, COMPRESSION); if (pTDigest->num_centroids <= 0 && pTDigest->num_buffered_pts == 0) { - memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION)); + (void)memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION)); tdigestAutoFill(pTDigest, COMPRESSION); } else { tdigestMerge(pTDigest, pInput->pTDigest); @@ -2205,7 +2266,7 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo* SHistogramInfo* pHisto = pOutput->pHisto; if (pHisto->numOfElems <= 0) { - memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); + (void)memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo)); qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, pHisto->numOfEntries, @@ -2216,7 +2277,7 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo* pHisto->numOfEntries, pInput->pHisto); SHistogramInfo* pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN); - memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN); + (void)memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN); pHisto->elems = (SHistBin*)((char*)pHisto + sizeof(SHistogramInfo)); qDebug("%s merge histo, total:%" PRId64 ", entry:%d, %p", __FUNCTION__, pHisto->numOfElems, pHisto->numOfEntries, @@ -2300,22 +2361,25 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t resultBytes = getApercentileMaxSize(); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + if (NULL == res) { + return TSDB_CODE_OUT_OF_MEMORY; + } if (pInfo->algo == APERCT_ALGO_TDIGEST) { - memcpy(varDataVal(res), pInfo, resultBytes); + (void)memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); } else { - memcpy(varDataVal(res), pInfo, resultBytes); + (void)memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); } int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - colDataSetVal(pCol, pBlock->info.rows, res, false); + int32_t code = colDataSetVal(pCol, pBlock->info.rows, res, false); taosMemoryFree(res); - return pResInfo->numOfRes; + return code; } int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { @@ -2447,7 +2511,7 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex); } -static void prepareBuf(SqlFunctionCtx* pCtx) { +static int32_t prepareBuf(SqlFunctionCtx* pCtx) { if (pCtx->subsidiaries.rowLen == 0) { int32_t rowLen = 0; for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { @@ -2457,7 +2521,11 @@ static void prepareBuf(SqlFunctionCtx* pCtx) { pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool); pCtx->subsidiaries.buf = taosMemoryMalloc(pCtx->subsidiaries.rowLen); + if (NULL == pCtx->subsidiaries.buf) { + return TSDB_CODE_OUT_OF_MEMORY; + } } + return TSDB_CODE_SUCCESS; } static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, @@ -2485,12 +2553,12 @@ static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t pInfo->bytes = varDataTLen(pData); } - memcpy(pInfo->buf, pData, pInfo->bytes); + (void)memcpy(pInfo->buf, pData, pInfo->bytes); if (pkData != NULL) { if (IS_VAR_DATA_TYPE(pInfo->pkType)) { pInfo->pkBytes = varDataTLen(pkData); } - memcpy(pInfo->buf + pInfo->bytes, pkData, pInfo->pkBytes); + (void)memcpy(pInfo->buf + pInfo->bytes, pkData, pInfo->pkBytes); pInfo->pkData = pInfo->buf + pInfo->bytes; } @@ -2818,9 +2886,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) { +static bool firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst) { if (!pInput->hasResult) { - return TSDB_CODE_FAILED; + return false; } __compar_fn_t pkCompareFn = NULL; if (pInput->pkData) { @@ -2830,12 +2898,12 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p if (isFirst) { if (pInput->ts > pOutput->ts || (pInput->ts == pOutput->ts && pkCompareFn && pkCompareFn(pInput->pkData, pOutput->pkData) > 0)) { - return TSDB_CODE_FAILED; + return false; } } else { if (pInput->ts < pOutput->ts || (pInput->ts == pOutput->ts && pkCompareFn && pkCompareFn(pInput->pkData, pOutput->pkData) > 0)) { - return TSDB_CODE_FAILED; + return false; } } } @@ -2845,25 +2913,24 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p pOutput->bytes = pInput->bytes; pOutput->pkType = pInput->pkType; - memcpy(pOutput->buf, pInput->buf, pOutput->bytes); + (void)memcpy(pOutput->buf, pInput->buf, pOutput->bytes); if (pInput->pkData) { pOutput->pkBytes = pInput->pkBytes; - memcpy(pOutput->buf + pOutput->bytes, pInput->pkData, pOutput->pkBytes); + (void)memcpy(pOutput->buf + pOutput->bytes, pInput->pkData, pOutput->pkBytes); pOutput->pkData = pOutput->buf + pOutput->bytes; } - return TSDB_CODE_SUCCESS; + return true; } static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, int32_t rowIndex) { - if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) { + if (firstLastTransferInfoImpl(pInput, pOutput, isFirst)) { int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput); - if (code != TSDB_CODE_SUCCESS) { + if (TSDB_CODE_SUCCESS != code) { return code; } pOutput->hasResult = true; } - return TSDB_CODE_SUCCESS; } @@ -2923,7 +2990,10 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); - colDataSetVal(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes); + code = colDataSetVal(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes); + if (TSDB_CODE_SUCCESS != code) { + return code; + } // handle selectivity code = setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows); @@ -2941,14 +3011,20 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { // todo check for failure char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); - memcpy(varDataVal(res), pRes, resultBytes); + if (NULL == res) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (void)memcpy(varDataVal(res), pRes, resultBytes); varDataSetLen(res, resultBytes); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - colDataSetVal(pCol, pBlock->info.rows, res, false); + code = colDataSetVal(pCol, pBlock->info.rows, res, false); + if (TSDB_CODE_SUCCESS != code) { + return TSDB_CODE_OUT_OF_MEMORY; + } code = setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows); taosMemoryFree(res); @@ -2963,10 +3039,7 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); - if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pSBuf, pDBuf, false)) { - pDBuf->hasResult = true; - } - + pDBuf->hasResult = firstLastTransferInfoImpl(pSBuf, pDBuf, false); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); pDResInfo->isNullRes &= pSResInfo->isNullRes; return TSDB_CODE_SUCCESS; @@ -2987,7 +3060,7 @@ static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex pInfo->bytes = varDataTLen(pData); } - memcpy(pInfo->buf, pData, pInfo->bytes); + (void)memcpy(pInfo->buf, pData, pInfo->bytes); } if (pCtx->hasPrimaryKey) { @@ -2995,7 +3068,7 @@ static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex if (IS_VAR_DATA_TYPE(pInfo->pkType)) { pInfo->pkBytes = varDataTLen(pkData); } - memcpy(pInfo->buf + pInfo->bytes, pkData, pInfo->pkBytes); + (void)memcpy(pInfo->buf + pInfo->bytes, pkData, pInfo->pkBytes); pInfo->pkData = pInfo->buf + pInfo->bytes; } pInfo->ts = cts; @@ -3101,11 +3174,13 @@ bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { - if (!functionSetup(pCtx, pResInfo)) { - return false; +int32_t diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { + if (pResInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } - SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); pDiffInfo->hasPrev = false; pDiffInfo->isFirstRow = true; @@ -3116,7 +3191,7 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { } else { pDiffInfo->ignoreOption = 0; } - return true; + return TSDB_CODE_SUCCESS; } static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, int64_t ts) { @@ -3155,7 +3230,7 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, return TSDB_CODE_SUCCESS; } -static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { +static bool diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { switch (type) { case TSDB_DATA_TYPE_UINT: { int64_t v = *(uint32_t*)pv; @@ -3382,14 +3457,17 @@ int32_t setDoDiffResult(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, int32_t pos) SColumnInfoData* pInputCol = pInput->pData[0]; int8_t inputType = pInputCol->info.type; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - + int32_t code = TSDB_CODE_SUCCESS; if (pRow->isDataNull) { colDataSetNull_f_s(pOutput, pos); pOutput->hasNull = true; // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos); + code = appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } return TSDB_CODE_SUCCESS; } @@ -3399,13 +3477,16 @@ int32_t setDoDiffResult(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, int32_t pos) if (pRow->ts == pDiffInfo->prevTs) { return TSDB_CODE_FUNC_DUP_TIMESTAMP; } - int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, pRow->ts); + code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, pRow->ts); if (code != TSDB_CODE_SUCCESS) { return code; } // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos); + code = appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } return TSDB_CODE_SUCCESS; @@ -3424,6 +3505,9 @@ int32_t diffFunctionByRow(SArray* pCtxArray) { int32_t numOfElems = 0; SArray* pRows = taosArrayInit_s(sizeof(SFuncInputRow), diffColNum); + if (NULL == pRows) { + return TSDB_CODE_OUT_OF_MEMORY; + } bool keepNull = false; for (int i = 0; i < diffColNum; ++i) { @@ -3439,12 +3523,24 @@ int32_t diffFunctionByRow(SArray* pCtxArray) { SqlFunctionCtx* pCtx0 = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, 0); SFuncInputRow* pRow0 = (SFuncInputRow*)taosArrayGet(pRows, 0); int32_t startOffset = pCtx0->offset; - while (funcInputGetNextRow(pCtx0, pRow0)) { + bool result = false; + while (1) { + code = funcInputGetNextRow(pCtx0, pRow0, &result); + if (TSDB_CODE_SUCCESS != code) { + goto _exit; + } + if (!result) { + break; + } bool hasNotNullValue = !diffResultIsNull(pCtx0, pRow0); for (int i = 1; i < diffColNum; ++i) { SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i); SFuncInputRow* pRow = (SFuncInputRow*)taosArrayGet(pRows, i); - if(!funcInputGetNextRow(pCtx, pRow)) { + code = funcInputGetNextRow(pCtx, pRow, &result); + if (TSDB_CODE_SUCCESS != code) { + goto _exit; + } + if (!result) { // rows are not equal code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; goto _exit; @@ -3497,9 +3593,12 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { - if (!functionSetup(pCtx, pResInfo)) { - return false; +int32_t topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { + if (pResInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); @@ -3509,7 +3608,7 @@ bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { pRes->nullTupleSaved = false; pRes->nullTuplePos.pageId = -1; - return true; + return TSDB_CODE_SUCCESS; } static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { @@ -3523,7 +3622,7 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); -static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery); +static int32_t addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery); int32_t topFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; @@ -3630,8 +3729,10 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { STopBotRes* pRes = getTopBotOutputInfo(pCtx); + int32_t code = TSDB_CODE_SUCCESS; SVariant val = {0}; + // TODO(smj) : this func need err code taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); STopBotResItem* pItems = pRes->pItems; @@ -3644,7 +3745,7 @@ int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSD // save the data of this tuple if (pCtx->subsidiaries.num > 0) { - int32_t code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3655,8 +3756,11 @@ int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSD #endif // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; - taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, - !isTopQuery); + code = taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, + topBotResComparFn, !isTopQuery); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } else { // replace the minimum value in the result if ((isTopQuery && ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) || (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || @@ -3673,7 +3777,7 @@ int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSD // save the data of this tuple by over writing the old data if (pCtx->subsidiaries.num > 0) { - int32_t code = updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + code = updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3681,8 +3785,11 @@ int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSD #ifdef BUF_PAGE_DEBUG qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset); #endif - taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, + code = taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, topBotResComparFn, NULL, !isTopQuery); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } @@ -3721,9 +3828,9 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid char* p = colDataGetData(pCol, rowIndex); if (IS_VAR_DATA_TYPE(pCol->info.type)) { - memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p) : varDataTLen(p)); + (void)memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p) : varDataTLen(p)); } else { - memcpy(pStart + offset, p, pCol->info.bytes); + (void)memcpy(pStart + offset, p, pCol->info.bytes); } offset += pCol->info.bytes; @@ -3761,7 +3868,7 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, } p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num}; - memcpy(pPage->data + pPage->num, pBuf, length); + (void)memcpy(pPage->data + pPage->num, pBuf, length); pPage->num += length; setBufPageDirty(pPage, true); @@ -3777,7 +3884,10 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, } int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { - prepareBuf(pCtx); + int32_t code = prepareBuf(pCtx); + if (TSDB_CODE_SUCCESS != code) { + return code; + } SWinKey key = {0}; if (pCtx->saveHandle.pBuf == NULL) { @@ -3797,42 +3907,52 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf if (pPage == NULL) { return terrno; } - memcpy(pPage->data + pPos->offset, pBuf, length); + (void)memcpy(pPage->data + pPos->offset, pBuf, length); setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); } else { - pStore->streamStateFuncPut(pHandle->pState, &pPos->streamTupleKey, pBuf, length); + int32_t code = pStore->streamStateFuncPut(pHandle->pState, &pPos->streamTupleKey, pBuf, length); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } return TSDB_CODE_SUCCESS; } int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { - prepareBuf(pCtx); + int32_t code = prepareBuf(pCtx); + if (TSDB_CODE_SUCCESS != code) { + return code; + } char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); return doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos, pCtx->pStore); } -static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos, SFunctionStateStore* pStore) { +static int32_t doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos, SFunctionStateStore* pStore, char** value) { if (pHandle->pBuf != NULL) { SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); if (pPage == NULL) { - return NULL; + *value = NULL; + return terrno; } - char* p = pPage->data + pPos->offset; + *value = pPage->data + pPos->offset; releaseBufPage(pHandle->pBuf, pPage); - return p; + return TSDB_CODE_SUCCESS; } else { - void* value = NULL; + *value = NULL; int32_t vLen; - pStore->streamStateFuncGet(pHandle->pState, &pPos->streamTupleKey, &value, &vLen); - return (char*)value; + int32_t code = pStore->streamStateFuncGet(pHandle->pState, &pPos->streamTupleKey, (void **)(value), &vLen); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + return TSDB_CODE_SUCCESS; } } -const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos) { - return doLoadTupleData(&pCtx->saveHandle, pPos, pCtx->pStore); +int32_t loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos, char** value) { + return doLoadTupleData(&pCtx->saveHandle, pPos, pCtx->pStore, value); } int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { @@ -3855,22 +3975,29 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { STopBotResItem* pItem = &pRes->pItems[i]; - colDataSetVal(pCol, currentRow, (const char*)&pItem->v.i, false); + code = colDataSetVal(pCol, currentRow, (const char*)&pItem->v.i, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } #ifdef BUF_PAGE_DEBUG qDebug("page_finalize i:%d,item:%p,pageId:%d, offset:%d\n", i, pItem, pItem->tuplePos.pageId, pItem->tuplePos.offset); #endif code = setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); + if (TSDB_CODE_SUCCESS != code) { + return code; + } currentRow += 1; } return code; } -void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { +int32_t addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotResItem* pItems = pRes->pItems; + int32_t code = TSDB_CODE_SUCCESS; // not full yet if (pEntryInfo->numOfRes < pRes->maxSize) { @@ -3880,8 +4007,11 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, pItem->tuplePos.pageId = -1; replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos); pEntryInfo->numOfRes++; - taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, - !isTopQuery); + code = taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, + topBotResComparFn, !isTopQuery); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } else { // replace the minimum value in the result if ((isTopQuery && ((IS_SIGNED_NUMERIC_TYPE(type) && pSourceItem->v.i > pItems[0].v.i) || (IS_UNSIGNED_NUMERIC_TYPE(type) && pSourceItem->v.u > pItems[0].v.u) || @@ -3898,18 +4028,26 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, // save the data of this tuple by over writing the old data replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos); - taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, - topBotResComparFn, NULL, !isTopQuery); + code = taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, + topBotResComparFn, NULL, !isTopQuery); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } } + return code; } int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx); int16_t type = pSBuf->type; + int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pSResInfo->numOfRes; i++) { - addResult(pDestCtx, pSBuf->pItems + i, type, true); + code = addResult(pDestCtx, pSBuf->pItems + i, type, true); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } return TSDB_CODE_SUCCESS; } @@ -3918,8 +4056,12 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx); int16_t type = pSBuf->type; + int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pSResInfo->numOfRes; i++) { - addResult(pDestCtx, pSBuf->pItems + i, type, false); + code = addResult(pDestCtx, pSBuf->pItems + i, type, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } return TSDB_CODE_SUCCESS; } @@ -3931,16 +4073,19 @@ bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -bool spreadFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t spreadFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo); SET_DOUBLE_VAL(&pInfo->min, DBL_MAX); SET_DOUBLE_VAL(&pInfo->max, -DBL_MAX); pInfo->hasResult = false; - return true; + return TSDB_CODE_SUCCESS; } int32_t spreadFunction(SqlFunctionCtx* pCtx) { @@ -4073,16 +4218,22 @@ int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t resultBytes = getSpreadInfoSize(); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); - memcpy(varDataVal(res), pInfo, resultBytes); + if (NULL == res) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (void)memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - colDataSetVal(pCol, pBlock->info.rows, res, false); + int32_t code = colDataSetVal(pCol, pBlock->info.rows, res, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } taosMemoryFree(res); - return pResInfo->numOfRes; + return TSDB_CODE_SUCCESS; } int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { @@ -4104,9 +4255,12 @@ bool getElapsedFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo); @@ -4120,7 +4274,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo pInfo->timeUnit = 1; } - return true; + return TSDB_CODE_SUCCESS; } int32_t elapsedFunction(SqlFunctionCtx* pCtx) { @@ -4248,16 +4402,21 @@ int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t resultBytes = getElapsedInfoSize(); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); - memcpy(varDataVal(res), pInfo, resultBytes); + if (NULL == res) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (void)memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - colDataSetVal(pCol, pBlock->info.rows, res, false); - + int32_t code = colDataSetVal(pCol, pBlock->info.rows, res, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } taosMemoryFree(res); - return pResInfo->numOfRes; + return code; } int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { @@ -4297,7 +4456,7 @@ static int8_t getHistogramBinType(char* binTypeStr) { return binType; } -static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t binType, bool normalized) { +static int32_t getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t binType, bool normalized) { cJSON* binDesc = cJSON_Parse(binDescStr); int32_t numOfBins; double* intervals; @@ -4306,7 +4465,7 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t int32_t startIndex; if (numOfParams != 4) { cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } cJSON* start = cJSON_GetObjectItem(binDesc, "start"); @@ -4317,18 +4476,18 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) { cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000 cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) || (factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) { cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } int32_t counter = (int32_t)count->valueint; @@ -4341,19 +4500,24 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t } intervals = taosMemoryCalloc(numOfBins, sizeof(double)); + if (NULL == intervals) { + cJSON_Delete(binDesc); + qError("histogram function out of memory"); + return TSDB_CODE_OUT_OF_MEMORY; + } if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) { // linear bin process if (width->valuedouble == 0) { taosMemoryFree(intervals); cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } for (int i = 0; i < counter + 1; ++i) { intervals[startIndex] = start->valuedouble + i * width->valuedouble; if (isinf(intervals[startIndex])) { taosMemoryFree(intervals); cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } startIndex++; } @@ -4362,26 +4526,26 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t if (start->valuedouble == 0) { taosMemoryFree(intervals); cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) { taosMemoryFree(intervals); cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } for (int i = 0; i < counter + 1; ++i) { intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0); if (isinf(intervals[startIndex])) { taosMemoryFree(intervals); cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } startIndex++; } } else { taosMemoryFree(intervals); cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } if (infinity->valueint == true) { @@ -4389,7 +4553,7 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t intervals[numOfBins - 1] = INFINITY; // in case of desc bin orders, -inf/inf should be swapped if (numOfBins < 4) { - return false; + return TSDB_CODE_FAILED; } if (intervals[1] > intervals[numOfBins - 2]) { TSWAP(intervals[0], intervals[numOfBins - 1]); @@ -4398,15 +4562,20 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t } else if (cJSON_IsArray(binDesc)) { /* user input bins */ if (binType != USER_INPUT_BIN) { cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } numOfBins = cJSON_GetArraySize(binDesc); intervals = taosMemoryCalloc(numOfBins, sizeof(double)); + if (NULL == intervals) { + cJSON_Delete(binDesc); + qError("histogram function out of memory"); + return TSDB_CODE_OUT_OF_MEMORY; + } cJSON* bin = binDesc->child; if (bin == NULL) { taosMemoryFree(intervals); cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } int i = 0; while (bin) { @@ -4414,19 +4583,19 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t if (!cJSON_IsNumber(bin)) { taosMemoryFree(intervals); cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } if (i != 0 && intervals[i] <= intervals[i - 1]) { taosMemoryFree(intervals); cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } bin = bin->next; i++; } } else { cJSON_Delete(binDesc); - return false; + return TSDB_CODE_FAILED; } pInfo->numOfBins = numOfBins - 1; @@ -4440,12 +4609,15 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t taosMemoryFree(intervals); cJSON_Delete(binDesc); - return true; + return TSDB_CODE_SUCCESS; } -bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo); @@ -4458,21 +4630,22 @@ bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultIn taosMemoryFree(binTypeStr); if (binType == UNKNOWN_BIN) { - return false; + return TSDB_CODE_FUNC_FUNTION_PARA_VALUE; } char* binDesc = strndup(varDataVal(pCtx->param[2].param.pz), varDataLen(pCtx->param[2].param.pz)); int64_t normalized = pCtx->param[3].param.i; if (normalized != 0 && normalized != 1) { taosMemoryFree(binDesc); - return false; + return TSDB_CODE_FUNC_FUNTION_PARA_VALUE; } - if (!getHistogramBinDesc(pInfo, binDesc, binType, (bool)normalized)) { + int32_t code = getHistogramBinDesc(pInfo, binDesc, binType, (bool)normalized); + if (TSDB_CODE_SUCCESS != code) { taosMemoryFree(binDesc); - return false; + return code; } taosMemoryFree(binDesc); - return true; + return TSDB_CODE_SUCCESS; } static int32_t histogramFunctionImpl(SqlFunctionCtx* pCtx, bool isPartial) { @@ -4556,6 +4729,7 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + int32_t code = TSDB_CODE_SUCCESS; int32_t currentRow = pBlock->info.rows; @@ -4580,11 +4754,14 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pInfo->bins[i].upper, pInfo->bins[i].percentage); } varDataSetLen(buf, len); - colDataSetVal(pCol, currentRow, buf, false); + code = colDataSetVal(pCol, currentRow, buf, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } currentRow++; } - return pResInfo->numOfRes; + return code; } int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { @@ -4593,16 +4770,19 @@ int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t resultBytes = getHistogramInfoSize(); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); - memcpy(varDataVal(res), pInfo, resultBytes); + if (NULL == res) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (void)memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - colDataSetVal(pCol, pBlock->info.rows, res, false); + int32_t code = colDataSetVal(pCol, pBlock->info.rows, res, false); taosMemoryFree(res); - return pResInfo->numOfRes; + return code; } int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { @@ -4818,16 +4998,19 @@ int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t resultBytes = getHLLInfoSize(); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); - memcpy(varDataVal(res), pInfo, resultBytes); + if (NULL == res) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (void)memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - colDataSetVal(pCol, pBlock->info.rows, res, false); + int32_t code = colDataSetVal(pCol, pBlock->info.rows, res, false); taosMemoryFree(res); - return pResInfo->numOfRes; + return code; } int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { @@ -4930,6 +5113,7 @@ static bool checkStateOp(int8_t op, SColumnInfoData* pCol, int32_t index, SVaria } int32_t stateCountFunction(SqlFunctionCtx* pCtx) { + int32_t code = TSDB_CODE_SUCCESS; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SStateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -4960,7 +5144,10 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) { colDataSetNULL(pOutput, i); // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pCtx->offset + numOfElems - 1); + code = appendSelectivityValue(pCtx, i, pCtx->offset + numOfElems - 1); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } continue; } @@ -4973,11 +5160,17 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) { } else { pInfo->count = 0; } - colDataSetVal(pOutput, pCtx->offset + numOfElems - 1, (char*)&output, false); + code = colDataSetVal(pOutput, pCtx->offset + numOfElems - 1, (char*)&output, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pCtx->offset + numOfElems - 1); + code = appendSelectivityValue(pCtx, i, pCtx->offset + numOfElems - 1); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } } @@ -4986,6 +5179,7 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) { } int32_t stateDurationFunction(SqlFunctionCtx* pCtx) { + int32_t code = TSDB_CODE_SUCCESS; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SStateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -5022,7 +5216,10 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) { colDataSetNULL(pOutput, i); // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pCtx->offset + numOfElems - 1); + code = appendSelectivityValue(pCtx, i, pCtx->offset + numOfElems - 1); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } continue; } @@ -5039,11 +5236,17 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) { } else { pInfo->durationStart = 0; } - colDataSetVal(pOutput, pCtx->offset + numOfElems - 1, (char*)&output, false); + code = colDataSetVal(pOutput, pCtx->offset + numOfElems - 1, (char*)&output, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pCtx->offset + numOfElems - 1); + code = appendSelectivityValue(pCtx, i, pCtx->offset + numOfElems - 1); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } } @@ -5057,6 +5260,7 @@ bool getCsumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { } int32_t csumFunction(SqlFunctionCtx* pCtx) { + int32_t code = TSDB_CODE_SUCCESS; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SSumRes* pSumRes = GET_ROWCELL_INTERBUF(pResInfo); @@ -5088,12 +5292,18 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) { int64_t v; GET_TYPED_DATA(v, int64_t, type, data); pSumRes->isum += v; - colDataSetVal(pOutput, pos, (char*)&pSumRes->isum, false); + code = colDataSetVal(pOutput, pos, (char*)&pSumRes->isum, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { uint64_t v; GET_TYPED_DATA(v, uint64_t, type, data); pSumRes->usum += v; - colDataSetVal(pOutput, pos, (char*)&pSumRes->usum, false); + code = colDataSetVal(pOutput, pos, (char*)&pSumRes->usum, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } else if (IS_FLOAT_TYPE(type)) { double v; GET_TYPED_DATA(v, double, type, data); @@ -5102,13 +5312,19 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) { if (isinf(pSumRes->dsum) || isnan(pSumRes->dsum)) { colDataSetNULL(pOutput, pos); } else { - colDataSetVal(pOutput, pos, (char*)&pSumRes->dsum, false); + code = colDataSetVal(pOutput, pos, (char*)&pSumRes->dsum, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } } // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pos); + code = appendSelectivityValue(pCtx, i, pos); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } numOfElems++; @@ -5123,9 +5339,12 @@ bool getMavgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -bool mavgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t mavgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SMavgInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo); @@ -5135,14 +5354,15 @@ bool mavgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { pInfo->isPrevTsSet = false; pInfo->numOfPoints = pCtx->param[1].param.i; if (pInfo->numOfPoints < 1 || pInfo->numOfPoints > MAVG_MAX_POINTS_NUM) { - return false; + return TSDB_CODE_FUNC_FUNTION_PARA_VALUE; } pInfo->pointsMeet = false; - return true; + return TSDB_CODE_SUCCESS; } int32_t mavgFunction(SqlFunctionCtx* pCtx) { + int32_t code = TSDB_CODE_SUCCESS; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SMavgInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -5191,12 +5411,18 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) { if (isinf(result) || isnan(result)) { colDataSetNULL(pOutput, pos); } else { - colDataSetVal(pOutput, pos, (char*)&result, false); + code = colDataSetVal(pOutput, pos, (char*)&result, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pos); + code = appendSelectivityValue(pCtx, i, pos); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } numOfElems++; @@ -5230,9 +5456,12 @@ bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } taosSeedRand(taosSafeRand()); @@ -5248,7 +5477,7 @@ bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) pInfo->data = (char*)pInfo + sizeof(SSampleInfo); pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes); - return true; + return TSDB_CODE_SUCCESS; } static void sampleAssignResult(SSampleInfo* pInfo, char* data, int32_t index) { @@ -5330,8 +5559,14 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return code; } for (int32_t i = 0; i < pInfo->numSampled; ++i) { - colDataSetVal(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false); + code = colDataSetVal(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } code = setSelectivityValue(pCtx, pBlock, &pInfo->tuplePos[i], currentRow + i); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } return code; @@ -5347,7 +5582,7 @@ bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool tailFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { +int32_t tailFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { #if 0 if (!functionSetup(pCtx, pResultInfo)) { return false; @@ -5378,7 +5613,7 @@ bool tailFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { } #endif - return true; + return TSDB_CODE_SUCCESS; } static void tailAssignResult(STailItem* pItem, char* data, int32_t colBytes, TSKEY ts, bool isNull) { @@ -5483,7 +5718,7 @@ bool getUniqueFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { +int32_t uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { #if 0 if (!functionSetup(pCtx, pResInfo)) { return false; @@ -5499,7 +5734,7 @@ bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } #endif - return true; + return TSDB_CODE_SUCCESS; } #if 0 @@ -5580,9 +5815,12 @@ bool getModeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { - if (!functionSetup(pCtx, pResInfo)) { - return false; +int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { + if (pResInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -5592,13 +5830,19 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { taosHashClear(pInfo->pHash); } else { pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (NULL == pInfo->pHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } } pInfo->nullTupleSaved = false; pInfo->nullTuplePos.pageId = -1; pInfo->buf = taosMemoryMalloc(pInfo->colBytes); + if (NULL == pInfo->buf) { + return TSDB_CODE_OUT_OF_MEMORY; + } - return true; + return TSDB_CODE_SUCCESS; } static void modeFunctionCleanup(SModeInfo * pInfo) { @@ -5608,9 +5852,9 @@ static void modeFunctionCleanup(SModeInfo * pInfo) { static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pInfo, STuplePos* pPos) { if (IS_VAR_DATA_TYPE(pInfo->colType)) { - memcpy(pInfo->buf, data, varDataTLen(data)); + (void)memcpy(pInfo->buf, data, varDataTLen(data)); } else { - memcpy(pInfo->buf, data, pInfo->colBytes); + (void)memcpy(pInfo->buf, data, pInfo->colBytes); } return doSaveTupleData(&pCtx->saveHandle, pInfo->buf, pInfo->colBytes, NULL, pPos, pCtx->pStore); @@ -5638,7 +5882,10 @@ static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCt } } - taosHashPut(pInfo->pHash, data, hashKeyBytes, &item, sizeof(SModeItem)); + code = taosHashPut(pInfo->pHash, data, hashKeyBytes, &item, sizeof(SModeItem)); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } else { pHashItem->count += 1; if (pCtx->subsidiaries.num > 0) { @@ -5715,8 +5962,9 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } if (maxCount != 0) { - const char* pData = loadTupleData(pCtx, &resDataPos); - if (pData == NULL) { + char* pData = NULL; + code = loadTupleData(pCtx, &resDataPos, &pData); + if (pData == NULL || TSDB_CODE_SUCCESS != code) { code = terrno = TSDB_CODE_NOT_FOUND; qError("Load tuple data failed since %s, groupId:%" PRIu64 ", ts:%" PRId64, terrstr(), resDataPos.streamTupleKey.groupId, resDataPos.streamTupleKey.ts); @@ -5724,7 +5972,10 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return code; } - colDataSetVal(pCol, currentRow, pData, false); + code = colDataSetVal(pCol, currentRow, pData, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } code = setSelectivityValue(pCtx, pBlock, &resTuplePos, currentRow); } else { colDataSetNULL(pCol, currentRow); @@ -5741,16 +5992,19 @@ bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } STwaInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); pInfo->numOfElems = 0; pInfo->p.key = INT64_MIN; pInfo->win = TSWINDOW_INITIALIZER; - return true; + return TSDB_CODE_SUCCESS; } static double twa_get_area(SPoint1 s, SPoint1 e) { @@ -5768,6 +6022,7 @@ static double twa_get_area(SPoint1 s, SPoint1 e) { } int32_t twaFunction(SqlFunctionCtx* pCtx) { + int32_t code = TSDB_CODE_SUCCESS; SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; @@ -5782,8 +6037,16 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { funcInputUpdate(pCtx); SFuncInputRow row = {0}; + bool result = false; if (pCtx->start.key != INT64_MIN && last->key == INT64_MIN) { - while (funcInputGetNextRow(pCtx, &row)) { + while (1) { + code = funcInputGetNextRow(pCtx, &row, &result); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + if (!result) { + break; + } if (row.isDataNull) { continue; } @@ -5798,7 +6061,14 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { break; } } else if (pInfo->p.key == INT64_MIN) { - while (funcInputGetNextRow(pCtx, &row)) { + while (1) { + code = funcInputGetNextRow(pCtx, &row, &result); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + if (!result) { + break; + } if (row.isDataNull) { continue; } @@ -5816,181 +6086,69 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { SPoint1 st = {0}; // calculate the value of - switch (pInputCol->info.type) { - case TSDB_DATA_TYPE_TINYINT: { - while (funcInputGetNextRow(pCtx, &row)) { - if (row.isDataNull) { - continue; - } - pInfo->numOfElems++; - + while (1) { + code = funcInputGetNextRow(pCtx, &row, &result); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + if (!result) { + break; + } + if (row.isDataNull) { + continue; + } + pInfo->numOfElems++; + switch (pInputCol->info.type) { + case TSDB_DATA_TYPE_TINYINT: { INIT_INTP_POINT(st, row.ts, *(int8_t*)row.pData); - if (pInfo->p.key == st.key) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; + break; } - break; - } - - case TSDB_DATA_TYPE_SMALLINT: { - while (funcInputGetNextRow(pCtx, &row)) { - if (row.isDataNull) { - continue; - } - pInfo->numOfElems++; - + case TSDB_DATA_TYPE_SMALLINT: { INIT_INTP_POINT(st, row.ts, *(int16_t*)row.pData); - if (pInfo->p.key == st.key) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; + break; } - break; - } - case TSDB_DATA_TYPE_INT: { - while (funcInputGetNextRow(pCtx, &row)) { - if (row.isDataNull) { - continue; - } - pInfo->numOfElems++; - + case TSDB_DATA_TYPE_INT: { INIT_INTP_POINT(st, row.ts, *(int32_t*)row.pData); - if (pInfo->p.key == st.key) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; + break; } - break; - } - case TSDB_DATA_TYPE_BIGINT: { - while (funcInputGetNextRow(pCtx, &row)) { - if (row.isDataNull) { - continue; - } - pInfo->numOfElems++; - + case TSDB_DATA_TYPE_BIGINT: { INIT_INTP_POINT(st, row.ts, *(int64_t*)row.pData); - if (pInfo->p.key == st.key) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; + break; } - break; - } - case TSDB_DATA_TYPE_FLOAT: { - while (funcInputGetNextRow(pCtx, &row)) { - if (row.isDataNull) { - continue; - } - pInfo->numOfElems++; - + case TSDB_DATA_TYPE_FLOAT: { INIT_INTP_POINT(st, row.ts, *(float_t*)row.pData); - if (pInfo->p.key == st.key) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; + break; } - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - while (funcInputGetNextRow(pCtx, &row)) { - if (row.isDataNull) { - continue; - } - pInfo->numOfElems++; - + case TSDB_DATA_TYPE_DOUBLE: { INIT_INTP_POINT(st, row.ts, *(double*)row.pData); - if (pInfo->p.key == st.key) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; + break; } - break; - } - case TSDB_DATA_TYPE_UTINYINT: { - while (funcInputGetNextRow(pCtx, &row)) { - if (row.isDataNull) { - continue; - } - pInfo->numOfElems++; - + case TSDB_DATA_TYPE_UTINYINT: { INIT_INTP_POINT(st, row.ts, *(uint8_t*)row.pData); - if (pInfo->p.key == st.key) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; + break; } - break; - } - case TSDB_DATA_TYPE_USMALLINT: { - while (funcInputGetNextRow(pCtx, &row)) { - if (row.isDataNull) { - continue; - } - pInfo->numOfElems++; - + case TSDB_DATA_TYPE_USMALLINT: { INIT_INTP_POINT(st, row.ts, *(uint16_t*)row.pData); - if (pInfo->p.key == st.key) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; + break; } - break; - } - case TSDB_DATA_TYPE_UINT: { - while (funcInputGetNextRow(pCtx, &row)) { - if (row.isDataNull) { - continue; - } - pInfo->numOfElems++; - + case TSDB_DATA_TYPE_UINT: { INIT_INTP_POINT(st, row.ts, *(uint32_t*)row.pData); - if (pInfo->p.key == st.key) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; + break; } - break; - } - case TSDB_DATA_TYPE_UBIGINT: { - while (funcInputGetNextRow(pCtx, &row)) { - if (row.isDataNull) { - continue; - } - pInfo->numOfElems++; - + case TSDB_DATA_TYPE_UBIGINT: { INIT_INTP_POINT(st, row.ts, *(uint64_t*)row.pData); - if (pInfo->p.key == st.key) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - - pInfo->dOutput += twa_get_area(pInfo->p, st); - pInfo->p = st; + break; } - break; + default: { + return TSDB_CODE_FUNC_FUNTION_PARA_TYPE; + } + } + if (pInfo->p.key == st.key) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; } - default: - return TSDB_CODE_FUNC_FUNTION_PARA_TYPE; + pInfo->dOutput += twa_get_area(pInfo->p, st); + pInfo->p = st; } // the last interpolated time window value @@ -6040,14 +6198,17 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } -bool blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } STableBlockDistInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); pInfo->minRows = INT32_MAX; - return true; + return TSDB_CODE_SUCCESS; } int32_t blockDistFunction(SqlFunctionCtx* pCtx) { @@ -6059,7 +6220,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo); STableBlockDistInfo p1 = {0}; - tDeserializeBlockDistInfo(varDataVal(pInputCol->pData), varDataLen(pInputCol->pData), &p1); + (void)tDeserializeBlockDistInfo(varDataVal(pInputCol->pData), varDataLen(pInputCol->pData), &p1); pDistInfo->numOfBlocks += p1.numOfBlocks; pDistInfo->numOfTables += p1.numOfTables; @@ -6176,7 +6337,10 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%'); varDataSetLen(st, len); - colDataSetVal(pColInfo, row++, st, false); + int32_t code = colDataSetVal(pColInfo, row++, st, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } int64_t avgRows = 0; if (pData->numOfBlocks > 0) { @@ -6186,22 +6350,34 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { len = sprintf(st + VARSTR_HEADER_SIZE, "Block_Rows=[%" PRId64 "] MinRows=[%d] MaxRows=[%d] AvgRows=[%" PRId64 "]", pData->totalRows, pData->minRows, pData->maxRows, avgRows); varDataSetLen(st, len); - colDataSetVal(pColInfo, row++, st, false); + code = colDataSetVal(pColInfo, row++, st, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } len = sprintf(st + VARSTR_HEADER_SIZE, "Inmem_Rows=[%d] Stt_Rows=[%d] ", pData->numOfInmemRows, pData->numOfSttRows); varDataSetLen(st, len); - colDataSetVal(pColInfo, row++, st, false); + code = colDataSetVal(pColInfo, row++, st, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Filesets=[%d] Total_Vgroups=[%d]", pData->numOfTables, pData->numOfFiles, pData->numOfVgroups); varDataSetLen(st, len); - colDataSetVal(pColInfo, row++, st, false); + code = colDataSetVal(pColInfo, row++, st, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } len = sprintf(st + VARSTR_HEADER_SIZE, "--------------------------------------------------------------------------------"); varDataSetLen(st, len); - colDataSetVal(pColInfo, row++, st, false); + code = colDataSetVal(pColInfo, row++, st, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } int32_t maxVal = 0; int32_t minVal = INT32_MAX; @@ -6240,7 +6416,10 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } varDataSetLen(st, len); - colDataSetVal(pColInfo, row++, st, false); + code = colDataSetVal(pColInfo, row++, st, false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } return TSDB_CODE_SUCCESS; @@ -6251,9 +6430,12 @@ bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { - if (!functionSetup(pCtx, pResInfo)) { - return false; // not initialized since it has been initialized +int32_t derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { + if (pResInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -6262,7 +6444,7 @@ bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { pDerivInfo->prevTs = -1; pDerivInfo->tsWindow = pCtx->param[1].param.i; pDerivInfo->valueSet = false; - return true; + return TSDB_CODE_SUCCESS; } int32_t derivativeFunction(SqlFunctionCtx* pCtx) { @@ -6275,13 +6457,22 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pTsOutput = pCtx->pTsOutput; + int32_t code = TSDB_CODE_SUCCESS; funcInputUpdate(pCtx); double v = 0; if (pCtx->order == TSDB_ORDER_ASC) { SFuncInputRow row = {0}; - while (funcInputGetNextRow(pCtx, &row)) { + bool result = false; + while (1) { + code = funcInputGetNextRow(pCtx, &row, &result); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + if (!result) { + break; + } if (row.isDataNull) { continue; } @@ -6302,7 +6493,10 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { if (isinf(r) || isnan(r)) { colDataSetNULL(pOutput, pos); } else { - colDataSetVal(pOutput, pos, (const char*)&r, false); + code = colDataSetVal(pOutput, pos, (const char*)&r, false); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pTsOutput != NULL) { @@ -6311,7 +6505,10 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); + code = appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } numOfElems++; @@ -6323,7 +6520,15 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { } } else { SFuncInputRow row = {0}; - while (funcInputGetNextRow(pCtx, &row)) { + bool result = false; + while (1) { + code = funcInputGetNextRow(pCtx, &row, &result); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + if (!result) { + break; + } if (row.isDataNull) { continue; } @@ -6344,7 +6549,10 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { if (isinf(r) || isnan(r)) { colDataSetNULL(pOutput, pos); } else { - colDataSetVal(pOutput, pos, (const char*)&r, false); + code = colDataSetVal(pOutput, pos, (const char*)&r, false); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pTsOutput != NULL) { @@ -6353,7 +6561,10 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { // handle selectivity if (pCtx->subsidiaries.num > 0) { - appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); + code = appendSelectivityCols(pCtx, row.block, row.rowIndex, pos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } numOfElems++; } @@ -6377,9 +6588,12 @@ bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { - if (!functionSetup(pCtx, pResInfo)) { - return false; // not initialized since it has been initialized +int32_t irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { + if (pResInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -6390,7 +6604,7 @@ bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { pInfo->lastValue = (double)INT64_MIN; pInfo->hasResult = 0; - return true; + return TSDB_CODE_SUCCESS; } static void doSaveRateInfo(SRateInfo* pRateInfo, bool isFirst, int64_t ts, char* pk, double v) { @@ -6399,14 +6613,14 @@ static void doSaveRateInfo(SRateInfo* pRateInfo, bool isFirst, int64_t ts, char* pRateInfo->firstKey = ts; if (pRateInfo->firstPk) { int32_t pkBytes = IS_VAR_DATA_TYPE(pRateInfo->pkType) ? varDataTLen(pk) : pRateInfo->pkBytes; - memcpy(pRateInfo->firstPk, pk, pkBytes); + (void)memcpy(pRateInfo->firstPk, pk, pkBytes); } } else { pRateInfo->lastValue = v; pRateInfo->lastKey = ts; if (pRateInfo->lastPk) { int32_t pkBytes = IS_VAR_DATA_TYPE(pRateInfo->pkType) ? varDataTLen(pk) : pRateInfo->pkBytes; - memcpy(pRateInfo->lastPk, pk, pkBytes); + (void)memcpy(pRateInfo->lastPk, pk, pkBytes); } } } @@ -6429,6 +6643,7 @@ static void initializeRateInfo(SqlFunctionCtx* pCtx, SRateInfo* pRateInfo, bool } int32_t irateFunction(SqlFunctionCtx* pCtx) { + int32_t code = TSDB_CODE_SUCCESS; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SRateInfo* pRateInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -6444,7 +6659,15 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; int32_t type = pInputCol->info.type; SFuncInputRow row = {0}; - while (funcInputGetNextRow(pCtx, &row)) { + bool result = false; + while (1) { + code = funcInputGetNextRow(pCtx, &row, &result); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + if (!result) { + break; + } if (row.isDataNull) { continue; } @@ -6589,16 +6812,19 @@ int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t resultBytes = getIrateInfoSize(pInfo->pkBytes); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); - memcpy(varDataVal(res), pInfo, resultBytes); + if (NULL == res) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (void)memcpy(varDataVal(res), pInfo, resultBytes); varDataSetLen(res, resultBytes); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - colDataSetVal(pCol, pBlock->info.rows, res, false); + int32_t code = colDataSetVal(pCol, pBlock->info.rows, res, false); taosMemoryFree(res); - return pResInfo->numOfRes; + return code; } int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { @@ -6610,9 +6836,9 @@ int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); double result = doCalcRate(pInfo, (double)TSDB_TICK_PER_SECOND(pCtx->param[1].param.i)); - colDataSetVal(pCol, pBlock->info.rows, (const char*)&result, pResInfo->isNullRes); + int32_t code = colDataSetVal(pCol, pBlock->info.rows, (const char*)&result, pResInfo->isNullRes); - return pResInfo->numOfRes; + return code; } int32_t groupConstValueFunction(SqlFunctionCtx* pCtx) { @@ -6637,10 +6863,10 @@ int32_t groupConstValueFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, startIndex); if (IS_VAR_DATA_TYPE(pInputCol->info.type)) { - memcpy(pInfo->data, data, - (pInputCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(data) : varDataTLen(data)); + (void)memcpy(pInfo->data, data, + (pInputCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(data) : varDataTLen(data)); } else { - memcpy(pInfo->data, data, pInputCol->info.bytes); + (void)memcpy(pInfo->data, data, pInputCol->info.bytes); } pInfo->hasResult = true; @@ -6656,6 +6882,7 @@ int32_t groupKeyFunction(SqlFunctionCtx* pCtx) { int32_t groupConstValueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + int32_t code = TSDB_CODE_SUCCESS; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -6665,13 +6892,16 @@ int32_t groupConstValueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (pInfo->hasResult) { int32_t currentRow = pBlock->info.rows; for (; currentRow < pBlock->info.rows + pResInfo->numOfRes; ++currentRow) { - colDataSetVal(pCol, currentRow, pInfo->data, pInfo->isNull ? true : false); + code = colDataSetVal(pCol, currentRow, pInfo->data, pInfo->isNull ? true : false); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } } else { pResInfo->numOfRes = 0; } - return pResInfo->numOfRes; + return code; } int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock){ @@ -6697,10 +6927,10 @@ int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { } if (IS_VAR_DATA_TYPE(pSourceCtx->resDataInfo.type)) { - memcpy(pDBuf->data, pSBuf->data, + (void)memcpy(pDBuf->data, pSBuf->data, (pSourceCtx->resDataInfo.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(pSBuf->data) : varDataTLen(pSBuf->data)); } else { - memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes); + (void)memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes); } pDBuf->hasResult = true; diff --git a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index 3d51f0cd16..3b8cf6e6b5 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -354,14 +354,17 @@ bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { - if (!functionSetup(pCtx, pResultInfo)) { - return false; +int32_t avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) { + return TSDB_CODE_FUNC_SETUP_ERROR; } SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo); - memset(pRes, 0, sizeof(SAvgRes)); - return true; + (void)memset(pRes, 0, sizeof(SAvgRes)); + return TSDB_CODE_SUCCESS; } static int32_t calculateAvgBySMAInfo(SAvgRes* pRes, int32_t numOfRows, int32_t type, const SColumnDataAgg* pAgg) { @@ -859,5 +862,5 @@ int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { colDataSetVal(pCol, pBlock->info.rows, res, false); taosMemoryFree(res); - return pResInfo->numOfRes; + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/thistogram.c b/source/libs/function/src/thistogram.c index b56691f35d..3f4ce3811b 100644 --- a/source/libs/function/src/thistogram.c +++ b/source/libs/function/src/thistogram.c @@ -50,7 +50,7 @@ SHistogramInfo* tHistogramCreate(int32_t numOfEntries) { } SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins) { - memset(pBuf, 0, sizeof(SHistogramInfo) + sizeof(SHistBin) * (numOfBins + 1)); + (void)memset(pBuf, 0, sizeof(SHistogramInfo) + sizeof(SHistBin) * (numOfBins + 1)); SHistogramInfo* pHisto = (SHistogramInfo*)pBuf; pHisto->elems = (SHistBin*)((char*)pBuf + sizeof(SHistogramInfo)); diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index a068186992..d7b4eb21c0 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -238,67 +238,71 @@ static void resetSlotInfo(tMemBucket *pBucket) { } } -tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup) { - tMemBucket *pBucket = (tMemBucket *)taosMemoryCalloc(1, sizeof(tMemBucket)); - if (pBucket == NULL) { - return NULL; +int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup, + tMemBucket **pBucket) { + *pBucket = (tMemBucket *)taosMemoryCalloc(1, sizeof(tMemBucket)); + if (*pBucket == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } if (hasWindowOrGroup) { // With window or group by, we need to shrink page size and reduce page num to save memory. - pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT / 8 ; // 128 bucket - pBucket->bufPageSize = 4096; // 4k per page + (*pBucket)->numOfSlots = DEFAULT_NUM_OF_SLOT / 8 ; // 128 bucket + (*pBucket)->bufPageSize = 4096; // 4k per page } else { - pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; - pBucket->bufPageSize = 16384 * 4; // 16k per page + (*pBucket)->numOfSlots = DEFAULT_NUM_OF_SLOT; + (*pBucket)->bufPageSize = 16384 * 4; // 16k per page } - pBucket->type = dataType; - pBucket->bytes = nElemSize; - pBucket->total = 0; - pBucket->times = 1; + (*pBucket)->type = dataType; + (*pBucket)->bytes = nElemSize; + (*pBucket)->total = 0; + (*pBucket)->times = 1; - pBucket->maxCapacity = 200000; - pBucket->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (setBoundingBox(&pBucket->range, pBucket->type, minval, maxval) != 0) { + (*pBucket)->maxCapacity = 200000; + (*pBucket)->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if ((*pBucket)->groupPagesMap == NULL) { + tMemBucketDestroy(*pBucket); + return TSDB_CODE_OUT_OF_MEMORY; + } + if (setBoundingBox(&(*pBucket)->range, (*pBucket)->type, minval, maxval) != 0) { // qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval); - taosMemoryFree(pBucket); - return NULL; + tMemBucketDestroy(*pBucket); + return TSDB_CODE_FUNC_INVALID_VALUE_RANGE; } - pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(SFilePage)) / pBucket->bytes; - pBucket->comparFn = getKeyComparFunc(pBucket->type, TSDB_ORDER_ASC); + (*pBucket)->elemPerPage = ((*pBucket)->bufPageSize - sizeof(SFilePage)) / (*pBucket)->bytes; + (*pBucket)->comparFn = getKeyComparFunc((*pBucket)->type, TSDB_ORDER_ASC); - pBucket->hashFunc = getHashFunc(pBucket->type); - if (pBucket->hashFunc == NULL) { + (*pBucket)->hashFunc = getHashFunc((*pBucket)->type); + if ((*pBucket)->hashFunc == NULL) { // qError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type); - taosMemoryFree(pBucket); - return NULL; + tMemBucketDestroy(*pBucket); + return TSDB_CODE_FUNC_FUNTION_PARA_TYPE; } - pBucket->pSlots = (tMemBucketSlot *)taosMemoryCalloc(pBucket->numOfSlots, sizeof(tMemBucketSlot)); - if (pBucket->pSlots == NULL) { - taosMemoryFree(pBucket); - return NULL; + (*pBucket)->pSlots = (tMemBucketSlot *)taosMemoryCalloc((*pBucket)->numOfSlots, sizeof(tMemBucketSlot)); + if ((*pBucket)->pSlots == NULL) { + tMemBucketDestroy(*pBucket); + return TSDB_CODE_OUT_OF_MEMORY; } - resetSlotInfo(pBucket); + resetSlotInfo((*pBucket)); if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_DISKSPACE; // qError("MemBucket create disk based Buf failed since %s", terrstr(terrno)); - tMemBucketDestroy(pBucket); - return NULL; + tMemBucketDestroy(*pBucket); + return TSDB_CODE_NO_DISKSPACE; } - int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 1024, "1", tsTempDir); + int32_t ret = createDiskbasedBuf(&(*pBucket)->pBuffer, (*pBucket)->bufPageSize, (*pBucket)->bufPageSize * 1024, "1", tsTempDir); if (ret != 0) { - tMemBucketDestroy(pBucket); - return NULL; + tMemBucketDestroy(*pBucket); + return ret; } // qDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes); - return pBucket; + return TSDB_CODE_SUCCESS; } void tMemBucketDestroy(tMemBucket *pBucket) { diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 3c5e4014b3..c9b8a1e08b 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1011,7 +1011,7 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle); int32_t cleanUpUdfs(); bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); -bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); +int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); @@ -1196,15 +1196,18 @@ bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) { return true; } -bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) { - if (functionSetup(pCtx, pResultCellInfo) != true) { - return false; +int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) { + if (pResultCellInfo->initialized) { + return TSDB_CODE_SUCCESS; + } + if (functionSetup(pCtx, pResultCellInfo) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FUNC_SETUP_ERROR; } UdfcFuncHandle handle; int32_t udfCode = 0; if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode); - return false; + return TSDB_CODE_FUNC_SETUP_ERROR; } SUdfcUvSession *session = (SUdfcUvSession *)handle; SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo); @@ -1218,7 +1221,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); releaseUdfFuncHandle(pCtx->udfName, handle); - return false; + return TSDB_CODE_FUNC_SETUP_ERROR; } if (buf.bufLen <= session->bufSize) { memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); @@ -1227,11 +1230,11 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult } else { fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); releaseUdfFuncHandle(pCtx->udfName, handle); - return false; + return TSDB_CODE_FUNC_SETUP_ERROR; } releaseUdfFuncHandle(pCtx->udfName, handle); freeUdfInterBuf(&buf); - return true; + return TSDB_CODE_SUCCESS; } int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { diff --git a/source/libs/parser/test/parTestMain.cpp b/source/libs/parser/test/parTestMain.cpp index 8d13d7cf0e..c8925b3df7 100644 --- a/source/libs/parser/test/parTestMain.cpp +++ b/source/libs/parser/test/parTestMain.cpp @@ -35,7 +35,8 @@ namespace ParserTest { class ParserEnv : public testing::Environment { public: virtual void SetUp() { - fmFuncMgtInit(); + // TODO(smj) : How to handle return value of fmFuncMgtInit + (void)fmFuncMgtInit(); initMetaDataEnv(); generateMetaData(); initLog(TD_TMP_DIR_PATH "td"); diff --git a/source/libs/planner/test/planTestMain.cpp b/source/libs/planner/test/planTestMain.cpp index 4e013c44b8..e05fa27c1e 100644 --- a/source/libs/planner/test/planTestMain.cpp +++ b/source/libs/planner/test/planTestMain.cpp @@ -27,7 +27,8 @@ class PlannerEnv : public testing::Environment { public: virtual void SetUp() { - fmFuncMgtInit(); + // TODO(smj) : How to handle return value of fmFuncMgtInit + (void)fmFuncMgtInit(); initMetaDataEnv(); generateMetaData(); initLog(TD_TMP_DIR_PATH "td"); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 3bd1ae3ed5..09c7224c1e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -709,6 +709,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_NOT_SUPPORTED, "Func to_tim TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_CHAR_NOT_SUPPORTED, "Func to_char failed for unsupported format") TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TIME_UNIT_INVALID, "Invalid function time unit") TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL, "Function time unit cannot be smaller than db precision") +TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_INVALID_VALUE_RANGE, "Function got invalid value range") +TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_SETUP_ERROR, "Function set up failed") //udf TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping")