From 89e324c89926cdc8e842f20cafb2a3cb0de30fd2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Nov 2021 15:08:00 +0800 Subject: [PATCH] [td-10564]fix compiler error. --- include/libs/function/function.h | 48 ++++-- include/util/tdef.h | 4 - source/libs/executor/src/executorimpl.c | 153 ++---------------- source/libs/function/src/taggfunction.c | 202 ++++++++++++------------ source/libs/function/src/tfunction.c | 24 +++ source/libs/function/src/tudf.c | 81 +++++++++- 6 files changed, 245 insertions(+), 267 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index b91cc83255..3f5422824a 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -26,8 +26,8 @@ extern "C" { #define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results -#define FUNCTION_SCALAR 1 -#define FUNCTION_AGG 2 +#define FUNCTION_TYPE_SCALAR 1 +#define FUNCTION_TYPE_AGG 2 #define TOP_BOTTOM_QUERY_LIMIT 100 #define FUNCTIONS_NAME_MAX_LENGTH 16 @@ -108,8 +108,23 @@ typedef struct SExtTagsInfo { struct SQLFunctionCtx **pTagCtxList; } SExtTagsInfo; +typedef struct SResultDataInfo { + int16_t type; + int16_t bytes; + int32_t intermediateBytes; +} SResultDataInfo; + #define GET_RES_INFO(ctx) ((ctx)->resultInfo) +typedef struct SFunctionFpSet { + bool (*init)(struct SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment + void (*addInput)(struct SQLFunctionCtx *pCtx); + + // finalizer must be called after all exec has been executed to generated final result. + void (*finalize)(struct SQLFunctionCtx *pCtx); + void (*combine)(struct SQLFunctionCtx *pCtx); +} SFunctionFpSet; + // sql function runtime context typedef struct SQLFunctionCtx { int32_t size; // number of rows @@ -118,9 +133,7 @@ typedef struct SQLFunctionCtx { int16_t inputType; int16_t inputBytes; - int16_t outputType; - int16_t outputBytes; // size of results, determined by function and input column data type - int32_t interBufBytes; // internal buffer size + SResultDataInfo resDataInfo; bool hasNull; // null value exist in current block bool requireNull; // require null in some function bool stableQuery; @@ -135,11 +148,13 @@ typedef struct SQLFunctionCtx { SVariant tag; bool isAggSet; - SColumnDataAgg agg; + SColumnDataAgg agg; struct SResultRowEntryInfo *resultInfo; SExtTagsInfo tagInfo; SPoint1 start; SPoint1 end; + + SFunctionFpSet* fpSet; } SQLFunctionCtx; enum { @@ -179,11 +194,11 @@ typedef struct SAggFunctionInfo { uint16_t status; bool (*init)(SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment - void (*exec)(SQLFunctionCtx *pCtx); + void (*addInput)(SQLFunctionCtx *pCtx); // finalizer must be called after all exec has been executed to generated final result. - void (*xFinalize)(SQLFunctionCtx *pCtx); - void (*mergeFunc)(SQLFunctionCtx *pCtx); + void (*finalize)(SQLFunctionCtx *pCtx); + void (*combine)(SQLFunctionCtx *pCtx); int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId); } SAggFunctionInfo; @@ -194,15 +209,9 @@ typedef struct SScalarFunctionInfo { uint8_t functionId; // index of scalar function bool (*init)(SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment - void (*exec)(SQLFunctionCtx *pCtx); + void (*addInput)(SQLFunctionCtx *pCtx); } SScalarFunctionInfo; -typedef struct SResultDataInfo { - int16_t type; - int16_t bytes; - int32_t intermediateBytes; -} SResultDataInfo; - typedef struct SMultiFunctionsDesc { bool stableQuery; bool groupbyColumn; @@ -280,6 +289,13 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo); int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType); +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// udf api +struct SUdfInfo; + +void qAddUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo); +void qRemoveUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo); + #ifdef __cplusplus } #endif diff --git a/include/util/tdef.h b/include/util/tdef.h index fca9a1395b..c7a421a55f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -318,10 +318,6 @@ do { \ #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type #define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode -#define TSDB_UDF_TYPE_SCALAR 1 -#define TSDB_UDF_TYPE_AGGREGATE 2 - - /* * 1. ordinary sub query for select * from super_table * 2. all sqlobj generated by createSubqueryObj with this flag diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 70b3c8982f..9b3cbcf9cc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -24,7 +24,6 @@ #include "function.h" #include "tcompare.h" #include "tcompression.h" -#include "tlosertree.h" #include "ttypes.h" #define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) @@ -356,39 +355,6 @@ void* destroyOutputBuf(SSDataBlock* pBlock) { return NULL; } -//int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput) { -// SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; -// bool hasMainFunction = hasMainOutput(pQueryAttr); -// -// int32_t maxOutput = 0; -// for (int32_t j = 0; j < numOfOutput; ++j) { -// int32_t id = pCtx[j].functionId; -// -// /* -// * ts, tag, tagprj function can not decide the output number of current query -// * the number of output result is decided by main output -// */ -// if (hasMainFunction && (id == FUNCTION_TS || id == FUNCTION_TAG || id == FUNCTION_TAGPRJ)) { -// continue; -// } -// -// SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[j]); -// if (pResInfo != NULL && maxOutput < pResInfo->numOfRes) { -// maxOutput = pResInfo->numOfRes; -// } -// } -// -// assert(maxOutput >= 0); -// return maxOutput; -//} -// -//static void clearNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) { -// for (int32_t j = 0; j < numOfOutput; ++j) { -// SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[j]); -// pResInfo->numOfRes = 0; -// } -//} - static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) { return true; // bool hasTags = false; @@ -848,8 +814,6 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, pResultRowInfo->curPos = i + 1; // current not closed result object } } - - //pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey; } static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQueryAttr* pQueryAttr, TSKEY lastKey) { @@ -903,80 +867,6 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc return num; } -void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type) { -#if 0 - int32_t output = 0; - - if (pUdfInfo == NULL || pUdfInfo->funcs[type] == NULL) { - //qError("empty udf function, type:%d", type); - return; - } - -// //qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[type]); - - switch (type) { - case TSDB_UDF_FUNC_NORMAL: - if (pUdfInfo->isScript) { - (*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx, - (char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->startTs, pCtx->pOutput, - (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes); - } else { - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - - void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); - - (*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, - pCtx->pOutput, interBuf, (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init); - } - - if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { - pCtx->resultInfo->numOfRes = output; - } else { - pCtx->resultInfo->numOfRes += output; - } - - if (pCtx->resultInfo->numOfRes > 0) { - pCtx->resultInfo->hasResult = DATA_SET_FLAG; - } - - break; - - case TSDB_UDF_FUNC_MERGE: - if (pUdfInfo->isScript) { - (*(scriptMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pUdfInfo->pScriptCtx, pCtx->pInput, pCtx->size, pCtx->pOutput, &output); - } else { - (*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init); - } - - // set the output value exist - pCtx->resultInfo->numOfRes = output; - if (output > 0) { - pCtx->resultInfo->hasResult = DATA_SET_FLAG; - } - - break; - - case TSDB_UDF_FUNC_FINALIZE: { - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); - if (pUdfInfo->isScript) { - (*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->startTs, pCtx->pOutput, &output); - } else { - (*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init); - } - // set the output value exist - pCtx->resultInfo->numOfRes = output; - if (output > 0) { - pCtx->resultInfo->hasResult = DATA_SET_FLAG; - } - - break; - } - } -#endif - -} - static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) { SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -1004,14 +894,8 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx pCtx[k].isAggSet = false; } - int32_t functionId = pCtx[k].functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) { -// if (functionId < 0) { // load the script and exec, pRuntimeEnv->pUdfInfo -// SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; -// doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL); -// } else { -// aAggs[functionId].xFunction(&pCtx[k]); -// } + pCtx[k].fpSet->addInput(&pCtx[k]); } // restore it @@ -1020,9 +904,8 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx } } - -static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow *pNext, SDataBlockInfo *pDataBlockInfo, - TSKEY *primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) { +static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, + TSKEY* primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) { getNextTimeWindow(pQueryAttr, pNext); // next time window is not in current block @@ -1244,14 +1127,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) { pCtx[k].startTs = startTs;// this can be set during create the struct - - int32_t functionId = pCtx[k].functionId; -// if (functionId < 0) { -// SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; -// doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL); -// } else { -// aAggs[functionId].xFunction(&pCtx[k]); -// } + pCtx[k].fpSet->addInput(&pCtx[k]); } } } @@ -1940,7 +1816,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) { int32_t functionId = pCtx[i].functionId; if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { - tagLen += pCtx[i].outputBytes; + tagLen += pCtx[i].resDataInfo.bytes; pTagCtx[num++] = &pCtx[i]; } else if (1/*(aAggs[functionId].status & FUNCSTATE_SELECTIVITY) != 0*/) { p = &pCtx[i]; @@ -1996,13 +1872,13 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->ptsOutputBuf = NULL; - pCtx->outputBytes = pSqlExpr->resSchema.bytes; - pCtx->outputType = pSqlExpr->resSchema.type; + pCtx->resDataInfo.bytes = pSqlExpr->resSchema.bytes; + pCtx->resDataInfo.type = pSqlExpr->resSchema.type; pCtx->order = pQueryAttr->order.order; // pCtx->functionId = pSqlExpr->functionId; pCtx->stableQuery = pQueryAttr->stableQuery; - pCtx->interBufBytes = pSqlExpr->interBytes; + pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes; pCtx->start.key = INT64_MIN; pCtx->end.key = INT64_MIN; @@ -3577,11 +3453,7 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { continue; } -// if (pCtx[j].functionId < 0) { // todo udf initialization -// continue; -// } else { -// aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); -// } + pCtx[j].fpSet->init(&pCtx[j], pCtx[j].resultInfo); } } @@ -3737,12 +3609,12 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { - offset += pCtx[i].outputBytes; + offset += pCtx[i].resDataInfo.bytes; continue; } pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, bufPage, pResult->offset, offset); - offset += pCtx[i].outputBytes; + offset += pCtx[i].resDataInfo.bytes; int32_t functionId = pCtx[i].functionId; if (functionId < 0) { @@ -3807,7 +3679,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF int16_t offset = 0; for (int32_t i = 0; i < numOfCols; ++i) { pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset); - offset += pCtx[i].outputBytes; + offset += pCtx[i].resDataInfo.bytes; int32_t functionId = pCtx[i].functionId; if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF || functionId == FUNCTION_DERIVATIVE) { @@ -6881,7 +6753,6 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { } SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity; STagScanInfo *pInfo = pOperator->info; diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index f4e48f2faa..a2e6c3d3ac 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -40,7 +40,7 @@ #define GET_TRUE_DATA_TYPE() \ int32_t type = 0; \ if (pCtx->currentStage == MERGE_STAGE) { \ - type = pCtx->outputType; \ + type = pCtx->resDataInfo.type; \ assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); \ } else { \ type = pCtx->inputType; \ @@ -61,10 +61,10 @@ for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ - __ctx->tag.i = (ts); \ + __ctx->tag.i = (ts); \ __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ } \ - aggFunc[FUNCTION_TAG].exec(__ctx); \ + aggFunc[FUNCTION_TAG].addInput(__ctx); \ } \ } while (0) @@ -72,7 +72,7 @@ do { \ for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ - aggFunc[FUNCTION_TAG].exec(__ctx); \ + aggFunc[FUNCTION_TAG].addInput(__ctx); \ } \ } while (0); @@ -478,8 +478,8 @@ static bool function_setup(SQLFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf return false; } - memset(pCtx->pOutput, 0, (size_t)pCtx->outputBytes); - initResultRowEntry(pResultInfo, pCtx->interBufBytes); + memset(pCtx->pOutput, 0, (size_t)pCtx->resDataInfo.bytes); + initResultRowEntry(pResultInfo, pCtx->resDataInfo.intermediateBytes); return true; } @@ -493,7 +493,7 @@ static bool function_setup(SQLFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf static void function_finalizer(SQLFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); if (pResInfo->hasResult != DATA_SET_FLAG) { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); } doFinalizer(pCtx); @@ -914,7 +914,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); return; } @@ -924,7 +924,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) { SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); if (pAvgInfo->num == 0) { // all data are NULL or empty table - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); return; } @@ -1000,7 +1000,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; } - aggFunc[FUNCTION_TAG].exec(__ctx); + aggFunc[FUNCTION_TAG].addInput(__ctx); } } } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { @@ -1248,7 +1248,7 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[j]; - aggFunc[FUNCTION_TAG].exec(__ctx); + aggFunc[FUNCTION_TAG].addInput(__ctx); } notNullElems++; @@ -1304,7 +1304,7 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp } static void min_func_merge(SQLFunctionCtx *pCtx) { - int32_t notNullElems = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 1); + int32_t notNullElems = minmax_merge_impl(pCtx, pCtx->resDataInfo.bytes, pCtx->pOutput, 1); SET_VAL(pCtx, notNullElems, 1); @@ -1315,7 +1315,7 @@ static void min_func_merge(SQLFunctionCtx *pCtx) { } static void max_func_merge(SQLFunctionCtx *pCtx) { - int32_t numOfElem = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 0); + int32_t numOfElem = minmax_merge_impl(pCtx, pCtx->resDataInfo.bytes, pCtx->pOutput, 0); SET_VAL(pCtx, numOfElem, 1); @@ -1423,7 +1423,7 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) { SStddevInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); if (pStd->num <= 0) { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); } else { double *retValue = (double *)pCtx->pOutput; SET_DOUBLE_VAL(retValue, sqrt(pStd->res / pStd->num)); @@ -1557,7 +1557,7 @@ static void stddev_dst_finalizer(SQLFunctionCtx *pCtx) { SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); if (pStd->num <= 0) { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); } else { double *retValue = (double *)pCtx->pOutput; SET_DOUBLE_VAL(retValue, sqrt(pStd->res / pStd->num)); @@ -1665,16 +1665,16 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) { assert(pCtx->stableQuery); char * pData = GET_INPUT_DATA_LIST(pCtx); - SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->outputBytes); + SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->resDataInfo.bytes); if (pInput->hasResult != DATA_SET_FLAG) { return; } // The param[1] is used to keep the initial value of max ts value - if (pCtx->param[1].nType != pCtx->outputType || pCtx->param[1].i > pInput->ts) { - memcpy(pCtx->pOutput, pData, pCtx->outputBytes); + if (pCtx->param[1].nType != pCtx->resDataInfo.type || pCtx->param[1].i > pInput->ts) { + memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes); pCtx->param[1].i = pInput->ts; - pCtx->param[1].nType = pCtx->outputType; + pCtx->param[1].nType = pCtx->resDataInfo.type; DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); } @@ -1799,7 +1799,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) { static void last_dist_func_merge(SQLFunctionCtx *pCtx) { char *pData = GET_INPUT_DATA_LIST(pCtx); - SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->outputBytes); + SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->resDataInfo.bytes); if (pInput->hasResult != DATA_SET_FLAG) { return; } @@ -1808,10 +1808,10 @@ static void last_dist_func_merge(SQLFunctionCtx *pCtx) { * param[1] used to keep the corresponding timestamp to decide if current result is * the true last result */ - if (pCtx->param[1].nType != pCtx->outputType || pCtx->param[1].i < pInput->ts) { - memcpy(pCtx->pOutput, pData, pCtx->outputBytes); + if (pCtx->param[1].nType != pCtx->resDataInfo.type || pCtx->param[1].i < pInput->ts) { + memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes); pCtx->param[1].i = pInput->ts; - pCtx->param[1].nType = pCtx->outputType; + pCtx->param[1].nType = pCtx->resDataInfo.type; DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts); } @@ -1853,7 +1853,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) { // do nothing at the first stage SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); if (pResInfo->hasResult != DATA_SET_FLAG) { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); return; } @@ -1880,7 +1880,7 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6 } taosVariantDump(&ctx->tag, dst->pTags + size, ctx->tag.nType, true); - size += pTagInfo->pTagCtxList[i]->outputBytes; + size += pTagInfo->pTagCtxList[i]->resDataInfo.bytes; } } } @@ -2101,9 +2101,9 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { for (int32_t i = 0; i < len; ++i, output += step) { int16_t offset = 0; for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { - memcpy(pData[j], tvp[i]->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes); - offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes; - pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes; + memcpy(pData[j], tvp[i]->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes); + offset += pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes; + pData[j] += pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes; } } @@ -2262,7 +2262,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) { // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { - int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->outputType; + int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type; do_top_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); } @@ -2319,7 +2319,7 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) { // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { - int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->outputType; + int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type; do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); } @@ -2469,7 +2469,7 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { tMemBucket * pMemBucket = ppInfo->pMemBucket; if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null assert(ppInfo->numOfElems == 0); - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); } else { SET_DOUBLE_VAL((double *)pCtx->pOutput, getPercentile(pMemBucket, v)); } @@ -2588,7 +2588,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) { memcpy(pCtx->pOutput, res, sizeof(double)); free(res); } else { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); return; } } else { @@ -2599,7 +2599,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) { memcpy(pCtx->pOutput, res, sizeof(double)); free(res); } else { // no need to free - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); return; } } @@ -2730,7 +2730,7 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) { SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); if (pInfo->num == 0) { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); return; } @@ -2794,16 +2794,16 @@ static void col_project_function(SQLFunctionCtx *pCtx) { static void tag_project_function(SQLFunctionCtx *pCtx) { INC_INIT_VAL(pCtx, pCtx->size); - assert(pCtx->inputBytes == pCtx->outputBytes); + assert(pCtx->inputBytes == pCtx->resDataInfo.bytes); - taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true); + taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->resDataInfo.type, true); char* data = pCtx->pOutput; - pCtx->pOutput += pCtx->outputBytes; + pCtx->pOutput += pCtx->resDataInfo.bytes; // directly copy from the first one for (int32_t i = 1; i < pCtx->size; ++i) { - memmove(pCtx->pOutput, data, pCtx->outputBytes); - pCtx->pOutput += pCtx->outputBytes; + memmove(pCtx->pOutput, data, pCtx->resDataInfo.bytes); + pCtx->pOutput += pCtx->resDataInfo.bytes; } } @@ -2821,7 +2821,7 @@ static void tag_function(SQLFunctionCtx *pCtx) { if (pCtx->currentStage == MERGE_STAGE) { copy_function(pCtx); } else { - taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true); + taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->resDataInfo.type, true); } } @@ -3396,7 +3396,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); if (pResInfo->hasResult != DATA_SET_FLAG) { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); return; } @@ -3406,7 +3406,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); if (pInfo->hasResult != DATA_SET_FLAG) { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); return; } @@ -3763,7 +3763,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { if (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP) { *(TSKEY *)pCtx->pOutput = pCtx->startTs; } else if (type == TSDB_FILL_NULL) { - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); } else if (type == TSDB_FILL_SET_VALUE) { taosVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); } else { @@ -3772,13 +3772,13 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) { SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->start.val); } else { - assignVal(pCtx->pOutput, pCtx->start.ptr, pCtx->outputBytes, pCtx->inputType); + assignVal(pCtx->pOutput, pCtx->start.ptr, pCtx->resDataInfo.bytes, pCtx->inputType); } } else if (type == TSDB_FILL_NEXT) { if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) { SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->end.val); } else { - assignVal(pCtx->pOutput, pCtx->end.ptr, pCtx->outputBytes, pCtx->inputType); + assignVal(pCtx->pOutput, pCtx->end.ptr, pCtx->resDataInfo.bytes, pCtx->inputType); } } else if (type == TSDB_FILL_LINEAR) { SPoint point1 = {.key = pCtx->start.key, .val = &pCtx->start.val}; @@ -3790,7 +3790,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { setNull(pCtx->pOutput, srcType, pCtx->inputBytes); } else { - taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE); + taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, TSDB_DATA_TYPE_DOUBLE); } } else { setNull(pCtx->pOutput, srcType, pCtx->inputBytes); @@ -3817,7 +3817,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { skey = ekey; } } - assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType); + assignVal(pCtx->pOutput, pCtx->pInput, pCtx->resDataInfo.bytes, pCtx->inputType); } else if (type == TSDB_FILL_NEXT) { TSKEY ekey = skey; char* val = NULL; @@ -3837,7 +3837,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { val = (char*)pCtx->pInput; } - assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType); + assignVal(pCtx->pOutput, val, pCtx->resDataInfo.bytes, pCtx->inputType); } else if (type == TSDB_FILL_LINEAR) { if (pCtx->size <= 1) { return; @@ -3863,7 +3863,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) { if (isNull(start, srcType) || isNull(end, srcType)) { setNull(pCtx->pOutput, srcType, pCtx->inputBytes); } else { - taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, srcType); + taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, srcType); } } else { setNull(pCtx->pOutput, srcType, pCtx->inputBytes); @@ -4382,7 +4382,7 @@ int32_t functionCompatList[] = { SAggFunctionInfo aggFunc[34] = {{ // 0, count function does not invoke the finalize function "count", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_COUNT, FUNCTION_COUNT, BASIC_FUNC_SO, @@ -4395,7 +4395,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 1 "sum", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_SUM, FUNCTION_SUM, BASIC_FUNC_SO, @@ -4408,7 +4408,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 2 "avg", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_AVG, FUNCTION_AVG, BASIC_FUNC_SO, @@ -4421,7 +4421,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 3 "min", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_MIN, FUNCTION_MIN, BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, @@ -4434,8 +4434,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 4 "max", - FUNCTION_AGG, - FUNCTION_MAX, + FUNCTION_TYPE_AGG, + FUNCTION_MAX, FUNCTION_MAX, BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, max_func_setup, @@ -4447,8 +4447,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 5 "stddev", - FUNCTION_AGG, - FUNCTION_STDDEV, + FUNCTION_TYPE_AGG, + FUNCTION_STDDEV, FUNCTION_STDDEV_DST, FUNCSTATE_SO | FUNCSTATE_STREAM, function_setup, @@ -4460,8 +4460,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 6 "percentile", - FUNCTION_AGG, - FUNCTION_PERCT, + FUNCTION_TYPE_AGG, + FUNCTION_PERCT, FUNCTION_INVALID_ID, FUNCSTATE_SO | FUNCSTATE_STREAM, percentile_function_setup, @@ -4473,8 +4473,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 7 "apercentile", - FUNCTION_AGG, - FUNCTION_APERCT, + FUNCTION_TYPE_AGG, + FUNCTION_APERCT, FUNCTION_APERCT, FUNCSTATE_SO | FUNCSTATE_STREAM | FUNCSTATE_STABLE, apercentile_function_setup, @@ -4486,8 +4486,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 8 "first", - FUNCTION_AGG, - FUNCTION_FIRST, + FUNCTION_TYPE_AGG, + FUNCTION_FIRST, FUNCTION_FIRST_DST, BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, function_setup, @@ -4499,8 +4499,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 9 "last", - FUNCTION_AGG, - FUNCTION_LAST, + FUNCTION_TYPE_AGG, + FUNCTION_LAST, FUNCTION_LAST_DST, BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, function_setup, @@ -4512,8 +4512,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 10 "last_row", - FUNCTION_AGG, - FUNCTION_LAST_ROW, + FUNCTION_TYPE_AGG, + FUNCTION_LAST_ROW, FUNCTION_LAST_ROW, FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, first_last_function_setup, @@ -4525,8 +4525,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 11 "top", - FUNCTION_AGG, - FUNCTION_TOP, + FUNCTION_TYPE_AGG, + FUNCTION_TOP, FUNCTION_TOP, FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, top_bottom_function_setup, @@ -4538,8 +4538,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 12 "bottom", - FUNCTION_AGG, - FUNCTION_BOTTOM, + FUNCTION_TYPE_AGG, + FUNCTION_BOTTOM, FUNCTION_BOTTOM, FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, top_bottom_function_setup, @@ -4551,8 +4551,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 13 "spread", - FUNCTION_AGG, - FUNCTION_SPREAD, + FUNCTION_TYPE_AGG, + FUNCTION_SPREAD, FUNCTION_SPREAD, BASIC_FUNC_SO, spread_function_setup, @@ -4564,8 +4564,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 14 "twa", - FUNCTION_AGG, - FUNCTION_TWA, + FUNCTION_TYPE_AGG, + FUNCTION_TWA, FUNCTION_TWA, BASIC_FUNC_SO | FUNCSTATE_NEED_TS, twa_function_setup, @@ -4577,8 +4577,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 15 "leastsquares", - FUNCTION_AGG, - FUNCTION_LEASTSQR, + FUNCTION_TYPE_AGG, + FUNCTION_LEASTSQR, FUNCTION_INVALID_ID, FUNCSTATE_SO | FUNCSTATE_STREAM, leastsquares_function_setup, @@ -4590,8 +4590,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 16 "ts", - FUNCTION_AGG, - FUNCTION_TS, + FUNCTION_TYPE_AGG, + FUNCTION_TS, FUNCTION_TS, BASIC_FUNC_SO | FUNCSTATE_NEED_TS, function_setup, @@ -4603,7 +4603,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 17 "ts", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_TS_DUMMY, FUNCTION_TS_DUMMY, BASIC_FUNC_SO | FUNCSTATE_NEED_TS, @@ -4616,7 +4616,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 18 "tag_dummy", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_TAG_DUMMY, FUNCTION_TAG_DUMMY, BASIC_FUNC_SO, @@ -4629,7 +4629,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 19 "ts", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_TS_COMP, FUNCTION_TS_COMP, FUNCSTATE_MO | FUNCSTATE_NEED_TS, @@ -4642,7 +4642,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 20 "tag", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_TAG, FUNCTION_TAG, BASIC_FUNC_SO, @@ -4655,7 +4655,7 @@ SAggFunctionInfo aggFunc[34] = {{ {//TODO this is a scala function // 21, column project sql function "colprj", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_PRJ, FUNCTION_PRJ, BASIC_FUNC_MO | FUNCSTATE_NEED_TS, @@ -4668,7 +4668,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 22, multi-output, tag function has only one result "tagprj", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_TAGPRJ, FUNCTION_TAGPRJ, BASIC_FUNC_MO, @@ -4681,7 +4681,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 23 "arithmetic", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_ARITHM, FUNCTION_ARITHM, FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS, @@ -4694,7 +4694,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 24 "diff", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_DIFF, FUNCTION_INVALID_ID, FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, @@ -4708,8 +4708,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 25 "first_dist", - FUNCTION_AGG, - FUNCTION_FIRST_DST, + FUNCTION_TYPE_AGG, + FUNCTION_FIRST_DST, FUNCTION_FIRST_DST, BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, first_last_function_setup, @@ -4721,7 +4721,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 26 "last_dist", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_LAST_DST, FUNCTION_LAST_DST, BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, @@ -4734,7 +4734,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 27 "stddev", // return table id and the corresponding tags for join match and subscribe - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_STDDEV_DST, FUNCTION_AVG, FUNCSTATE_SO | FUNCSTATE_STABLE, @@ -4747,7 +4747,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 28 "interp", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_INTERP, FUNCTION_INTERP, FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS , @@ -4760,7 +4760,7 @@ SAggFunctionInfo aggFunc[34] = {{ { // 29 "rate", - FUNCTION_AGG, + FUNCTION_TYPE_AGG, FUNCTION_RATE, FUNCTION_RATE, BASIC_FUNC_SO | FUNCSTATE_NEED_TS, @@ -4773,8 +4773,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 30 "irate", - FUNCTION_AGG, - FUNCTION_IRATE, + FUNCTION_TYPE_AGG, + FUNCTION_IRATE, FUNCTION_IRATE, BASIC_FUNC_SO | FUNCSTATE_NEED_TS, rate_function_setup, @@ -4786,8 +4786,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 31 "tbid", // return table id and the corresponding tags for join match and subscribe - FUNCTION_AGG, - FUNCTION_TID_TAG, + FUNCTION_TYPE_AGG, + FUNCTION_TID_TAG, FUNCTION_TID_TAG, FUNCSTATE_MO | FUNCSTATE_STABLE, function_setup, @@ -4798,8 +4798,8 @@ SAggFunctionInfo aggFunc[34] = {{ }, { //32 "derivative", // return table id and the corresponding tags for join match and subscribe - FUNCTION_AGG, - FUNCTION_DERIVATIVE, + FUNCTION_TYPE_AGG, + FUNCTION_DERIVATIVE, FUNCTION_INVALID_ID, FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, deriv_function_setup, @@ -4811,8 +4811,8 @@ SAggFunctionInfo aggFunc[34] = {{ { // 33 "_block_dist", // return table id and the corresponding tags for join match and subscribe - FUNCTION_AGG, - FUNCTION_BLKINFO, + FUNCTION_TYPE_AGG, + FUNCTION_BLKINFO, FUNCTION_BLKINFO, FUNCSTATE_SO | FUNCSTATE_STABLE, function_setup, diff --git a/source/libs/function/src/tfunction.c b/source/libs/function/src/tfunction.c index 0136bbf493..e9a59f7d69 100644 --- a/source/libs/function/src/tfunction.c +++ b/source/libs/function/src/tfunction.c @@ -6,6 +6,7 @@ #include "tscalarfunction.h" static SHashObj* functionHashTable = NULL; +static SHashObj* udfHashTable = NULL; static void doInitFunctionHashTable() { int numOfEntries = tListLen(aggFunc); @@ -23,6 +24,8 @@ static void doInitFunctionHashTable() { SScalarFunctionInfo* ptr = &scalarFunc[i]; taosHashPut(functionHashTable, scalarFunc[i].name, len, (void*)&ptr, POINTER_BYTES); } + + udfHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, true); } static pthread_once_t functionHashTableInit = PTHREAD_ONCE_INIT; @@ -46,6 +49,27 @@ const char* qGetFunctionName(int32_t functionId) { } +SAggFunctionInfo* qGetFunctionInfo(const char* name, int32_t len) { + pthread_once(&functionHashTableInit, doInitFunctionHashTable); + + SAggFunctionInfo** pInfo = taosHashGet(functionHashTable, name, len); + if (pInfo != NULL) { + return (*pInfo); + } else { + return NULL; + } +} + +void qAddUdfInfo(uint64_t id, SUdfInfo* pUdfInfo) { + int32_t len = (uint32_t)strlen(pUdfInfo->name); + taosHashPut(udfHashTable, pUdfInfo->name, len, (void*)&pUdfInfo, POINTER_BYTES); +} + +void qRemoveUdfInfo(uint64_t id, SUdfInfo* pUdfInfo) { + int32_t len = (uint32_t)strlen(pUdfInfo->name); + taosHashRemove(udfHashTable, pUdfInfo->name, len); +} + bool isTagsQuery(SArray* pFunctionIdList) { int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList); for (int32_t i = 0; i < num; ++i) { diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 5815edd701..095c5a6bb0 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1,6 +1,5 @@ #include "tudf.h" -#if 0 static char* getUdfFuncName(char* funcname, char* name, int type) { switch (type) { case TSDB_UDF_FUNC_NORMAL: @@ -26,6 +25,7 @@ static char* getUdfFuncName(char* funcname, char* name, int type) { return funcname; } +#if 0 int32_t initUdfInfo(SUdfInfo* pUdfInfo) { if (pUdfInfo == NULL) { return TSDB_CODE_SUCCESS; @@ -47,7 +47,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) { pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadScriptNormal; - if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { + if (pUdfInfo->funcType == FUNCTION_TYPE_AGG) { pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadScriptFinalize; pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadScriptMerge; } @@ -55,7 +55,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) { } else { char path[PATH_MAX] = {0}; - taosGetTmpfilePath("script", path); + taosGetTmpfilePath("script", path, tsTempDir); FILE* file = fopen(path, "w+"); @@ -72,7 +72,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) { return TSDB_CODE_QRY_SYS_ERROR; } - char funcname[TSDB_FUNCTIONS_NAME_MAX_LENGTH + 10] = {0}; + char funcname[FUNCTIONS_NAME_MAX_LENGTH + 10] = {0}; pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_NORMAL)); if (NULL == pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) { return TSDB_CODE_QRY_SYS_ERROR; @@ -80,7 +80,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) { pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_INIT)); - if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { + if (pUdfInfo->funcType == FUNCTION_TYPE_AGG) { pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_FINALIZE)); pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_MERGE)); } @@ -121,4 +121,75 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo) { tfree(pUdfInfo); } +void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type) { + int32_t output = 0; + + if (pUdfInfo == NULL || pUdfInfo->funcs[type] == NULL) { + //qError("empty udf function, type:%d", type); + return; + } + +// //qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[type]); + + switch (type) { + case TSDB_UDF_FUNC_NORMAL: + if (pUdfInfo->isScript) { + (*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx, + (char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->startTs, pCtx->pOutput, + (char *)pCtx->ptsOutputBuf, &output, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); + } else { + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + + void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); + + (*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, + pCtx->pOutput, interBuf, (char *)pCtx->ptsOutputBuf, &output, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes, &pUdfInfo->init); + } + + if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) { + pCtx->resultInfo->numOfRes = output; + } else { + pCtx->resultInfo->numOfRes += output; + } + + if (pCtx->resultInfo->numOfRes > 0) { + pCtx->resultInfo->hasResult = DATA_SET_FLAG; + } + + break; + + case TSDB_UDF_FUNC_MERGE: + if (pUdfInfo->isScript) { + (*(scriptMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pUdfInfo->pScriptCtx, pCtx->pInput, pCtx->size, pCtx->pOutput, &output); + } else { + (*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init); + } + + // set the output value exist + pCtx->resultInfo->numOfRes = output; + if (output > 0) { + pCtx->resultInfo->hasResult = DATA_SET_FLAG; + } + + break; + + case TSDB_UDF_FUNC_FINALIZE: { + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo); + if (pUdfInfo->isScript) { + (*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->startTs, pCtx->pOutput, &output); + } else { + (*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init); + } + // set the output value exist + pCtx->resultInfo->numOfRes = output; + if (output > 0) { + pCtx->resultInfo->hasResult = DATA_SET_FLAG; + } + + break; + } + } +} + #endif \ No newline at end of file