Merge pull request #26735 from taosdata/enh/TD-31043

enh:[TD-31043] Handling return value in function
This commit is contained in:
dapan1121 2024-07-23 18:08:37 +08:00 committed by GitHub
commit 35827fb534
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 1285 additions and 762 deletions

View File

@ -49,7 +49,7 @@ typedef struct SBlockOrderInfo {
#define colDataSetNull_f_s(c_, r_) \ #define colDataSetNull_f_s(c_, r_) \
do { \ do { \
colDataSetNull_f((c_)->nullbitmap, r_); \ colDataSetNull_f((c_)->nullbitmap, r_); \
memset(((char*)(c_)->pData) + (c_)->info.bytes * (r_), 0, (c_)->info.bytes); \ (void)memset(((char*)(c_)->pData) + (c_)->info.bytes * (r_), 0, (c_)->info.bytes); \
} while (0) } while (0)
#define colDataClearNull_f(bm_, r_) \ #define colDataClearNull_f(bm_, r_) \

View File

@ -36,7 +36,7 @@ typedef struct SFuncExecEnv {
} SFuncExecEnv; } SFuncExecEnv;
typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); typedef int32_t (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

View File

@ -260,7 +260,7 @@ bool fmIsProcessByRowFunc(int32_t funcId);
bool fmisSelectGroupConstValueFunc(int32_t funcId); bool fmisSelectGroupConstValueFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType, int32_t pkBytes); void getLastCacheDataType(SDataType* pType, int32_t pkBytes);
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList); int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** pFunc);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc);
@ -273,7 +273,7 @@ typedef enum EFuncDataRequired {
} EFuncDataRequired; } EFuncDataRequired;
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo); int32_t fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo, int32_t *reqStatus);
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet); int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);

View File

@ -77,7 +77,7 @@ void freeUdfInterBuf(SUdfInterBuf *buf);
// high level APIs // high level APIs
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);

View File

@ -860,6 +860,11 @@ int32_t taosGetErrSize();
#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR TAOS_DEF_ERROR_CODE(0, 0x2807) #define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR TAOS_DEF_ERROR_CODE(0, 0x2807)
#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x2808) #define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x2808)
#define TSDB_CODE_FUNC_TO_CHAR_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x2809) #define TSDB_CODE_FUNC_TO_CHAR_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x2809)
#define TSDB_CODE_FUNC_TIME_UNIT_INVALID TAOS_DEF_ERROR_CODE(0, 0x280A)
#define TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL TAOS_DEF_ERROR_CODE(0, 0x280B)
#define TSDB_CODE_FUNC_INVALID_VALUE_RANGE TAOS_DEF_ERROR_CODE(0, 0x280C)
#define TSDB_CODE_FUNC_SETUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x280D)
//udf //udf
#define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901) #define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901)

View File

@ -897,7 +897,11 @@ void taos_init_imp(void) {
tscError("failed to init task queue"); tscError("failed to init task queue");
return; return;
} }
fmFuncMgtInit(); if (fmFuncMgtInit() != TSDB_CODE_SUCCESS) {
tscInitRes = -1;
tscError("failed to init function manager");
return;
}
nodesInitAllocatorSet(); nodesInitAllocatorSet();
clientConnRefPool = taosOpenRef(200, destroyTscObj); clientConnRefPool = taosOpenRef(200, destroyTscObj);

View File

@ -1684,14 +1684,17 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) { if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId); bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
if (!isUdaf) { if (!isUdaf) {
// TODO(xxx) : need handle return value of fmGetFuncExecFuncs.
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
} else { } else {
char* udfName = pExpr->pExpr->_function.pFunctNode->functionName; char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
pCtx->udfName = taosStrdup(udfName); pCtx->udfName = taosStrdup(udfName);
// TODO(xxx) : need handle return value of fmGetUdafExecFuncs.
fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet); fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
} }
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
} else { } else {
// TODO(xxx) : need handle return value of fmGetScalarFuncExecFuncs.
fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp); fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
if (pCtx->sfp.getEnv != NULL) { if (pCtx->sfp.getEnv != NULL) {
pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env); pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);

View File

@ -521,8 +521,8 @@ int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t n
if (!pResInfo->initialized) { if (!pResInfo->initialized) {
if (pCtx[i].functionId != -1) { if (pCtx[i].functionId != -1) {
bool ini = pCtx[i].fpSet.init(&pCtx[i], pResInfo); int32_t code = pCtx[i].fpSet.init(&pCtx[i], pResInfo);
if (!ini && fmIsUserDefinedFunc(pCtx[i].functionId)) { if (code != TSDB_CODE_SUCCESS && fmIsUserDefinedFunc(pCtx[i].functionId)){
pResInfo->initialized = false; pResInfo->initialized = false;
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
} }

View File

@ -44,8 +44,8 @@ static int32_t doGenerateSourceData(SOperatorInfo* pOperator);
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator); static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator);
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator); static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator);
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols); static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols);
static void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, static int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup,
int32_t numOfExprs); int32_t stage, int32_t numOfExprs);
static void destroyProjectOperatorInfo(void* param) { static void destroyProjectOperatorInfo(void* param) {
if (NULL == param) { if (NULL == param) {
@ -142,7 +142,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
} }
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -447,7 +450,11 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
goto _error; goto _error;
} }
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); code = setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
@ -589,7 +596,8 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
return (rows > 0) ? pInfo->pRes : NULL; return (rows > 0) ? pInfo->pRes : NULL;
} }
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t j = 0; j < size; ++j) { for (int32_t j = 0; j < size; ++j) {
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]); struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pCtx[j].functionId) || pCtx[j].functionId == -1 || if (isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pCtx[j].functionId) || pCtx[j].functionId == -1 ||
@ -597,8 +605,12 @@ void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
continue; continue;
} }
pCtx[j].fpSet.init(&pCtx[j], pCtx[j].resultInfo); code = pCtx[j].fpSet.init(&pCtx[j], pCtx[j].resultInfo);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
} }
return code;
} }
/* /*
@ -610,7 +622,7 @@ void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
* offset[0] offset[1] offset[2] * offset[0] offset[1] offset[2]
*/ */
// TODO refactor: some function move away // TODO refactor: some function move away
void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
int32_t numOfExprs) { int32_t numOfExprs) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
@ -632,7 +644,7 @@ void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SA
pCtx[i].scanFlag = stage; pCtx[i].scanFlag = stage;
} }
initCtxOutputBuffer(pCtx, numOfExprs); return initCtxOutputBuffer(pCtx, numOfExprs);
} }
SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) { SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
@ -841,7 +853,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
// do nothing // do nothing
} else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) { } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
pfCtx->fpSet.init(pfCtx, pResInfo); code = pfCtx->fpSet.init(pfCtx, pResInfo);
if (TSDB_CODE_SUCCESS != code) {
goto _exit;
}
pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset

View File

@ -202,8 +202,9 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset); SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset);
int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, pBlockInfo); int32_t reqStatus;
if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) { code = fmFuncDynDataRequired(functionId, pEntry, pBlockInfo, &reqStatus);
if (code != TSDB_CODE_SUCCESS || reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
notLoadBlock = false; notLoadBlock = false;
break; break;
} }

View File

@ -1086,12 +1086,13 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pBlock; return (rows == 0) ? NULL : pBlock;
} }
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { static int32_t doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
if (NULL == pResult) { if (NULL == pResult) {
return; return TSDB_CODE_SUCCESS;
} }
int32_t code = TSDB_CODE_SUCCESS;
SqlFunctionCtx* pCtx = pSup->pCtx; SqlFunctionCtx* pCtx = pSup->pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset); pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset);
@ -1101,15 +1102,19 @@ static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf,
} }
pResInfo->initialized = false; pResInfo->initialized = false;
if (pCtx[i].functionId != -1) { if (pCtx[i].functionId != -1) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo); code = pCtx[i].fpSet.init(&pCtx[i], pResInfo);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
} }
} }
SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId); SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId);
if (NULL == bufPage) { if (NULL == bufPage) {
return; return TSDB_CODE_SUCCESS;
} }
setBufPageDirty(bufPage, true); setBufPageDirty(bufPage, true);
releaseBufPage(pResultBuf, bufPage); releaseBufPage(pResultBuf, bufPage);
return TSDB_CODE_SUCCESS;
} }
static void destroyStateWindowOperatorInfo(void* param) { static void destroyStateWindowOperatorInfo(void* param) {

View File

@ -49,9 +49,9 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); int32_t loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos, char** value);
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult); int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
@ -74,7 +74,7 @@ int32_t sumInvertFunction(SqlFunctionCtx* pCtx);
int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t minFunction(SqlFunctionCtx* pCtx); int32_t minFunction(SqlFunctionCtx* pCtx);
int32_t maxFunction(SqlFunctionCtx* pCtx); int32_t maxFunction(SqlFunctionCtx* pCtx);
@ -83,7 +83,7 @@ int32_t minCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t avgFunction(SqlFunctionCtx* pCtx); int32_t avgFunction(SqlFunctionCtx* pCtx);
int32_t avgFunctionMerge(SqlFunctionCtx* pCtx); int32_t avgFunctionMerge(SqlFunctionCtx* pCtx);
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
@ -97,7 +97,7 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getAvgInfoSize(); int32_t getAvgInfoSize();
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t stddevFunction(SqlFunctionCtx* pCtx); int32_t stddevFunction(SqlFunctionCtx* pCtx);
int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx); int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx);
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
@ -111,18 +111,18 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getStddevInfoSize(); int32_t getStddevInfoSize();
bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t leastSQRFunction(SqlFunctionCtx* pCtx); int32_t leastSQRFunction(SqlFunctionCtx* pCtx);
int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t percentileFunction(SqlFunctionCtx* pCtx); int32_t percentileFunction(SqlFunctionCtx* pCtx);
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t apercentileFunction(SqlFunctionCtx* pCtx); int32_t apercentileFunction(SqlFunctionCtx* pCtx);
int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx); int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx);
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
@ -131,16 +131,16 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
int32_t getApercentileMaxSize(); int32_t getApercentileMaxSize();
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
int32_t diffFunction(SqlFunctionCtx* pCtx); int32_t diffFunction(SqlFunctionCtx* pCtx);
int32_t diffFunctionByRow(SArray* pCtx); int32_t diffFunctionByRow(SArray* pCtx);
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
int32_t derivativeFunction(SqlFunctionCtx* pCtx); int32_t derivativeFunction(SqlFunctionCtx* pCtx);
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); int32_t irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
int32_t irateFunction(SqlFunctionCtx* pCtx); int32_t irateFunction(SqlFunctionCtx* pCtx);
int32_t irateFunctionMerge(SqlFunctionCtx* pCtx); int32_t irateFunctionMerge(SqlFunctionCtx* pCtx);
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
@ -165,7 +165,7 @@ EFuncDataRequired lastDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo);
int32_t lastRowFunction(SqlFunctionCtx* pCtx); int32_t lastRowFunction(SqlFunctionCtx* pCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t topFunction(SqlFunctionCtx* pCtx); int32_t topFunction(SqlFunctionCtx* pCtx);
int32_t bottomFunction(SqlFunctionCtx* pCtx); int32_t bottomFunction(SqlFunctionCtx* pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
@ -174,7 +174,7 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getTopBotInfoSize(int64_t numOfItems); int32_t getTopBotInfoSize(int64_t numOfItems);
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool spreadFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t spreadFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t spreadFunction(SqlFunctionCtx* pCtx); int32_t spreadFunction(SqlFunctionCtx* pCtx);
int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx); int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx);
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
@ -183,7 +183,7 @@ int32_t getSpreadInfoSize();
int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t elapsedFunction(SqlFunctionCtx* pCtx); int32_t elapsedFunction(SqlFunctionCtx* pCtx);
int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx); int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx);
int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
@ -192,7 +192,7 @@ int32_t getElapsedInfoSize();
int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t histogramFunction(SqlFunctionCtx* pCtx); int32_t histogramFunction(SqlFunctionCtx* pCtx);
int32_t histogramFunctionPartial(SqlFunctionCtx* pCtx); int32_t histogramFunctionPartial(SqlFunctionCtx* pCtx);
int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx); int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx);
@ -210,7 +210,7 @@ int32_t getHLLInfoSize();
int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stateFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t stateFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t stateCountFunction(SqlFunctionCtx* pCtx); int32_t stateCountFunction(SqlFunctionCtx* pCtx);
int32_t stateDurationFunction(SqlFunctionCtx* pCtx); int32_t stateDurationFunction(SqlFunctionCtx* pCtx);
@ -218,35 +218,35 @@ bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t csumFunction(SqlFunctionCtx* pCtx); int32_t csumFunction(SqlFunctionCtx* pCtx);
bool getMavgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getMavgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool mavgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t mavgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t mavgFunction(SqlFunctionCtx* pCtx); int32_t mavgFunction(SqlFunctionCtx* pCtx);
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t sampleFunction(SqlFunctionCtx* pCtx); int32_t sampleFunction(SqlFunctionCtx* pCtx);
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool tailFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t tailFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t tailFunction(SqlFunctionCtx* pCtx); int32_t tailFunction(SqlFunctionCtx* pCtx);
bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t uniqueFunction(SqlFunctionCtx* pCtx); int32_t uniqueFunction(SqlFunctionCtx* pCtx);
bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t modeFunction(SqlFunctionCtx* pCtx); int32_t modeFunction(SqlFunctionCtx* pCtx);
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t twaFunction(SqlFunctionCtx* pCtx); int32_t twaFunction(SqlFunctionCtx* pCtx);
int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
int32_t blockDistFunction(SqlFunctionCtx* pCtx); int32_t blockDistFunction(SqlFunctionCtx* pCtx);
int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);

View File

@ -37,7 +37,7 @@ static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32
pResInfo->complete = false; pResInfo->complete = false;
pResInfo->numOfRes = 0; pResInfo->numOfRes = 0;
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen); (void)memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen);
} }
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -55,14 +55,15 @@ typedef struct SHistogramInfo {
#endif #endif
} SHistogramInfo; } SHistogramInfo;
SHistogramInfo* tHistogramCreate(int32_t numOfBins); int32_t tHistogramCreate(int32_t numOfEntries, SHistogramInfo** pHisto);
SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins); SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins);
int32_t tHistogramAdd(SHistogramInfo** pHisto, double val); int32_t tHistogramAdd(SHistogramInfo** pHisto, double val);
int64_t tHistogramSum(SHistogramInfo* pHisto, double v); int64_t tHistogramSum(SHistogramInfo* pHisto, double v);
double* tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num); int32_t tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num, double** pVal);
SHistogramInfo* tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2, int32_t numOfEntries); int32_t tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2, int32_t numOfEntries,
SHistogramInfo** pResHistogram);
void tHistogramDestroy(SHistogramInfo** pHisto); void tHistogramDestroy(SHistogramInfo** pHisto);
void tHistogramPrint(SHistogramInfo* pHisto); void tHistogramPrint(SHistogramInfo* pHisto);

View File

@ -67,7 +67,8 @@ typedef struct tMemBucket {
SHashObj *groupPagesMap; // disk page map for different groups; SHashObj *groupPagesMap; // disk page map for different groups;
} tMemBucket; } tMemBucket;
tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup); int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup,
tMemBucket **pBucket);
void tMemBucketDestroy(tMemBucket *pBucket); void tMemBucketDestroy(tMemBucket *pBucket);

View File

@ -25,7 +25,7 @@
static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) { static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) {
va_list vArgList; va_list vArgList;
va_start(vArgList, pFormat); va_start(vArgList, pFormat);
vsnprintf(pErrBuf, len, pFormat, vArgList); (void)vsnprintf(pErrBuf, len, pFormat, vArgList);
va_end(vArgList); va_end(vArgList);
return errCode; return errCode;
} }
@ -42,27 +42,24 @@ static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName); return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName);
} }
#define TIME_UNIT_INVALID 1
#define TIME_UNIT_TOO_SMALL 2
static int32_t validateTimeUnitParam(uint8_t dbPrec, const SValueNode* pVal) { static int32_t validateTimeUnitParam(uint8_t dbPrec, const SValueNode* pVal) {
if (!IS_DURATION_VAL(pVal->flag)) { if (!IS_DURATION_VAL(pVal->flag)) {
return TIME_UNIT_INVALID; return TSDB_CODE_FUNC_TIME_UNIT_INVALID;
} }
if (TSDB_TIME_PRECISION_MILLI == dbPrec && if (TSDB_TIME_PRECISION_MILLI == dbPrec &&
(0 == strcasecmp(pVal->literal, "1u") || 0 == strcasecmp(pVal->literal, "1b"))) { (0 == strcasecmp(pVal->literal, "1u") || 0 == strcasecmp(pVal->literal, "1b"))) {
return TIME_UNIT_TOO_SMALL; return TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL;
} }
if (TSDB_TIME_PRECISION_MICRO == dbPrec && 0 == strcasecmp(pVal->literal, "1b")) { if (TSDB_TIME_PRECISION_MICRO == dbPrec && 0 == strcasecmp(pVal->literal, "1b")) {
return TIME_UNIT_TOO_SMALL; return TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL;
} }
if (pVal->literal[0] != '1' || if (pVal->literal[0] != '1' ||
(pVal->literal[1] != 'u' && pVal->literal[1] != 'a' && pVal->literal[1] != 's' && pVal->literal[1] != 'm' && (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' && pVal->literal[1] != 's' && pVal->literal[1] != 'm' &&
pVal->literal[1] != 'h' && pVal->literal[1] != 'd' && pVal->literal[1] != 'w' && pVal->literal[1] != 'b')) { pVal->literal[1] != 'h' && pVal->literal[1] != 'd' && pVal->literal[1] != 'w' && pVal->literal[1] != 'b')) {
return TIME_UNIT_INVALID; return TSDB_CODE_FUNC_TIME_UNIT_INVALID;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -138,13 +135,13 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
} }
if (i == 2) { if (i == 2) {
memcpy(buf, &tz[i - 1], 2); (void)memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10); hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) { if (!validateHourRange(hour)) {
return false; return false;
} }
} else if (i == 4) { } else if (i == 4) {
memcpy(buf, &tz[i - 1], 2); (void)memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10); minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) { if (!validateMinuteRange(hour, minute, tz[0])) {
return false; return false;
@ -167,13 +164,13 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
} }
if (i == 2) { if (i == 2) {
memcpy(buf, &tz[i - 1], 2); (void)memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10); hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) { if (!validateHourRange(hour)) {
return false; return false;
} }
} else if (i == 5) { } else if (i == 5) {
memcpy(buf, &tz[i - 1], 2); (void)memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10); minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) { if (!validateMinuteRange(hour, minute, tz[0])) {
return false; return false;
@ -215,7 +212,7 @@ static int32_t addTimezoneParam(SNodeList* pList) {
time_t t = taosTime(NULL); time_t t = taosTime(NULL);
struct tm tmInfo; struct tm tmInfo;
if (taosLocalTime(&t, &tmInfo, buf) != NULL) { if (taosLocalTime(&t, &tmInfo, buf) != NULL) {
strftime(buf, sizeof(buf), "%z", &tmInfo); (void)strftime(buf, sizeof(buf), "%z", &tmInfo);
} }
int32_t len = (int32_t)strlen(buf); int32_t len = (int32_t)strlen(buf);
@ -225,15 +222,27 @@ static int32_t addTimezoneParam(SNodeList* pList) {
} }
pVal->literal = strndup(buf, len); pVal->literal = strndup(buf, len);
if (pVal->literal == NULL) {
nodesDestroyNode((SNode*)pVal);
return TSDB_CODE_OUT_OF_MEMORY;
}
pVal->translate = true; pVal->translate = true;
pVal->node.resType.type = TSDB_DATA_TYPE_BINARY; pVal->node.resType.type = TSDB_DATA_TYPE_BINARY;
pVal->node.resType.bytes = len + VARSTR_HEADER_SIZE; pVal->node.resType.bytes = len + VARSTR_HEADER_SIZE;
pVal->node.resType.precision = TSDB_TIME_PRECISION_MILLI; pVal->node.resType.precision = TSDB_TIME_PRECISION_MILLI;
pVal->datum.p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE + 1); pVal->datum.p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE + 1);
if (pVal->datum.p == NULL) {
nodesDestroyNode((SNode*)pVal);
return TSDB_CODE_OUT_OF_MEMORY;
}
varDataSetLen(pVal->datum.p, len); varDataSetLen(pVal->datum.p, len);
strncpy(varDataVal(pVal->datum.p), pVal->literal, len); (void)strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
nodesListAppend(pList, (SNode*)pVal); int32_t code = nodesListAppend(pList, (SNode*)pVal);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pVal);
return code;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -252,7 +261,11 @@ static int32_t addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
pVal->datum.i = (int64_t)precision; pVal->datum.i = (int64_t)precision;
pVal->typeData = (int64_t)precision; pVal->typeData = (int64_t)precision;
nodesListMakeAppend(pList, (SNode*)pVal); int32_t code = nodesListMakeAppend(pList, (SNode*)pVal);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pVal);
return code;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -936,13 +949,13 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1)); int32_t code = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1));
if (ret == TIME_UNIT_TOO_SMALL) { if (code == TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(pErrBuf, len, code,
"ELAPSED function time unit parameter should be greater than db precision"); "ELAPSED function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) { } else if (code == TSDB_CODE_FUNC_TIME_UNIT_INVALID) {
return buildFuncErrMsg( return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, pErrBuf, len, code,
"ELAPSED function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); "ELAPSED function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
} }
} }
@ -1062,7 +1075,7 @@ static int8_t validateHistogramBinType(char* binTypeStr) {
return binType; return binType;
} }
static bool validateHistogramBinDesc(char* binDescStr, int8_t binType, char* errMsg, int32_t msgLen) { static int32_t validateHistogramBinDesc(char* binDescStr, int8_t binType, char* errMsg, int32_t msgLen) {
const char* msg1 = "HISTOGRAM function requires four parameters"; const char* msg1 = "HISTOGRAM function requires four parameters";
const char* msg3 = "HISTOGRAM function invalid format for binDesc parameter"; const char* msg3 = "HISTOGRAM function invalid format for binDesc parameter";
const char* msg4 = "HISTOGRAM function binDesc parameter \"count\" should be in range [1, 1000]"; const char* msg4 = "HISTOGRAM function binDesc parameter \"count\" should be in range [1, 1000]";
@ -1070,6 +1083,7 @@ static bool validateHistogramBinDesc(char* binDescStr, int8_t binType, char* err
const char* msg6 = "HISTOGRAM function binDesc parameter \"width\" cannot be 0"; const char* msg6 = "HISTOGRAM function binDesc parameter \"width\" cannot be 0";
const char* msg7 = "HISTOGRAM function binDesc parameter \"start\" cannot be 0 with \"log_bin\" type"; const char* msg7 = "HISTOGRAM function binDesc parameter \"start\" cannot be 0 with \"log_bin\" type";
const char* msg8 = "HISTOGRAM function binDesc parameter \"factor\" cannot be negative or equal to 0/1"; const char* msg8 = "HISTOGRAM function binDesc parameter \"factor\" cannot be negative or equal to 0/1";
const char* msg9 = "HISTOGRAM function out of memory";
cJSON* binDesc = cJSON_Parse(binDescStr); cJSON* binDesc = cJSON_Parse(binDescStr);
int32_t numOfBins; int32_t numOfBins;
@ -1078,9 +1092,9 @@ static bool validateHistogramBinDesc(char* binDescStr, int8_t binType, char* err
int32_t numOfParams = cJSON_GetArraySize(binDesc); int32_t numOfParams = cJSON_GetArraySize(binDesc);
int32_t startIndex; int32_t startIndex;
if (numOfParams != 4) { if (numOfParams != 4) {
snprintf(errMsg, msgLen, "%s", msg1); (void)snprintf(errMsg, msgLen, "%s", msg1);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
cJSON* start = cJSON_GetObjectItem(binDesc, "start"); cJSON* start = cJSON_GetObjectItem(binDesc, "start");
@ -1090,22 +1104,22 @@ static bool validateHistogramBinDesc(char* binDescStr, int8_t binType, char* err
cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity"); cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity");
if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) { if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) {
snprintf(errMsg, msgLen, "%s", msg3); (void)snprintf(errMsg, msgLen, "%s", msg3);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000 if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000
snprintf(errMsg, msgLen, "%s", msg4); (void)snprintf(errMsg, msgLen, "%s", msg4);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) || if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) ||
(factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) { (factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) {
snprintf(errMsg, msgLen, "%s", msg5); (void)snprintf(errMsg, msgLen, "%s", msg5);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
int32_t counter = (int32_t)count->valueint; int32_t counter = (int32_t)count->valueint;
@ -1118,53 +1132,58 @@ static bool validateHistogramBinDesc(char* binDescStr, int8_t binType, char* err
} }
intervals = taosMemoryCalloc(numOfBins, sizeof(double)); intervals = taosMemoryCalloc(numOfBins, sizeof(double));
if (intervals == NULL) {
(void)snprintf(errMsg, msgLen, "%s", msg9);
cJSON_Delete(binDesc);
return TSDB_CODE_FAILED;
}
if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) { if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) {
// linear bin process // linear bin process
if (width->valuedouble == 0) { if (width->valuedouble == 0) {
snprintf(errMsg, msgLen, "%s", msg6); (void)snprintf(errMsg, msgLen, "%s", msg6);
taosMemoryFree(intervals); taosMemoryFree(intervals);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
for (int i = 0; i < counter + 1; ++i) { for (int i = 0; i < counter + 1; ++i) {
intervals[startIndex] = start->valuedouble + i * width->valuedouble; intervals[startIndex] = start->valuedouble + i * width->valuedouble;
if (isinf(intervals[startIndex])) { if (isinf(intervals[startIndex])) {
snprintf(errMsg, msgLen, "%s", msg5); (void)snprintf(errMsg, msgLen, "%s", msg5);
taosMemoryFree(intervals); taosMemoryFree(intervals);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
startIndex++; startIndex++;
} }
} else if (cJSON_IsNumber(factor) && width == NULL && binType == LOG_BIN) { } else if (cJSON_IsNumber(factor) && width == NULL && binType == LOG_BIN) {
// log bin process // log bin process
if (start->valuedouble == 0) { if (start->valuedouble == 0) {
snprintf(errMsg, msgLen, "%s", msg7); (void)snprintf(errMsg, msgLen, "%s", msg7);
taosMemoryFree(intervals); taosMemoryFree(intervals);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) { if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) {
snprintf(errMsg, msgLen, "%s", msg8); (void)snprintf(errMsg, msgLen, "%s", msg8);
taosMemoryFree(intervals); taosMemoryFree(intervals);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
for (int i = 0; i < counter + 1; ++i) { for (int i = 0; i < counter + 1; ++i) {
intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0); intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0);
if (isinf(intervals[startIndex])) { if (isinf(intervals[startIndex])) {
snprintf(errMsg, msgLen, "%s", msg5); (void)snprintf(errMsg, msgLen, "%s", msg5);
taosMemoryFree(intervals); taosMemoryFree(intervals);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
startIndex++; startIndex++;
} }
} else { } else {
snprintf(errMsg, msgLen, "%s", msg3); (void)snprintf(errMsg, msgLen, "%s", msg3);
taosMemoryFree(intervals); taosMemoryFree(intervals);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
if (infinity->valueint == true) { if (infinity->valueint == true) {
@ -1172,7 +1191,7 @@ static bool validateHistogramBinDesc(char* binDescStr, int8_t binType, char* err
intervals[numOfBins - 1] = INFINITY; intervals[numOfBins - 1] = INFINITY;
// in case of desc bin orders, -inf/inf should be swapped // in case of desc bin orders, -inf/inf should be swapped
if (numOfBins < 4) { if (numOfBins < 4) {
return false; return TSDB_CODE_FAILED;
} }
if (intervals[1] > intervals[numOfBins - 2]) { if (intervals[1] > intervals[numOfBins - 2]) {
@ -1181,46 +1200,51 @@ static bool validateHistogramBinDesc(char* binDescStr, int8_t binType, char* err
} }
} else if (cJSON_IsArray(binDesc)) { /* user input bins */ } else if (cJSON_IsArray(binDesc)) { /* user input bins */
if (binType != USER_INPUT_BIN) { if (binType != USER_INPUT_BIN) {
snprintf(errMsg, msgLen, "%s", msg3); (void)snprintf(errMsg, msgLen, "%s", msg3);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
numOfBins = cJSON_GetArraySize(binDesc); numOfBins = cJSON_GetArraySize(binDesc);
intervals = taosMemoryCalloc(numOfBins, sizeof(double)); intervals = taosMemoryCalloc(numOfBins, sizeof(double));
if (intervals == NULL) {
(void)snprintf(errMsg, msgLen, "%s", msg9);
cJSON_Delete(binDesc);
return TSDB_CODE_OUT_OF_MEMORY;
}
cJSON* bin = binDesc->child; cJSON* bin = binDesc->child;
if (bin == NULL) { if (bin == NULL) {
snprintf(errMsg, msgLen, "%s", msg3); (void)snprintf(errMsg, msgLen, "%s", msg3);
taosMemoryFree(intervals); taosMemoryFree(intervals);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
int i = 0; int i = 0;
while (bin) { while (bin) {
intervals[i] = bin->valuedouble; intervals[i] = bin->valuedouble;
if (!cJSON_IsNumber(bin)) { if (!cJSON_IsNumber(bin)) {
snprintf(errMsg, msgLen, "%s", msg3); (void)snprintf(errMsg, msgLen, "%s", msg3);
taosMemoryFree(intervals); taosMemoryFree(intervals);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
if (i != 0 && intervals[i] <= intervals[i - 1]) { if (i != 0 && intervals[i] <= intervals[i - 1]) {
snprintf(errMsg, msgLen, "%s", msg3); (void)snprintf(errMsg, msgLen, "%s", msg3);
taosMemoryFree(intervals); taosMemoryFree(intervals);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
bin = bin->next; bin = bin->next;
i++; i++;
} }
} else { } else {
snprintf(errMsg, msgLen, "%s", msg3); (void)snprintf(errMsg, msgLen, "%s", msg3);
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
return false; return TSDB_CODE_FAILED;
} }
cJSON_Delete(binDesc); cJSON_Delete(binDesc);
taosMemoryFree(intervals); taosMemoryFree(intervals);
return true; return TSDB_CODE_SUCCESS;
} }
static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
@ -1265,7 +1289,7 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l
if (i == 2) { if (i == 2) {
char errMsg[128] = {0}; char errMsg[128] = {0};
binDesc = varDataVal(pValue->datum.p); binDesc = varDataVal(pValue->datum.p);
if (!validateHistogramBinDesc(binDesc, binType, errMsg, (int32_t)sizeof(errMsg))) { if (TSDB_CODE_SUCCESS != validateHistogramBinDesc(binDesc, binType, errMsg, (int32_t)sizeof(errMsg))) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, errMsg); return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, errMsg);
} }
} }
@ -1323,7 +1347,7 @@ static int32_t translateHistogramImpl(SFunctionNode* pFunc, char* pErrBuf, int32
if (i == 2) { if (i == 2) {
char errMsg[128] = {0}; char errMsg[128] = {0};
binDesc = varDataVal(pValue->datum.p); binDesc = varDataVal(pValue->datum.p);
if (!validateHistogramBinDesc(binDesc, binType, errMsg, (int32_t)sizeof(errMsg))) { if (TSDB_CODE_SUCCESS != validateHistogramBinDesc(binDesc, binType, errMsg, (int32_t)sizeof(errMsg))) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, errMsg); return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, errMsg);
} }
} }
@ -1507,12 +1531,12 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
if (numOfParams == 4) { if (numOfParams == 4) {
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3)); int32_t code = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3));
if (ret == TIME_UNIT_TOO_SMALL) { if (code == TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(pErrBuf, len, code,
"STATEDURATION function time unit parameter should be greater than db precision"); "STATEDURATION function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) { } else if (code == TSDB_CODE_FUNC_TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(pErrBuf, len, code,
"STATEDURATION function time unit parameter should be one of the following: [1b, 1u, 1a, " "STATEDURATION function time unit parameter should be one of the following: [1b, 1u, 1a, "
"1s, 1m, 1h, 1d, 1w]"); "1s, 1m, 1h, 1d, 1w]");
} }
@ -2268,13 +2292,13 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
} }
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1)); int32_t code = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1));
if (ret == TIME_UNIT_TOO_SMALL) { if (code == TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(pErrBuf, len, code,
"TIMETRUNCATE function time unit parameter should be greater than db precision"); "TIMETRUNCATE function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) { } else if (code == TSDB_CODE_FUNC_TIME_UNIT_INVALID) {
return buildFuncErrMsg( return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, pErrBuf, len, code,
"TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); "TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
} }
@ -2291,7 +2315,7 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
// add database precision as param // add database precision as param
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -2330,13 +2354,13 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
if (3 == numOfParams) { if (3 == numOfParams) {
int32_t ret = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2)); int32_t code = validateTimeUnitParam(dbPrec, (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2));
if (ret == TIME_UNIT_TOO_SMALL) { if (code == TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(pErrBuf, len, code,
"TIMEDIFF function time unit parameter should be greater than db precision"); "TIMEDIFF function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) { } else if (code == TSDB_CODE_FUNC_TIME_UNIT_INVALID) {
return buildFuncErrMsg( return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, pErrBuf, len, code,
"TIMEDIFF function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); "TIMEDIFF function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
} }
} }

File diff suppressed because it is too large Load Diff

View File

@ -354,14 +354,17 @@ bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true; return true;
} }
bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { int32_t avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) { if (pResultInfo->initialized) {
return false; return TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) {
return TSDB_CODE_FUNC_SETUP_ERROR;
} }
SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo); SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
memset(pRes, 0, sizeof(SAvgRes)); (void)memset(pRes, 0, sizeof(SAvgRes));
return true; return TSDB_CODE_SUCCESS;
} }
static int32_t calculateAvgBySMAInfo(SAvgRes* pRes, int32_t numOfRows, int32_t type, const SColumnDataAgg* pAgg) { static int32_t calculateAvgBySMAInfo(SAvgRes* pRes, int32_t numOfRows, int32_t type, const SColumnDataAgg* pAgg) {
@ -849,15 +852,23 @@ int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getAvgInfoSize(); int32_t resultBytes = getAvgInfoSize();
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
int32_t code = TSDB_CODE_SUCCESS;
memcpy(varDataVal(res), pInfo, resultBytes); if (NULL == res) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(void)memcpy(varDataVal(res), pInfo, resultBytes);
varDataSetLen(res, resultBytes); varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
if(NULL == pCol) {
code = TSDB_CODE_OUT_OF_RANGE;
goto _exit;
}
colDataSetVal(pCol, pBlock->info.rows, res, false); code = colDataSetVal(pCol, pBlock->info.rows, res, false);
_exit:
taosMemoryFree(res); taosMemoryFree(res);
return pResInfo->numOfRes; return code;
} }

View File

@ -20,6 +20,7 @@
#include "tglobal.h" #include "tglobal.h"
#define __COMPARE_ACQUIRED_MAX(i, end, bm, _data, ctx, val, pos) \ #define __COMPARE_ACQUIRED_MAX(i, end, bm, _data, ctx, val, pos) \
int32_t code = TSDB_CODE_SUCCESS; \
for (; i < (end); ++i) { \ for (; i < (end); ++i) { \
if (colDataIsNull_f(bm, i)) { \ if (colDataIsNull_f(bm, i)) { \
continue; \ continue; \
@ -28,12 +29,16 @@
if ((val) < (_data)[i]) { \ if ((val) < (_data)[i]) { \
(val) = (_data)[i]; \ (val) = (_data)[i]; \
if ((ctx)->subsidiaries.num > 0) { \ if ((ctx)->subsidiaries.num > 0) { \
updateTupleData((ctx), i, (ctx)->pSrcBlock, pos); \ code = updateTupleData((ctx), i, (ctx)->pSrcBlock, pos); \
if (TSDB_CODE_SUCCESS != code) { \
return code; \
} \
} \ } \
} \ } \
} }
#define __COMPARE_ACQUIRED_MIN(i, end, bm, _data, ctx, val, pos) \ #define __COMPARE_ACQUIRED_MIN(i, end, bm, _data, ctx, val, pos) \
int32_t code = TSDB_CODE_SUCCESS; \
for (; i < (end); ++i) { \ for (; i < (end); ++i) { \
if (colDataIsNull_f(bm, i)) { \ if (colDataIsNull_f(bm, i)) { \
continue; \ continue; \
@ -42,7 +47,10 @@
if ((val) > (_data)[i]) { \ if ((val) > (_data)[i]) { \
(val) = (_data)[i]; \ (val) = (_data)[i]; \
if ((ctx)->subsidiaries.num > 0) { \ if ((ctx)->subsidiaries.num > 0) { \
updateTupleData((ctx), i, (ctx)->pSrcBlock, pos); \ code = updateTupleData((ctx), i, (ctx)->pSrcBlock, pos); \
if (TSDB_CODE_SUCCESS != code) { \
return code; \
} \
} \ } \
} \ } \
} }
@ -571,7 +579,7 @@ static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, c
return -1; return -1;
} }
static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunctionCtx* pCtx, SMinmaxResInfo* pBuf, static int32_t doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunctionCtx* pCtx, SMinmaxResInfo* pBuf,
bool isMinFunc) { bool isMinFunc) {
if (isMinFunc) { if (isMinFunc) {
switch (pCol->info.type) { switch (pCol->info.type) {
@ -700,6 +708,7 @@ static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunct
} }
} }
} }
return TSDB_CODE_SUCCESS;
} }
static int32_t saveRelatedTupleTag(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, void* tval) { static int32_t saveRelatedTupleTag(SqlFunctionCtx* pCtx, SInputColumnInfoData* pInput, void* tval) {
@ -840,7 +849,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
break; break;
} }
default: default:
memcpy(&pBuf->v, p, pCol->info.bytes); (void)memcpy(&pBuf->v, p, pCol->info.bytes);
break; break;
} }
@ -858,7 +867,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems)
goto _over; goto _over;
} }
doExtractVal(pCol, i, end, pCtx, pBuf, isMinFunc); code = doExtractVal(pCol, i, end, pCtx, pBuf, isMinFunc);
} else { } else {
numOfElems = numOfRows; numOfElems = numOfRows;

View File

@ -35,14 +35,14 @@ static void doInitFunctionTable() {
gFunMgtService.pFuncNameHashTable = gFunMgtService.pFuncNameHashTable =
taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (NULL == gFunMgtService.pFuncNameHashTable) { if (NULL == gFunMgtService.pFuncNameHashTable) {
initFunctionCode = TSDB_CODE_FAILED; initFunctionCode = terrno;
return; return;
} }
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) { for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
if (TSDB_CODE_SUCCESS != taosHashPut(gFunMgtService.pFuncNameHashTable, funcMgtBuiltins[i].name, if (TSDB_CODE_SUCCESS != taosHashPut(gFunMgtService.pFuncNameHashTable, funcMgtBuiltins[i].name,
strlen(funcMgtBuiltins[i].name), &i, sizeof(int32_t))) { strlen(funcMgtBuiltins[i].name), &i, sizeof(int32_t))) {
initFunctionCode = TSDB_CODE_FAILED; initFunctionCode = terrno;
return; return;
} }
} }
@ -61,7 +61,7 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
} }
int32_t fmFuncMgtInit() { int32_t fmFuncMgtInit() {
taosThreadOnce(&functionHashTableInit, doInitFunctionTable); (void)taosThreadOnce(&functionHashTableInit, doInitFunctionTable);
return initFunctionCode; return initFunctionCode;
} }
@ -115,20 +115,24 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow); return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow);
} }
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo) { int32_t fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo, int32_t *reqStatus) {
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
*reqStatus = -1;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
const char* name = funcMgtBuiltins[funcId].name; const char* name = funcMgtBuiltins[funcId].name;
if ((strcmp(name, "_group_key") == 0) || (strcmp(name, "_select_value") == 0)) { if ((strcmp(name, "_group_key") == 0) || (strcmp(name, "_select_value") == 0)) {
return FUNC_DATA_REQUIRED_NOT_LOAD; *reqStatus = FUNC_DATA_REQUIRED_NOT_LOAD;
return TSDB_CODE_SUCCESS;;
} }
if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) { if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) {
return FUNC_DATA_REQUIRED_DATA_LOAD; *reqStatus = FUNC_DATA_REQUIRED_DATA_LOAD;
return TSDB_CODE_SUCCESS;
} else { } else {
return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pBlockInfo); *reqStatus = funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pBlockInfo);
return TSDB_CODE_SUCCESS;
} }
} }
@ -378,29 +382,30 @@ static int32_t getFuncInfo(SFunctionNode* pFunc) {
return fmGetFuncInfo(pFunc, msg, sizeof(msg)); return fmGetFuncInfo(pFunc, msg, sizeof(msg));
} }
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) { int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** pFunc) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); *pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) { if (NULL == *pFunc) {
return NULL; return TSDB_CODE_OUT_OF_MEMORY;
} }
snprintf(pFunc->functionName, sizeof(pFunc->functionName), "%s", pName); (void)snprintf((*pFunc)->functionName, sizeof((*pFunc)->functionName), "%s", pName);
pFunc->pParameterList = pParameterList; (*pFunc)->pParameterList = pParameterList;
if (TSDB_CODE_SUCCESS != getFuncInfo(pFunc)) { int32_t code = getFuncInfo((*pFunc));
pFunc->pParameterList = NULL; if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pFunc); (*pFunc)->pParameterList = NULL;
return NULL; nodesDestroyNode((SNode*)*pFunc);
return code;
} }
return pFunc; return code;
} }
static SNode* createColumnByFunc(const SFunctionNode* pFunc) { static int32_t createColumnByFunc(const SFunctionNode* pFunc, SColumnNode** pCol) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); *pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { if (NULL == *pCol) {
return NULL; return TSDB_CODE_OUT_OF_MEMORY;
} }
strcpy(pCol->colName, pFunc->node.aliasName); (void)strcpy((*pCol)->colName, pFunc->node.aliasName);
pCol->node.resType = pFunc->node.resType; (*pCol)->node.resType = pFunc->node.resType;
return (SNode*)pCol; return TSDB_CODE_SUCCESS;
} }
bool fmIsDistExecFunc(int32_t funcId) { bool fmIsDistExecFunc(int32_t funcId) {
@ -418,17 +423,17 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
if (NULL == pParameterList) { if (NULL == pParameterList) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
*pPartialFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pParameterList); int32_t code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pParameterList,pPartialFunc );
if (NULL == *pPartialFunc) { if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pParameterList); nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY; return code;
} }
(*pPartialFunc)->hasOriginalFunc = true; (*pPartialFunc)->hasOriginalFunc = true;
(*pPartialFunc)->originalFuncId = pSrcFunc->hasOriginalFunc ? pSrcFunc->originalFuncId : pSrcFunc->funcId; (*pPartialFunc)->originalFuncId = pSrcFunc->hasOriginalFunc ? pSrcFunc->originalFuncId : pSrcFunc->funcId;
char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0}; char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0};
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", (*pPartialFunc)->functionName, pSrcFunc); int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", (*pPartialFunc)->functionName, pSrcFunc);
taosCreateMD5Hash(name, len); (void)taosCreateMD5Hash(name, len);
strncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN - 1); (void)strncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
(*pPartialFunc)->hasPk = pSrcFunc->hasPk; (*pPartialFunc)->hasPk = pSrcFunc->hasPk;
(*pPartialFunc)->pkBytes = pSrcFunc->pkBytes; (*pPartialFunc)->pkBytes = pSrcFunc->pkBytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -436,7 +441,11 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
static int32_t createMergeFuncPara(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc, static int32_t createMergeFuncPara(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
SNodeList** pParameterList) { SNodeList** pParameterList) {
SNode* pRes = createColumnByFunc(pPartialFunc); SNode *pRes = NULL;
int32_t code = createColumnByFunc(pPartialFunc, (SColumnNode**)&pRes);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (NULL != funcMgtBuiltins[pSrcFunc->funcId].createMergeParaFuc) { if (NULL != funcMgtBuiltins[pSrcFunc->funcId].createMergeParaFuc) {
return funcMgtBuiltins[pSrcFunc->funcId].createMergeParaFuc(pSrcFunc->pParameterList, pRes, pParameterList); return funcMgtBuiltins[pSrcFunc->funcId].createMergeParaFuc(pSrcFunc->pParameterList, pRes, pParameterList);
} else { } else {
@ -452,16 +461,13 @@ static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionN
int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList); int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc != NULL){ if(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc != NULL){
pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc, pParameterList); code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc, pParameterList, &pFunc);
}else{ }else{
pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList); code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList, &pFunc);
}
if (NULL == pFunc) {
code = TSDB_CODE_OUT_OF_MEMORY;
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
strcpy(pFunc->node.aliasName, pPartialFunc->node.aliasName); (void)strcpy(pFunc->node.aliasName, pPartialFunc->node.aliasName);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -481,10 +487,7 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList); int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList); code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList, &pFunc);
if (NULL == pFunc) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pFunc->hasOriginalFunc = true; pFunc->hasOriginalFunc = true;
@ -493,7 +496,7 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
if (fmIsSameInOutType(pSrcFunc->funcId)) { if (fmIsSameInOutType(pSrcFunc->funcId)) {
pFunc->node.resType = pSrcFunc->node.resType; pFunc->node.resType = pSrcFunc->node.resType;
} }
strcpy(pFunc->node.aliasName, pSrcFunc->node.aliasName); (void)strcpy(pFunc->node.aliasName, pSrcFunc->node.aliasName);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -541,13 +544,13 @@ static int32_t fmCreateStateFunc(const SFunctionNode* pFunc, SFunctionNode** pSt
if (funcMgtBuiltins[pFunc->funcId].pStateFunc) { if (funcMgtBuiltins[pFunc->funcId].pStateFunc) {
SNodeList* pParams = nodesCloneList(pFunc->pParameterList); SNodeList* pParams = nodesCloneList(pFunc->pParameterList);
if (!pParams) return TSDB_CODE_OUT_OF_MEMORY; if (!pParams) return TSDB_CODE_OUT_OF_MEMORY;
*pStateFunc = createFunction(funcMgtBuiltins[pFunc->funcId].pStateFunc, pParams); int32_t code = createFunction(funcMgtBuiltins[pFunc->funcId].pStateFunc, pParams, pStateFunc);
if (!*pStateFunc) { if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pParams); nodesDestroyList(pParams);
return TSDB_CODE_FUNC_FUNTION_ERROR; return TSDB_CODE_FUNC_FUNTION_ERROR;
} }
strcpy((*pStateFunc)->node.aliasName, pFunc->node.aliasName); (void)strcpy((*pStateFunc)->node.aliasName, pFunc->node.aliasName);
strcpy((*pStateFunc)->node.userAlias, pFunc->node.userAlias); (void)strcpy((*pStateFunc)->node.userAlias, pFunc->node.userAlias);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -587,13 +590,13 @@ static int32_t fmCreateStateMergeFunc(SFunctionNode* pFunc, SFunctionNode** pSta
if (funcMgtBuiltins[pFunc->funcId].pMergeFunc) { if (funcMgtBuiltins[pFunc->funcId].pMergeFunc) {
SNodeList* pParams = nodesCloneList(pFunc->pParameterList); SNodeList* pParams = nodesCloneList(pFunc->pParameterList);
if (!pParams) return TSDB_CODE_OUT_OF_MEMORY; if (!pParams) return TSDB_CODE_OUT_OF_MEMORY;
*pStateMergeFunc = createFunction(funcMgtBuiltins[pFunc->funcId].pMergeFunc, pParams); int32_t code = createFunction(funcMgtBuiltins[pFunc->funcId].pMergeFunc, pParams, pStateMergeFunc);
if (!*pStateMergeFunc) { if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pParams); nodesDestroyList(pParams);
return TSDB_CODE_FUNC_FUNTION_ERROR; return code;
} }
strcpy((*pStateMergeFunc)->node.aliasName, pFunc->node.aliasName); (void)strcpy((*pStateMergeFunc)->node.aliasName, pFunc->node.aliasName);
strcpy((*pStateMergeFunc)->node.userAlias, pFunc->node.userAlias); (void)strcpy((*pStateMergeFunc)->node.userAlias, pFunc->node.userAlias);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -646,6 +649,9 @@ bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) {
} }
if (strcmp(pFunc->pStateFunc, pStateFunc->name) == 0) return true; if (strcmp(pFunc->pStateFunc, pStateFunc->name) == 0) return true;
int32_t stateMergeFuncId = fmGetFuncId(pFunc->pStateFunc); int32_t stateMergeFuncId = fmGetFuncId(pFunc->pStateFunc);
if (stateMergeFuncId == -1) {
return false;
}
const SBuiltinFuncDefinition* pStateMergeFunc = &funcMgtBuiltins[stateMergeFuncId]; const SBuiltinFuncDefinition* pStateMergeFunc = &funcMgtBuiltins[stateMergeFuncId];
return strcmp(pStateFunc->name, pStateMergeFunc->pMergeFunc) == 0; return strcmp(pStateFunc->name, pStateMergeFunc->pMergeFunc) == 0;
} }

View File

@ -32,9 +32,12 @@
*/ */
static int32_t histogramCreateBin(SHistogramInfo* pHisto, int32_t index, double val); static int32_t histogramCreateBin(SHistogramInfo* pHisto, int32_t index, double val);
SHistogramInfo* tHistogramCreate(int32_t numOfEntries) { int32_t tHistogramCreate(int32_t numOfEntries, SHistogramInfo** pHisto) {
/* need one redundant slot */ /* need one redundant slot */
SHistogramInfo* pHisto = taosMemoryMalloc(sizeof(SHistogramInfo) + sizeof(SHistBin) * (numOfEntries + 1)); *pHisto = taosMemoryMalloc(sizeof(SHistogramInfo) + sizeof(SHistBin) * (numOfEntries + 1));
if (NULL == *pHisto) {
return TSDB_CODE_OUT_OF_MEMORY;
}
#if !defined(USE_ARRAYLIST) #if !defined(USE_ARRAYLIST)
pHisto->pList = SSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); pHisto->pList = SSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
@ -46,11 +49,12 @@ SHistogramInfo* tHistogramCreate(int32_t numOfEntries) {
pss->pTree = pHisto->pLoserTree; pss->pTree = pHisto->pLoserTree;
#endif #endif
return tHistogramCreateFrom(pHisto, numOfEntries); *pHisto = tHistogramCreateFrom(*pHisto, numOfEntries);
return TSDB_CODE_SUCCESS;
} }
SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins) { SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins) {
memset(pBuf, 0, sizeof(SHistogramInfo) + sizeof(SHistBin) * (numOfBins + 1)); (void)memset(pBuf, 0, sizeof(SHistogramInfo) + sizeof(SHistBin) * (numOfBins + 1));
SHistogramInfo* pHisto = (SHistogramInfo*)pBuf; SHistogramInfo* pHisto = (SHistogramInfo*)pBuf;
pHisto->elems = (SHistBin*)((char*)pBuf + sizeof(SHistogramInfo)); pHisto->elems = (SHistBin*)((char*)pBuf + sizeof(SHistogramInfo));
@ -67,15 +71,19 @@ SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins) {
} }
int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
int32_t code = TSDB_CODE_SUCCESS;
if (*pHisto == NULL) { if (*pHisto == NULL) {
*pHisto = tHistogramCreate(MAX_HISTOGRAM_BIN); code = tHistogramCreate(MAX_HISTOGRAM_BIN, pHisto);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
} }
#if defined(USE_ARRAYLIST) #if defined(USE_ARRAYLIST)
int32_t idx = histoBinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val); int32_t idx = histoBinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val);
if (ASSERTS(idx >= 0 && idx <= (*pHisto)->maxEntries && (*pHisto)->elems != NULL, "tHistogramAdd Error, idx:%d, maxEntries:%d, elems:%p", if (ASSERTS(idx >= 0 && idx <= (*pHisto)->maxEntries && (*pHisto)->elems != NULL, "tHistogramAdd Error, idx:%d, maxEntries:%d, elems:%p",
idx, (*pHisto)->maxEntries, (*pHisto)->elems)) { idx, (*pHisto)->maxEntries, (*pHisto)->elems)) {
return -1; return TSDB_CODE_FAILED;
} }
if ((*pHisto)->elems[idx].val == val && idx >= 0) { if ((*pHisto)->elems[idx].val == val && idx >= 0) {
@ -89,23 +97,23 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
if (idx > 0) { if (idx > 0) {
if (ASSERTS((*pHisto)->elems[idx - 1].val <= val, "tHistogramAdd Error, elems[%d].val:%lf, val:%lf", if (ASSERTS((*pHisto)->elems[idx - 1].val <= val, "tHistogramAdd Error, elems[%d].val:%lf, val:%lf",
idx - 1, (*pHisto)->elems[idx - 1].val, val)) { idx - 1, (*pHisto)->elems[idx - 1].val, val)) {
return -1; return TSDB_CODE_FAILED;
} }
} else { } else {
if (ASSERTS((*pHisto)->elems[idx].val > val, "tHistogramAdd Error, elems[%d].val:%lf, val:%lf", if (ASSERTS((*pHisto)->elems[idx].val > val, "tHistogramAdd Error, elems[%d].val:%lf, val:%lf",
idx, (*pHisto)->elems[idx].val, val)) { idx, (*pHisto)->elems[idx].val, val)) {
return -1; return TSDB_CODE_FAILED;
} }
} }
} else if ((*pHisto)->numOfElems > 0) { } else if ((*pHisto)->numOfElems > 0) {
if (ASSERTS((*pHisto)->elems[(*pHisto)->numOfEntries].val <= val, "tHistogramAdd Error, elems[%d].val:%lf, val:%lf", if (ASSERTS((*pHisto)->elems[(*pHisto)->numOfEntries].val <= val, "tHistogramAdd Error, elems[%d].val:%lf, val:%lf",
(*pHisto)->numOfEntries, (*pHisto)->elems[idx].val, val)) { (*pHisto)->numOfEntries, (*pHisto)->elems[idx].val, val)) {
return -1; return TSDB_CODE_FAILED;
} }
} }
int32_t code = histogramCreateBin(*pHisto, idx, val); code = histogramCreateBin(*pHisto, idx, val);
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
@ -286,7 +294,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
} }
(*pHisto)->numOfElems += 1; (*pHisto)->numOfElems += 1;
return 0; return code;
} }
int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val) { int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val) {
@ -335,7 +343,7 @@ static void histogramMergeImpl(SHistBin* pHistBin, int32_t* size) {
s1->val = newVal; s1->val = newVal;
s1->num = s1->num + s2->num; s1->num = s1->num + s2->num;
memmove(&pHistBin[index + 1], &pHistBin[index + 2], (oldSize - index - 2) * sizeof(SHistBin)); (void)memmove(&pHistBin[index + 1], &pHistBin[index + 2], (oldSize - index - 2) * sizeof(SHistBin));
(*size) -= 1; (*size) -= 1;
#endif #endif
} }
@ -345,12 +353,12 @@ int32_t histogramCreateBin(SHistogramInfo* pHisto, int32_t index, double val) {
#if defined(USE_ARRAYLIST) #if defined(USE_ARRAYLIST)
int32_t remain = pHisto->numOfEntries - index; int32_t remain = pHisto->numOfEntries - index;
if (remain > 0) { if (remain > 0) {
memmove(&pHisto->elems[index + 1], &pHisto->elems[index], sizeof(SHistBin) * remain); (void)memmove(&pHisto->elems[index + 1], &pHisto->elems[index], sizeof(SHistBin) * remain);
} }
if (ASSERTS(index >= 0 && index <= pHisto->maxEntries, "histogramCreateBin Error, index:%d, maxEntries:%d", if (ASSERTS(index >= 0 && index <= pHisto->maxEntries, "histogramCreateBin Error, index:%d, maxEntries:%d",
index, pHisto->maxEntries)) { index, pHisto->maxEntries)) {
return -1; return TSDB_CODE_FAILED;
} }
pHisto->elems[index].num = 1; pHisto->elems[index].num = 1;
@ -367,10 +375,10 @@ int32_t histogramCreateBin(SHistogramInfo* pHisto, int32_t index, double val) {
#endif #endif
if (ASSERTS(pHisto->numOfEntries <= pHisto->maxEntries, "histogramCreateBin Error, numOfEntries:%d, maxEntries:%d", if (ASSERTS(pHisto->numOfEntries <= pHisto->maxEntries, "histogramCreateBin Error, numOfEntries:%d, maxEntries:%d",
pHisto->numOfEntries, pHisto->maxEntries)) { pHisto->numOfEntries, pHisto->maxEntries)) {
return -1; return TSDB_CODE_FAILED;
} }
return 0; return TSDB_CODE_SUCCESS;
} }
void tHistogramDestroy(SHistogramInfo** pHisto) { void tHistogramDestroy(SHistogramInfo** pHisto) {
@ -383,17 +391,17 @@ void tHistogramDestroy(SHistogramInfo** pHisto) {
} }
void tHistogramPrint(SHistogramInfo* pHisto) { void tHistogramPrint(SHistogramInfo* pHisto) {
printf("total entries: %d, elements: %" PRId64 "\n", pHisto->numOfEntries, pHisto->numOfElems); (void)printf("total entries: %d, elements: %" PRId64 "\n", pHisto->numOfEntries, pHisto->numOfElems);
#if defined(USE_ARRAYLIST) #if defined(USE_ARRAYLIST)
for (int32_t i = 0; i < pHisto->numOfEntries; ++i) { for (int32_t i = 0; i < pHisto->numOfEntries; ++i) {
printf("%d: (%f, %" PRId64 ")\n", i + 1, pHisto->elems[i].val, pHisto->elems[i].num); (void)printf("%d: (%f, %" PRId64 ")\n", i + 1, pHisto->elems[i].val, pHisto->elems[i].num);
} }
#else #else
tSkipListNode* pNode = pHisto->pList->pHead.pForward[0]; tSkipListNode* pNode = pHisto->pList->pHead.pForward[0];
for (int32_t i = 0; i < pHisto->numOfEntries; ++i) { for (int32_t i = 0; i < pHisto->numOfEntries; ++i) {
SHistBin* pEntry = (SHistBin*)pNode->pData; SHistBin* pEntry = (SHistBin*)pNode->pData;
printf("%d: (%f, %" PRId64 ")\n", i + 1, pEntry->val, pEntry->num); (void)printf("%d: (%f, %" PRId64 ")\n", i + 1, pEntry->val, pEntry->num);
pNode = pNode->pForward[0]; pNode = pNode->pForward[0];
} }
#endif #endif
@ -443,21 +451,24 @@ int64_t tHistogramSum(SHistogramInfo* pHisto, double v) {
#endif #endif
} }
double* tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num) { int32_t tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num, double** pVal) {
#if defined(USE_ARRAYLIST) #if defined(USE_ARRAYLIST)
double* pVal = taosMemoryMalloc(num * sizeof(double)); *pVal = taosMemoryMalloc(num * sizeof(double));
if (NULL == *pVal) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
double numOfElem = (ratio[i] / 100) * pHisto->numOfElems; double numOfElem = (ratio[i] / 100) * pHisto->numOfElems;
if (numOfElem == 0) { if (numOfElem == 0) {
pVal[i] = pHisto->min; (*pVal)[i] = pHisto->min;
continue; continue;
} else if (numOfElem <= pHisto->elems[0].num) { } else if (numOfElem <= pHisto->elems[0].num) {
pVal[i] = pHisto->elems[0].val; (*pVal)[i] = pHisto->elems[0].val;
continue; continue;
} else if (numOfElem == pHisto->numOfElems) { } else if (numOfElem == pHisto->numOfElems) {
pVal[i] = pHisto->max; (*pVal)[i] = pHisto->max;
continue; continue;
} }
@ -479,37 +490,39 @@ double* tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num) {
double delta = numOfElem - total; double delta = numOfElem - total;
if (fabs(delta) < FLT_EPSILON) { if (fabs(delta) < FLT_EPSILON) {
pVal[i] = pHisto->elems[j].val; (*pVal)[i] = pHisto->elems[j].val;
} }
double start = (double)pHisto->elems[j].num; double start = (double)pHisto->elems[j].num;
double range = pHisto->elems[j + 1].num - start; double range = pHisto->elems[j + 1].num - start;
if (range == 0) { if (range == 0) {
pVal[i] = (pHisto->elems[j + 1].val - pHisto->elems[j].val) * delta / start + pHisto->elems[j].val; (*pVal)[i] = (pHisto->elems[j + 1].val - pHisto->elems[j].val) * delta / start + pHisto->elems[j].val;
} else { } else {
double factor = (-2 * start + sqrt(4 * start * start - 4 * range * (-2 * delta))) / (2 * range); double factor = (-2 * start + sqrt(4 * start * start - 4 * range * (-2 * delta))) / (2 * range);
pVal[i] = pHisto->elems[j].val + (pHisto->elems[j + 1].val - pHisto->elems[j].val) * factor; (*pVal)[i] = pHisto->elems[j].val + (pHisto->elems[j + 1].val - pHisto->elems[j].val) * factor;
} }
} }
#else #else
double* pVal = taosMemoryMalloc(num * sizeof(double)); double* pVal = taosMemoryMalloc(num * sizeof(double));
if (NULL == *pVal) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
double numOfElem = ratio[i] * pHisto->numOfElems; double numOfElem = ratio[i] * pHisto->numOfElems;
tSkipListNode* pFirst = pHisto->pList->pHead.pForward[0]; tSkipListNode* pFirst = pHisto->pList->pHead.pForward[0];
SHistBin* pEntry = (SHistBin*)pFirst->pData; SHistBin* pEntry = (SHistBin*)pFirst->pData;
if (numOfElem == 0) { if (numOfElem == 0) {
pVal[i] = pHisto->min; (*pVal)[i] = pHisto->min;
printf("i/numofSlot: %f, v:%f, %f\n", ratio[i], numOfElem, pVal[i]); printf("i/numofSlot: %f, v:%f, %f\n", ratio[i], numOfElem, pVal[i]);
continue; continue;
} else if (numOfElem <= pEntry->num) { } else if (numOfElem <= pEntry->num) {
pVal[i] = pEntry->val; (*pVal)[i] = pEntry->val;
printf("i/numofSlot: %f, v:%f, %f\n", ratio[i], numOfElem, pVal[i]); printf("i/numofSlot: %f, v:%f, %f\n", ratio[i], numOfElem, pVal[i]);
continue; continue;
} else if (numOfElem == pHisto->numOfElems) { } else if (numOfElem == pHisto->numOfElems) {
pVal[i] = pHisto->max; (*pVal)[i] = pHisto->max;
printf("i/numofSlot: %f, v:%f, %f\n", ratio[i], numOfElem, pVal[i]); printf("i/numofSlot: %f, v:%f, %f\n", ratio[i], numOfElem, pVal[i]);
continue; continue;
} }
@ -540,34 +553,40 @@ double* tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num) {
if (fabs(delta) < FLT_EPSILON) { if (fabs(delta) < FLT_EPSILON) {
// printf("i/numofSlot: %f, v:%f, %f\n", // printf("i/numofSlot: %f, v:%f, %f\n",
// (double)i/numOfSlots, numOfElem, pHisto->elems[j].val); // (double)i/numOfSlots, numOfElem, pHisto->elems[j].val);
pVal[i] = pPrev->val; (*pVal)[i] = pPrev->val;
} }
double start = pPrev->num; double start = pPrev->num;
double range = pEntry->num - start; double range = pEntry->num - start;
if (range == 0) { if (range == 0) {
pVal[i] = (pEntry->val - pPrev->val) * delta / start + pPrev->val; (*pVal)[i] = (pEntry->val - pPrev->val) * delta / start + pPrev->val;
} else { } else {
double factor = (-2 * start + sqrt(4 * start * start - 4 * range * (-2 * delta))) / (2 * range); double factor = (-2 * start + sqrt(4 * start * start - 4 * range * (-2 * delta))) / (2 * range);
pVal[i] = pPrev->val + (pEntry->val - pPrev->val) * factor; (*pVal)[i] = pPrev->val + (pEntry->val - pPrev->val) * factor;
} }
// printf("i/numofSlot: %f, v:%f, %f\n", (double)i/numOfSlots, // printf("i/numofSlot: %f, v:%f, %f\n", (double)i/numOfSlots,
// numOfElem, val); // numOfElem, val);
} }
#endif #endif
return pVal; return TSDB_CODE_SUCCESS;
} }
SHistogramInfo* tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2, int32_t numOfEntries) { int32_t tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2, int32_t numOfEntries, SHistogramInfo** pResHistogram) {
SHistogramInfo* pResHistogram = tHistogramCreate(numOfEntries); int32_t code = tHistogramCreate(numOfEntries, pResHistogram);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
// error in histogram info // error in histogram info
if (pHisto1->numOfEntries > MAX_HISTOGRAM_BIN || pHisto2->numOfEntries > MAX_HISTOGRAM_BIN) { if (pHisto1->numOfEntries > MAX_HISTOGRAM_BIN || pHisto2->numOfEntries > MAX_HISTOGRAM_BIN) {
return pResHistogram; return code;
} }
SHistBin* pHistoBins = taosMemoryCalloc(1, sizeof(SHistBin) * (pHisto1->numOfEntries + pHisto2->numOfEntries)); SHistBin* pHistoBins = taosMemoryCalloc(1, sizeof(SHistBin) * (pHisto1->numOfEntries + pHisto2->numOfEntries));
if (NULL == pHistoBins) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t i = 0, j = 0, k = 0; int32_t i = 0, j = 0, k = 0;
while (i < pHisto1->numOfEntries && j < pHisto2->numOfEntries) { while (i < pHisto1->numOfEntries && j < pHisto2->numOfEntries) {
@ -583,28 +602,28 @@ SHistogramInfo* tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2
if (i < pHisto1->numOfEntries) { if (i < pHisto1->numOfEntries) {
int32_t remain = pHisto1->numOfEntries - i; int32_t remain = pHisto1->numOfEntries - i;
memcpy(&pHistoBins[k], &pHisto1->elems[i], sizeof(SHistBin) * remain); (void)memcpy(&pHistoBins[k], &pHisto1->elems[i], sizeof(SHistBin) * remain);
k += remain; k += remain;
} }
if (j < pHisto2->numOfEntries) { if (j < pHisto2->numOfEntries) {
int32_t remain = pHisto2->numOfEntries - j; int32_t remain = pHisto2->numOfEntries - j;
memcpy(&pHistoBins[k], &pHisto2->elems[j], sizeof(SHistBin) * remain); (void)memcpy(&pHistoBins[k], &pHisto2->elems[j], sizeof(SHistBin) * remain);
k += remain; k += remain;
} }
/* update other information */ /* update other information */
pResHistogram->numOfElems = pHisto1->numOfElems + pHisto2->numOfElems; (*pResHistogram)->numOfElems = pHisto1->numOfElems + pHisto2->numOfElems;
pResHistogram->min = (pHisto1->min < pHisto2->min) ? pHisto1->min : pHisto2->min; (*pResHistogram)->min = (pHisto1->min < pHisto2->min) ? pHisto1->min : pHisto2->min;
pResHistogram->max = (pHisto1->max > pHisto2->max) ? pHisto1->max : pHisto2->max; (*pResHistogram)->max = (pHisto1->max > pHisto2->max) ? pHisto1->max : pHisto2->max;
while (k > numOfEntries) { while (k > numOfEntries) {
histogramMergeImpl(pHistoBins, &k); histogramMergeImpl(pHistoBins, &k);
} }
pResHistogram->numOfEntries = k; (*pResHistogram)->numOfEntries = k;
memcpy(pResHistogram->elems, pHistoBins, sizeof(SHistBin) * k); (void)memcpy((*pResHistogram)->elems, pHistoBins, sizeof(SHistBin) * k);
taosMemoryFree(pHistoBins); taosMemoryFree(pHistoBins);
return pResHistogram; return TSDB_CODE_SUCCESS;
} }

View File

@ -28,9 +28,12 @@
int32_t getGroupId(int32_t numOfSlots, int32_t slotIndex, int32_t times) { return (times * numOfSlots) + slotIndex; } int32_t getGroupId(int32_t numOfSlots, int32_t slotIndex, int32_t times) { return (times * numOfSlots) + slotIndex; }
static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) { static int32_t loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx, SFilePage ** buffer) {
SFilePage *buffer = *buffer =
(SFilePage *)taosMemoryCalloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(SFilePage)); (SFilePage *)taosMemoryCalloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(SFilePage));
if (NULL == *buffer) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times); int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times);
@ -39,26 +42,30 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
if (p != NULL) { if (p != NULL) {
pIdList = *(SArray **)p; pIdList = *(SArray **)p;
} else { } else {
taosMemoryFree(buffer); taosMemoryFree(*buffer);
return NULL; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t offset = 0; int32_t offset = 0;
for (int32_t i = 0; i < taosArrayGetSize(pIdList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pIdList); ++i) {
int32_t *pageId = taosArrayGet(pIdList, i); int32_t *pageId = taosArrayGet(pIdList, i);
if (pageId == NULL) {
taosMemoryFree(*buffer);
return TSDB_CODE_OUT_OF_RANGE;
}
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
if (pg == NULL) { if (pg == NULL) {
taosMemoryFree(buffer); taosMemoryFree(*buffer);
return NULL; return terrno;
} }
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); (void)memcpy((*buffer)->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += (int32_t)(pg->num * pMemBucket->bytes); offset += (int32_t)(pg->num * pMemBucket->bytes);
} }
taosSort(buffer->data, pMemBucket->pSlots[slotIdx].info.size, pMemBucket->bytes, pMemBucket->comparFn); taosSort((*buffer)->data, pMemBucket->pSlots[slotIdx].info.size, pMemBucket->bytes, pMemBucket->comparFn);
return buffer; return TSDB_CODE_SUCCESS;
} }
static void resetBoundingBox(MinMaxEntry *range, int32_t type) { static void resetBoundingBox(MinMaxEntry *range, int32_t type) {
@ -116,6 +123,9 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) {
ASSERT(list->size == 1); ASSERT(list->size == 1);
int32_t *pageId = taosArrayGet(list, 0); int32_t *pageId = taosArrayGet(list, 0);
if (NULL == pageId) {
return TSDB_CODE_OUT_OF_RANGE;
}
SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId);
if (pPage == NULL) { if (pPage == NULL) {
return terrno; return terrno;
@ -238,67 +248,71 @@ static void resetSlotInfo(tMemBucket *pBucket) {
} }
} }
tMemBucket *tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup) { int32_t tMemBucketCreate(int32_t nElemSize, int16_t dataType, double minval, double maxval, bool hasWindowOrGroup,
tMemBucket *pBucket = (tMemBucket *)taosMemoryCalloc(1, sizeof(tMemBucket)); tMemBucket **pBucket) {
if (pBucket == NULL) { *pBucket = (tMemBucket *)taosMemoryCalloc(1, sizeof(tMemBucket));
return NULL; if (*pBucket == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} }
if (hasWindowOrGroup) { if (hasWindowOrGroup) {
// With window or group by, we need to shrink page size and reduce page num to save memory. // With window or group by, we need to shrink page size and reduce page num to save memory.
pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT / 8 ; // 128 bucket (*pBucket)->numOfSlots = DEFAULT_NUM_OF_SLOT / 8 ; // 128 bucket
pBucket->bufPageSize = 4096; // 4k per page (*pBucket)->bufPageSize = 4096; // 4k per page
} else { } else {
pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; (*pBucket)->numOfSlots = DEFAULT_NUM_OF_SLOT;
pBucket->bufPageSize = 16384 * 4; // 16k per page (*pBucket)->bufPageSize = 16384 * 4; // 16k per page
} }
pBucket->type = dataType; (*pBucket)->type = dataType;
pBucket->bytes = nElemSize; (*pBucket)->bytes = nElemSize;
pBucket->total = 0; (*pBucket)->total = 0;
pBucket->times = 1; (*pBucket)->times = 1;
pBucket->maxCapacity = 200000; (*pBucket)->maxCapacity = 200000;
pBucket->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); (*pBucket)->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (setBoundingBox(&pBucket->range, pBucket->type, minval, maxval) != 0) { if ((*pBucket)->groupPagesMap == NULL) {
tMemBucketDestroy(*pBucket);
return terrno;
}
if (setBoundingBox(&(*pBucket)->range, (*pBucket)->type, minval, maxval) != 0) {
// qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval); // qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval);
taosMemoryFree(pBucket); tMemBucketDestroy(*pBucket);
return NULL; return TSDB_CODE_FUNC_INVALID_VALUE_RANGE;
} }
pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(SFilePage)) / pBucket->bytes; (*pBucket)->elemPerPage = ((*pBucket)->bufPageSize - sizeof(SFilePage)) / (*pBucket)->bytes;
pBucket->comparFn = getKeyComparFunc(pBucket->type, TSDB_ORDER_ASC); (*pBucket)->comparFn = getKeyComparFunc((*pBucket)->type, TSDB_ORDER_ASC);
pBucket->hashFunc = getHashFunc(pBucket->type); (*pBucket)->hashFunc = getHashFunc((*pBucket)->type);
if (pBucket->hashFunc == NULL) { if ((*pBucket)->hashFunc == NULL) {
// qError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type); // qError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type);
taosMemoryFree(pBucket); tMemBucketDestroy(*pBucket);
return NULL; return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
} }
pBucket->pSlots = (tMemBucketSlot *)taosMemoryCalloc(pBucket->numOfSlots, sizeof(tMemBucketSlot)); (*pBucket)->pSlots = (tMemBucketSlot *)taosMemoryCalloc((*pBucket)->numOfSlots, sizeof(tMemBucketSlot));
if (pBucket->pSlots == NULL) { if ((*pBucket)->pSlots == NULL) {
taosMemoryFree(pBucket); tMemBucketDestroy(*pBucket);
return NULL; return TSDB_CODE_OUT_OF_MEMORY;
} }
resetSlotInfo(pBucket); resetSlotInfo((*pBucket));
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_DISKSPACE;
// qError("MemBucket create disk based Buf failed since %s", terrstr(terrno)); // qError("MemBucket create disk based Buf failed since %s", terrstr(terrno));
tMemBucketDestroy(pBucket); tMemBucketDestroy(*pBucket);
return NULL; return TSDB_CODE_NO_DISKSPACE;
} }
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 1024, "1", tsTempDir); int32_t ret = createDiskbasedBuf(&(*pBucket)->pBuffer, (*pBucket)->bufPageSize, (*pBucket)->bufPageSize * 1024, "1", tsTempDir);
if (ret != 0) { if (ret != 0) {
tMemBucketDestroy(pBucket); tMemBucketDestroy(*pBucket);
return NULL; return ret;
} }
// qDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes); // qDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes);
return pBucket; return TSDB_CODE_SUCCESS;
} }
void tMemBucketDestroy(tMemBucket *pBucket) { void tMemBucketDestroy(tMemBucket *pBucket) {
@ -394,7 +408,14 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
void *p = taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId)); void *p = taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId));
if (p == NULL) { if (p == NULL) {
pPageIdList = taosArrayInit(4, sizeof(int32_t)); pPageIdList = taosArrayInit(4, sizeof(int32_t));
taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pPageIdList, POINTER_BYTES); if (NULL == pPageIdList) {
return terrno;
}
int32_t code = taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pPageIdList, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
taosArrayDestroy(pPageIdList);
return code;
}
} else { } else {
pPageIdList = *(SArray **)p; pPageIdList = *(SArray **)p;
} }
@ -404,10 +425,13 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
return terrno; return terrno;
} }
pSlot->info.pageId = pageId; pSlot->info.pageId = pageId;
taosArrayPush(pPageIdList, &pageId); if (taosArrayPush(pPageIdList, &pageId) == NULL) {
taosArrayDestroy(pPageIdList);
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes); (void)memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes);
pSlot->info.data->num += 1; pSlot->info.data->num += 1;
pSlot->info.size += 1; pSlot->info.size += 1;
@ -493,9 +517,10 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
if (pSlot->info.size <= pMemBucket->maxCapacity) { if (pSlot->info.size <= pMemBucket->maxCapacity) {
// data in buffer and file are merged together to be processed. // data in buffer and file are merged together to be processed.
SFilePage *buffer = loadDataFromFilePage(pMemBucket, i); SFilePage *buffer = NULL;
if (buffer == NULL) { int32_t code = loadDataFromFilePage(pMemBucket, i, &buffer);
return terrno; if (TSDB_CODE_SUCCESS != code) {
return code;
} }
int32_t currentIdx = count - num; int32_t currentIdx = count - num;
@ -541,6 +566,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
for (int32_t f = 0; f < list->size; ++f) { for (int32_t f = 0; f < list->size; ++f) {
int32_t *pageId = taosArrayGet(list, f); int32_t *pageId = taosArrayGet(list, f);
if (NULL == pageId) {
return TSDB_CODE_OUT_OF_RANGE;
}
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
if (pg == NULL) { if (pg == NULL) {
return terrno; return terrno;

View File

@ -1011,7 +1011,7 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle);
int32_t cleanUpUdfs(); int32_t cleanUpUdfs();
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
@ -1196,15 +1196,18 @@ bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) {
return true; return true;
} }
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) { int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) {
if (functionSetup(pCtx, pResultCellInfo) != true) { if (pResultCellInfo->initialized) {
return false; return TSDB_CODE_SUCCESS;
}
if (functionSetup(pCtx, pResultCellInfo) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FUNC_SETUP_ERROR;
} }
UdfcFuncHandle handle; UdfcFuncHandle handle;
int32_t udfCode = 0; int32_t udfCode = 0;
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode); fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
return false; return TSDB_CODE_FUNC_SETUP_ERROR;
} }
SUdfcUvSession *session = (SUdfcUvSession *)handle; SUdfcUvSession *session = (SUdfcUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo); SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
@ -1218,7 +1221,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
releaseUdfFuncHandle(pCtx->udfName, handle); releaseUdfFuncHandle(pCtx->udfName, handle);
return false; return TSDB_CODE_FUNC_SETUP_ERROR;
} }
if (buf.bufLen <= session->bufSize) { if (buf.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
@ -1227,11 +1230,11 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
} else { } else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName, handle); releaseUdfFuncHandle(pCtx->udfName, handle);
return false; return TSDB_CODE_FUNC_SETUP_ERROR;
} }
releaseUdfFuncHandle(pCtx->udfName, handle); releaseUdfFuncHandle(pCtx->udfName, handle);
freeUdfInterBuf(&buf); freeUdfInterBuf(&buf);
return true; return TSDB_CODE_SUCCESS;
} }
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {

View File

@ -10420,8 +10420,9 @@ static int32_t createLastTsSelectStmt(char* pDb, const char* pTable, const char*
return code; return code;
} }
SNode* pFunc = (SNode*)createFunction("last", pParameterList); SNode* pFunc = NULL;
if (NULL == pFunc) { code = createFunction("last", pParameterList, (SFunctionNode**)&pFunc);
if (code) {
nodesDestroyList(pParameterList); nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -10438,8 +10439,9 @@ static int32_t createLastTsSelectStmt(char* pDb, const char* pTable, const char*
return code; return code;
} }
SFunctionNode* pFunc1 = createFunction("_vgid", NULL); SFunctionNode* pFunc1 = NULL;
if (NULL == pFunc1) { code = createFunction("_vgid", NULL, &pFunc1);
if (code) {
nodesDestroyList(pProjectionList); nodesDestroyList(pProjectionList);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -10451,8 +10453,9 @@ static int32_t createLastTsSelectStmt(char* pDb, const char* pTable, const char*
return code; return code;
} }
SFunctionNode* pFunc2 = createFunction("_vgver", NULL); SFunctionNode* pFunc2 = NULL;
if (NULL == pFunc2) { code = createFunction("_vgver", NULL, &pFunc2);
if (code) {
nodesDestroyList(pProjectionList); nodesDestroyList(pProjectionList);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }

View File

@ -35,7 +35,8 @@ namespace ParserTest {
class ParserEnv : public testing::Environment { class ParserEnv : public testing::Environment {
public: public:
virtual void SetUp() { virtual void SetUp() {
fmFuncMgtInit(); // TODO(smj) : How to handle return value of fmFuncMgtInit
(void)fmFuncMgtInit();
initMetaDataEnv(); initMetaDataEnv();
generateMetaData(); generateMetaData();
initLog(TD_TMP_DIR_PATH "td"); initLog(TD_TMP_DIR_PATH "td");

View File

@ -5336,9 +5336,13 @@ static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) {
} }
int32_t stbJoinOptAddFuncToScanNode(char* funcName, SScanLogicNode* pScan) { int32_t stbJoinOptAddFuncToScanNode(char* funcName, SScanLogicNode* pScan) {
SFunctionNode* pUidFunc = createFunction(funcName, NULL); SFunctionNode* pUidFunc = NULL;
int32_t code = createFunction(funcName, NULL, &pUidFunc);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p", pUidFunc->functionName, pUidFunc); snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p", pUidFunc->functionName, pUidFunc);
int32_t code = nodesListStrictAppend(pScan->pScanPseudoCols, (SNode*)pUidFunc); code = nodesListStrictAppend(pScan->pScanPseudoCols, (SNode*)pUidFunc);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExpr((SNode*)pUidFunc, &pScan->node.pTargets); code = createColumnByRewriteExpr((SNode*)pUidFunc, &pScan->node.pTargets);
} }

View File

@ -27,7 +27,8 @@
class PlannerEnv : public testing::Environment { class PlannerEnv : public testing::Environment {
public: public:
virtual void SetUp() { virtual void SetUp() {
fmFuncMgtInit(); // TODO(smj) : How to handle return value of fmFuncMgtInit
(void)fmFuncMgtInit();
initMetaDataEnv(); initMetaDataEnv();
generateMetaData(); generateMetaData();
initLog(TD_TMP_DIR_PATH "td"); initLog(TD_TMP_DIR_PATH "td");

View File

@ -707,6 +707,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR, "Func to_timest
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR, "Func to_timestamp failed for wrong timestamp") TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR, "Func to_timestamp failed for wrong timestamp")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_NOT_SUPPORTED, "Func to_timestamp failed for unsupported timestamp format") TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_NOT_SUPPORTED, "Func to_timestamp failed for unsupported timestamp format")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_CHAR_NOT_SUPPORTED, "Func to_char failed for unsupported format") TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_CHAR_NOT_SUPPORTED, "Func to_char failed for unsupported format")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TIME_UNIT_INVALID, "Invalid function time unit")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TIME_UNIT_TOO_SMALL, "Function time unit cannot be smaller than db precision")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_INVALID_VALUE_RANGE, "Function got invalid value range")
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_SETUP_ERROR, "Function set up failed")
//udf //udf
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping")

View File

@ -93,9 +93,8 @@ class TDTestCase:
tdSql.execute("create stream stream2 fill_history 1 into stb subtable(concat('new-', tname)) AS SELECT " tdSql.execute("create stream stream2 fill_history 1 into stb subtable(concat('new-', tname)) AS SELECT "
"_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True) "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
time.sleep(2) sql= "select * from sta"
tdSql.query("select * from sta") tdSql.check_rows_loop(3, sql, loopCount=100, waitTime=0.5)
tdSql.checkRows(3)
tdSql.query("select tbname from sta order by tbname") tdSql.query("select tbname from sta order by tbname")
if not tdSql.getData(0, 0).startswith('nee_w-t1_sta_'): if not tdSql.getData(0, 0).startswith('nee_w-t1_sta_'):
tdLog.exit("error1") tdLog.exit("error1")
@ -106,8 +105,8 @@ class TDTestCase:
if not tdSql.getData(2, 0).startswith('nee_w-t3_sta_'): if not tdSql.getData(2, 0).startswith('nee_w-t3_sta_'):
tdLog.exit("error3") tdLog.exit("error3")
tdSql.query("select * from stb") sql= "select * from stb"
tdSql.checkRows(3) tdSql.check_rows_loop(3, sql, loopCount=100, waitTime=0.5)
tdSql.query("select tbname from stb order by tbname") tdSql.query("select tbname from stb order by tbname")
if not tdSql.getData(0, 0).startswith('new-t1_stb_'): if not tdSql.getData(0, 0).startswith('new-t1_stb_'):
tdLog.exit("error4") tdLog.exit("error4")