[td-10564]fix compiler error.

This commit is contained in:
Haojun Liao 2021-11-02 15:08:00 +08:00
parent f69a885da5
commit 89e324c899
6 changed files with 245 additions and 267 deletions

View File

@ -26,8 +26,8 @@ extern "C" {
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define FUNCTION_SCALAR 1
#define FUNCTION_AGG 2
#define FUNCTION_TYPE_SCALAR 1
#define FUNCTION_TYPE_AGG 2
#define TOP_BOTTOM_QUERY_LIMIT 100
#define FUNCTIONS_NAME_MAX_LENGTH 16
@ -108,8 +108,23 @@ typedef struct SExtTagsInfo {
struct SQLFunctionCtx **pTagCtxList;
} SExtTagsInfo;
typedef struct SResultDataInfo {
int16_t type;
int16_t bytes;
int32_t intermediateBytes;
} SResultDataInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
typedef struct SFunctionFpSet {
bool (*init)(struct SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*addInput)(struct SQLFunctionCtx *pCtx);
// finalizer must be called after all exec has been executed to generated final result.
void (*finalize)(struct SQLFunctionCtx *pCtx);
void (*combine)(struct SQLFunctionCtx *pCtx);
} SFunctionFpSet;
// sql function runtime context
typedef struct SQLFunctionCtx {
int32_t size; // number of rows
@ -118,9 +133,7 @@ typedef struct SQLFunctionCtx {
int16_t inputType;
int16_t inputBytes;
int16_t outputType;
int16_t outputBytes; // size of results, determined by function and input column data type
int32_t interBufBytes; // internal buffer size
SResultDataInfo resDataInfo;
bool hasNull; // null value exist in current block
bool requireNull; // require null in some function
bool stableQuery;
@ -140,6 +153,8 @@ typedef struct SQLFunctionCtx {
SExtTagsInfo tagInfo;
SPoint1 start;
SPoint1 end;
SFunctionFpSet* fpSet;
} SQLFunctionCtx;
enum {
@ -179,11 +194,11 @@ typedef struct SAggFunctionInfo {
uint16_t status;
bool (*init)(SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*exec)(SQLFunctionCtx *pCtx);
void (*addInput)(SQLFunctionCtx *pCtx);
// finalizer must be called after all exec has been executed to generated final result.
void (*xFinalize)(SQLFunctionCtx *pCtx);
void (*mergeFunc)(SQLFunctionCtx *pCtx);
void (*finalize)(SQLFunctionCtx *pCtx);
void (*combine)(SQLFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
} SAggFunctionInfo;
@ -194,15 +209,9 @@ typedef struct SScalarFunctionInfo {
uint8_t functionId; // index of scalar function
bool (*init)(SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
void (*exec)(SQLFunctionCtx *pCtx);
void (*addInput)(SQLFunctionCtx *pCtx);
} SScalarFunctionInfo;
typedef struct SResultDataInfo {
int16_t type;
int16_t bytes;
int32_t intermediateBytes;
} SResultDataInfo;
typedef struct SMultiFunctionsDesc {
bool stableQuery;
bool groupbyColumn;
@ -280,6 +289,13 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// udf api
struct SUdfInfo;
void qAddUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);
void qRemoveUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);
#ifdef __cplusplus
}
#endif

View File

@ -318,10 +318,6 @@ do { \
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
#define TSDB_UDF_TYPE_SCALAR 1
#define TSDB_UDF_TYPE_AGGREGATE 2
/*
* 1. ordinary sub query for select * from super_table
* 2. all sqlobj generated by createSubqueryObj with this flag

View File

@ -24,7 +24,6 @@
#include "function.h"
#include "tcompare.h"
#include "tcompression.h"
#include "tlosertree.h"
#include "ttypes.h"
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
@ -356,39 +355,6 @@ void* destroyOutputBuf(SSDataBlock* pBlock) {
return NULL;
}
//int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
// SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// bool hasMainFunction = hasMainOutput(pQueryAttr);
//
// int32_t maxOutput = 0;
// for (int32_t j = 0; j < numOfOutput; ++j) {
// int32_t id = pCtx[j].functionId;
//
// /*
// * ts, tag, tagprj function can not decide the output number of current query
// * the number of output result is decided by main output
// */
// if (hasMainFunction && (id == FUNCTION_TS || id == FUNCTION_TAG || id == FUNCTION_TAGPRJ)) {
// continue;
// }
//
// SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[j]);
// if (pResInfo != NULL && maxOutput < pResInfo->numOfRes) {
// maxOutput = pResInfo->numOfRes;
// }
// }
//
// assert(maxOutput >= 0);
// return maxOutput;
//}
//
//static void clearNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
// for (int32_t j = 0; j < numOfOutput; ++j) {
// SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[j]);
// pResInfo->numOfRes = 0;
// }
//}
static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
return true;
// bool hasTags = false;
@ -848,8 +814,6 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
pResultRowInfo->curPos = i + 1; // current not closed result object
}
}
//pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey;
}
static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQueryAttr* pQueryAttr, TSKEY lastKey) {
@ -903,80 +867,6 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
return num;
}
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type) {
#if 0
int32_t output = 0;
if (pUdfInfo == NULL || pUdfInfo->funcs[type] == NULL) {
//qError("empty udf function, type:%d", type);
return;
}
// //qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[type]);
switch (type) {
case TSDB_UDF_FUNC_NORMAL:
if (pUdfInfo->isScript) {
(*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx,
(char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->startTs, pCtx->pOutput,
(char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes);
} else {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList,
pCtx->pOutput, interBuf, (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init);
}
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
pCtx->resultInfo->numOfRes = output;
} else {
pCtx->resultInfo->numOfRes += output;
}
if (pCtx->resultInfo->numOfRes > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
break;
case TSDB_UDF_FUNC_MERGE:
if (pUdfInfo->isScript) {
(*(scriptMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pUdfInfo->pScriptCtx, pCtx->pInput, pCtx->size, pCtx->pOutput, &output);
} else {
(*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init);
}
// set the output value exist
pCtx->resultInfo->numOfRes = output;
if (output > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
break;
case TSDB_UDF_FUNC_FINALIZE: {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
if (pUdfInfo->isScript) {
(*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->startTs, pCtx->pOutput, &output);
} else {
(*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init);
}
// set the output value exist
pCtx->resultInfo->numOfRes = output;
if (output > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
break;
}
}
#endif
}
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
@ -1004,14 +894,8 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
pCtx[k].isAggSet = false;
}
int32_t functionId = pCtx[k].functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
// if (functionId < 0) { // load the script and exec, pRuntimeEnv->pUdfInfo
// SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
// doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
// } else {
// aAggs[functionId].xFunction(&pCtx[k]);
// }
pCtx[k].fpSet->addInput(&pCtx[k]);
}
// restore it
@ -1020,9 +904,8 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
}
}
static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow *pNext, SDataBlockInfo *pDataBlockInfo,
TSKEY *primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) {
static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) {
getNextTimeWindow(pQueryAttr, pNext);
// next time window is not in current block
@ -1244,14 +1127,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
pCtx[k].startTs = startTs;// this can be set during create the struct
int32_t functionId = pCtx[k].functionId;
// if (functionId < 0) {
// SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
// doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
// } else {
// aAggs[functionId].xFunction(&pCtx[k]);
// }
pCtx[k].fpSet->addInput(&pCtx[k]);
}
}
}
@ -1940,7 +1816,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
tagLen += pCtx[i].outputBytes;
tagLen += pCtx[i].resDataInfo.bytes;
pTagCtx[num++] = &pCtx[i];
} else if (1/*(aAggs[functionId].status & FUNCSTATE_SELECTIVITY) != 0*/) {
p = &pCtx[i];
@ -1996,13 +1872,13 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->ptsOutputBuf = NULL;
pCtx->outputBytes = pSqlExpr->resSchema.bytes;
pCtx->outputType = pSqlExpr->resSchema.type;
pCtx->resDataInfo.bytes = pSqlExpr->resSchema.bytes;
pCtx->resDataInfo.type = pSqlExpr->resSchema.type;
pCtx->order = pQueryAttr->order.order;
// pCtx->functionId = pSqlExpr->functionId;
pCtx->stableQuery = pQueryAttr->stableQuery;
pCtx->interBufBytes = pSqlExpr->interBytes;
pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes;
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
@ -3577,11 +3453,7 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
continue;
}
// if (pCtx[j].functionId < 0) { // todo udf initialization
// continue;
// } else {
// aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
// }
pCtx[j].fpSet->init(&pCtx[j], pCtx[j].resultInfo);
}
}
@ -3737,12 +3609,12 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
offset += pCtx[i].outputBytes;
offset += pCtx[i].resDataInfo.bytes;
continue;
}
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, bufPage, pResult->offset, offset);
offset += pCtx[i].outputBytes;
offset += pCtx[i].resDataInfo.bytes;
int32_t functionId = pCtx[i].functionId;
if (functionId < 0) {
@ -3807,7 +3679,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
int16_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset);
offset += pCtx[i].outputBytes;
offset += pCtx[i].resDataInfo.bytes;
int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF || functionId == FUNCTION_DERIVATIVE) {
@ -6881,7 +6753,6 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
}
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity;
STagScanInfo *pInfo = pOperator->info;

View File

@ -40,7 +40,7 @@
#define GET_TRUE_DATA_TYPE() \
int32_t type = 0; \
if (pCtx->currentStage == MERGE_STAGE) { \
type = pCtx->outputType; \
type = pCtx->resDataInfo.type; \
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); \
} else { \
type = pCtx->inputType; \
@ -64,7 +64,7 @@
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
aggFunc[FUNCTION_TAG].exec(__ctx); \
aggFunc[FUNCTION_TAG].addInput(__ctx); \
} \
} while (0)
@ -72,7 +72,7 @@
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
aggFunc[FUNCTION_TAG].exec(__ctx); \
aggFunc[FUNCTION_TAG].addInput(__ctx); \
} \
} while (0);
@ -478,8 +478,8 @@ static bool function_setup(SQLFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf
return false;
}
memset(pCtx->pOutput, 0, (size_t)pCtx->outputBytes);
initResultRowEntry(pResultInfo, pCtx->interBufBytes);
memset(pCtx->pOutput, 0, (size_t)pCtx->resDataInfo.bytes);
initResultRowEntry(pResultInfo, pCtx->resDataInfo.intermediateBytes);
return true;
}
@ -493,7 +493,7 @@ static bool function_setup(SQLFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf
static void function_finalizer(SQLFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
}
doFinalizer(pCtx);
@ -914,7 +914,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
return;
}
@ -924,7 +924,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) {
SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo);
if (pAvgInfo->num == 0) { // all data are NULL or empty table
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
return;
}
@ -1000,7 +1000,7 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
}
aggFunc[FUNCTION_TAG].exec(__ctx);
aggFunc[FUNCTION_TAG].addInput(__ctx);
}
}
} else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) {
@ -1248,7 +1248,7 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[j];
aggFunc[FUNCTION_TAG].exec(__ctx);
aggFunc[FUNCTION_TAG].addInput(__ctx);
}
notNullElems++;
@ -1304,7 +1304,7 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp
}
static void min_func_merge(SQLFunctionCtx *pCtx) {
int32_t notNullElems = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 1);
int32_t notNullElems = minmax_merge_impl(pCtx, pCtx->resDataInfo.bytes, pCtx->pOutput, 1);
SET_VAL(pCtx, notNullElems, 1);
@ -1315,7 +1315,7 @@ static void min_func_merge(SQLFunctionCtx *pCtx) {
}
static void max_func_merge(SQLFunctionCtx *pCtx) {
int32_t numOfElem = minmax_merge_impl(pCtx, pCtx->outputBytes, pCtx->pOutput, 0);
int32_t numOfElem = minmax_merge_impl(pCtx, pCtx->resDataInfo.bytes, pCtx->pOutput, 0);
SET_VAL(pCtx, numOfElem, 1);
@ -1423,7 +1423,7 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) {
SStddevInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (pStd->num <= 0) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
} else {
double *retValue = (double *)pCtx->pOutput;
SET_DOUBLE_VAL(retValue, sqrt(pStd->res / pStd->num));
@ -1557,7 +1557,7 @@ static void stddev_dst_finalizer(SQLFunctionCtx *pCtx) {
SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (pStd->num <= 0) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
} else {
double *retValue = (double *)pCtx->pOutput;
SET_DOUBLE_VAL(retValue, sqrt(pStd->res / pStd->num));
@ -1665,16 +1665,16 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) {
assert(pCtx->stableQuery);
char * pData = GET_INPUT_DATA_LIST(pCtx);
SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->outputBytes);
SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->resDataInfo.bytes);
if (pInput->hasResult != DATA_SET_FLAG) {
return;
}
// The param[1] is used to keep the initial value of max ts value
if (pCtx->param[1].nType != pCtx->outputType || pCtx->param[1].i > pInput->ts) {
memcpy(pCtx->pOutput, pData, pCtx->outputBytes);
if (pCtx->param[1].nType != pCtx->resDataInfo.type || pCtx->param[1].i > pInput->ts) {
memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes);
pCtx->param[1].i = pInput->ts;
pCtx->param[1].nType = pCtx->outputType;
pCtx->param[1].nType = pCtx->resDataInfo.type;
DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
}
@ -1799,7 +1799,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) {
static void last_dist_func_merge(SQLFunctionCtx *pCtx) {
char *pData = GET_INPUT_DATA_LIST(pCtx);
SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->outputBytes);
SFirstLastInfo *pInput = (SFirstLastInfo*) (pData + pCtx->resDataInfo.bytes);
if (pInput->hasResult != DATA_SET_FLAG) {
return;
}
@ -1808,10 +1808,10 @@ static void last_dist_func_merge(SQLFunctionCtx *pCtx) {
* param[1] used to keep the corresponding timestamp to decide if current result is
* the true last result
*/
if (pCtx->param[1].nType != pCtx->outputType || pCtx->param[1].i < pInput->ts) {
memcpy(pCtx->pOutput, pData, pCtx->outputBytes);
if (pCtx->param[1].nType != pCtx->resDataInfo.type || pCtx->param[1].i < pInput->ts) {
memcpy(pCtx->pOutput, pData, pCtx->resDataInfo.bytes);
pCtx->param[1].i = pInput->ts;
pCtx->param[1].nType = pCtx->outputType;
pCtx->param[1].nType = pCtx->resDataInfo.type;
DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
}
@ -1853,7 +1853,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) {
// do nothing at the first stage
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
return;
}
@ -1880,7 +1880,7 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
}
taosVariantDump(&ctx->tag, dst->pTags + size, ctx->tag.nType, true);
size += pTagInfo->pTagCtxList[i]->outputBytes;
size += pTagInfo->pTagCtxList[i]->resDataInfo.bytes;
}
}
}
@ -2101,9 +2101,9 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
for (int32_t i = 0; i < len; ++i, output += step) {
int16_t offset = 0;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
memcpy(pData[j], tvp[i]->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes);
offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
memcpy(pData[j], tvp[i]->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes);
offset += pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes;
pData[j] += pCtx->tagInfo.pTagCtxList[j]->resDataInfo.bytes;
}
}
@ -2262,7 +2262,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) {
int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->outputType;
int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type;
do_top_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp,
type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
}
@ -2319,7 +2319,7 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) {
// the intermediate result is binary, we only use the output data type
for (int32_t i = 0; i < pInput->num; ++i) {
int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->outputType;
int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type;
do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type,
&pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
}
@ -2469,7 +2469,7 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
tMemBucket * pMemBucket = ppInfo->pMemBucket;
if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null
assert(ppInfo->numOfElems == 0);
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
} else {
SET_DOUBLE_VAL((double *)pCtx->pOutput, getPercentile(pMemBucket, v));
}
@ -2588,7 +2588,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
memcpy(pCtx->pOutput, res, sizeof(double));
free(res);
} else {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
return;
}
} else {
@ -2599,7 +2599,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
memcpy(pCtx->pOutput, res, sizeof(double));
free(res);
} else { // no need to free
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
return;
}
}
@ -2730,7 +2730,7 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pInfo->num == 0) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
return;
}
@ -2794,16 +2794,16 @@ static void col_project_function(SQLFunctionCtx *pCtx) {
static void tag_project_function(SQLFunctionCtx *pCtx) {
INC_INIT_VAL(pCtx, pCtx->size);
assert(pCtx->inputBytes == pCtx->outputBytes);
assert(pCtx->inputBytes == pCtx->resDataInfo.bytes);
taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true);
taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->resDataInfo.type, true);
char* data = pCtx->pOutput;
pCtx->pOutput += pCtx->outputBytes;
pCtx->pOutput += pCtx->resDataInfo.bytes;
// directly copy from the first one
for (int32_t i = 1; i < pCtx->size; ++i) {
memmove(pCtx->pOutput, data, pCtx->outputBytes);
pCtx->pOutput += pCtx->outputBytes;
memmove(pCtx->pOutput, data, pCtx->resDataInfo.bytes);
pCtx->pOutput += pCtx->resDataInfo.bytes;
}
}
@ -2821,7 +2821,7 @@ static void tag_function(SQLFunctionCtx *pCtx) {
if (pCtx->currentStage == MERGE_STAGE) {
copy_function(pCtx);
} else {
taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true);
taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->resDataInfo.type, true);
}
}
@ -3396,7 +3396,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
if (pResInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
return;
}
@ -3406,7 +3406,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (pInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
return;
}
@ -3763,7 +3763,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
if (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP) {
*(TSKEY *)pCtx->pOutput = pCtx->startTs;
} else if (type == TSDB_FILL_NULL) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
} else if (type == TSDB_FILL_SET_VALUE) {
taosVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true);
} else {
@ -3772,13 +3772,13 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) {
SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->start.val);
} else {
assignVal(pCtx->pOutput, pCtx->start.ptr, pCtx->outputBytes, pCtx->inputType);
assignVal(pCtx->pOutput, pCtx->start.ptr, pCtx->resDataInfo.bytes, pCtx->inputType);
}
} else if (type == TSDB_FILL_NEXT) {
if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) {
SET_TYPED_DATA(pCtx->pOutput, pCtx->inputType, pCtx->end.val);
} else {
assignVal(pCtx->pOutput, pCtx->end.ptr, pCtx->outputBytes, pCtx->inputType);
assignVal(pCtx->pOutput, pCtx->end.ptr, pCtx->resDataInfo.bytes, pCtx->inputType);
}
} else if (type == TSDB_FILL_LINEAR) {
SPoint point1 = {.key = pCtx->start.key, .val = &pCtx->start.val};
@ -3790,7 +3790,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
}
} else {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
@ -3817,7 +3817,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
skey = ekey;
}
}
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->outputBytes, pCtx->inputType);
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->resDataInfo.bytes, pCtx->inputType);
} else if (type == TSDB_FILL_NEXT) {
TSKEY ekey = skey;
char* val = NULL;
@ -3837,7 +3837,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
val = (char*)pCtx->pInput;
}
assignVal(pCtx->pOutput, val, pCtx->outputBytes, pCtx->inputType);
assignVal(pCtx->pOutput, val, pCtx->resDataInfo.bytes, pCtx->inputType);
} else if (type == TSDB_FILL_LINEAR) {
if (pCtx->size <= 1) {
return;
@ -3863,7 +3863,7 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
if (isNull(start, srcType) || isNull(end, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, srcType);
taosGetLinearInterpolationVal(&point, pCtx->resDataInfo.type, &point1, &point2, srcType);
}
} else {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
@ -4382,7 +4382,7 @@ int32_t functionCompatList[] = {
SAggFunctionInfo aggFunc[34] = {{
// 0, count function does not invoke the finalize function
"count",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_COUNT,
FUNCTION_COUNT,
BASIC_FUNC_SO,
@ -4395,7 +4395,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 1
"sum",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_SUM,
FUNCTION_SUM,
BASIC_FUNC_SO,
@ -4408,7 +4408,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 2
"avg",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_AVG,
FUNCTION_AVG,
BASIC_FUNC_SO,
@ -4421,7 +4421,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 3
"min",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_MIN,
FUNCTION_MIN,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
@ -4434,7 +4434,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 4
"max",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_MAX,
FUNCTION_MAX,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
@ -4447,7 +4447,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 5
"stddev",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_STDDEV,
FUNCTION_STDDEV_DST,
FUNCSTATE_SO | FUNCSTATE_STREAM,
@ -4460,7 +4460,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 6
"percentile",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_PERCT,
FUNCTION_INVALID_ID,
FUNCSTATE_SO | FUNCSTATE_STREAM,
@ -4473,7 +4473,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 7
"apercentile",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_APERCT,
FUNCTION_APERCT,
FUNCSTATE_SO | FUNCSTATE_STREAM | FUNCSTATE_STABLE,
@ -4486,7 +4486,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 8
"first",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_FIRST,
FUNCTION_FIRST_DST,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
@ -4499,7 +4499,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 9
"last",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_LAST,
FUNCTION_LAST_DST,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
@ -4512,7 +4512,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 10
"last_row",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_LAST_ROW,
FUNCTION_LAST_ROW,
FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
@ -4525,7 +4525,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 11
"top",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_TOP,
FUNCTION_TOP,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
@ -4538,7 +4538,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 12
"bottom",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_BOTTOM,
FUNCTION_BOTTOM,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
@ -4551,7 +4551,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 13
"spread",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_SPREAD,
FUNCTION_SPREAD,
BASIC_FUNC_SO,
@ -4564,7 +4564,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 14
"twa",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_TWA,
FUNCTION_TWA,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
@ -4577,7 +4577,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 15
"leastsquares",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_LEASTSQR,
FUNCTION_INVALID_ID,
FUNCSTATE_SO | FUNCSTATE_STREAM,
@ -4590,7 +4590,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 16
"ts",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_TS,
FUNCTION_TS,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
@ -4603,7 +4603,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 17
"ts",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_TS_DUMMY,
FUNCTION_TS_DUMMY,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
@ -4616,7 +4616,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 18
"tag_dummy",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_TAG_DUMMY,
FUNCTION_TAG_DUMMY,
BASIC_FUNC_SO,
@ -4629,7 +4629,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 19
"ts",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_TS_COMP,
FUNCTION_TS_COMP,
FUNCSTATE_MO | FUNCSTATE_NEED_TS,
@ -4642,7 +4642,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 20
"tag",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_TAG,
FUNCTION_TAG,
BASIC_FUNC_SO,
@ -4655,7 +4655,7 @@ SAggFunctionInfo aggFunc[34] = {{
{//TODO this is a scala function
// 21, column project sql function
"colprj",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_PRJ,
FUNCTION_PRJ,
BASIC_FUNC_MO | FUNCSTATE_NEED_TS,
@ -4668,7 +4668,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 22, multi-output, tag function has only one result
"tagprj",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_TAGPRJ,
FUNCTION_TAGPRJ,
BASIC_FUNC_MO,
@ -4681,7 +4681,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 23
"arithmetic",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_ARITHM,
FUNCTION_ARITHM,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS,
@ -4694,7 +4694,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 24
"diff",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_DIFF,
FUNCTION_INVALID_ID,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
@ -4708,7 +4708,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 25
"first_dist",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_FIRST_DST,
FUNCTION_FIRST_DST,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
@ -4721,7 +4721,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 26
"last_dist",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_LAST_DST,
FUNCTION_LAST_DST,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
@ -4734,7 +4734,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 27
"stddev", // return table id and the corresponding tags for join match and subscribe
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_STDDEV_DST,
FUNCTION_AVG,
FUNCSTATE_SO | FUNCSTATE_STABLE,
@ -4747,7 +4747,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 28
"interp",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_INTERP,
FUNCTION_INTERP,
FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS ,
@ -4760,7 +4760,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 29
"rate",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_RATE,
FUNCTION_RATE,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
@ -4773,7 +4773,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 30
"irate",
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_IRATE,
FUNCTION_IRATE,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
@ -4786,7 +4786,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 31
"tbid", // return table id and the corresponding tags for join match and subscribe
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_TID_TAG,
FUNCTION_TID_TAG,
FUNCSTATE_MO | FUNCSTATE_STABLE,
@ -4798,7 +4798,7 @@ SAggFunctionInfo aggFunc[34] = {{
},
{ //32
"derivative", // return table id and the corresponding tags for join match and subscribe
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_DERIVATIVE,
FUNCTION_INVALID_ID,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
@ -4811,7 +4811,7 @@ SAggFunctionInfo aggFunc[34] = {{
{
// 33
"_block_dist", // return table id and the corresponding tags for join match and subscribe
FUNCTION_AGG,
FUNCTION_TYPE_AGG,
FUNCTION_BLKINFO,
FUNCTION_BLKINFO,
FUNCSTATE_SO | FUNCSTATE_STABLE,

View File

@ -6,6 +6,7 @@
#include "tscalarfunction.h"
static SHashObj* functionHashTable = NULL;
static SHashObj* udfHashTable = NULL;
static void doInitFunctionHashTable() {
int numOfEntries = tListLen(aggFunc);
@ -23,6 +24,8 @@ static void doInitFunctionHashTable() {
SScalarFunctionInfo* ptr = &scalarFunc[i];
taosHashPut(functionHashTable, scalarFunc[i].name, len, (void*)&ptr, POINTER_BYTES);
}
udfHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, true);
}
static pthread_once_t functionHashTableInit = PTHREAD_ONCE_INIT;
@ -46,6 +49,27 @@ const char* qGetFunctionName(int32_t functionId) {
}
SAggFunctionInfo* qGetFunctionInfo(const char* name, int32_t len) {
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
SAggFunctionInfo** pInfo = taosHashGet(functionHashTable, name, len);
if (pInfo != NULL) {
return (*pInfo);
} else {
return NULL;
}
}
void qAddUdfInfo(uint64_t id, SUdfInfo* pUdfInfo) {
int32_t len = (uint32_t)strlen(pUdfInfo->name);
taosHashPut(udfHashTable, pUdfInfo->name, len, (void*)&pUdfInfo, POINTER_BYTES);
}
void qRemoveUdfInfo(uint64_t id, SUdfInfo* pUdfInfo) {
int32_t len = (uint32_t)strlen(pUdfInfo->name);
taosHashRemove(udfHashTable, pUdfInfo->name, len);
}
bool isTagsQuery(SArray* pFunctionIdList) {
int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList);
for (int32_t i = 0; i < num; ++i) {

View File

@ -1,6 +1,5 @@
#include "tudf.h"
#if 0
static char* getUdfFuncName(char* funcname, char* name, int type) {
switch (type) {
case TSDB_UDF_FUNC_NORMAL:
@ -26,6 +25,7 @@ static char* getUdfFuncName(char* funcname, char* name, int type) {
return funcname;
}
#if 0
int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo == NULL) {
return TSDB_CODE_SUCCESS;
@ -47,7 +47,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadScriptNormal;
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
if (pUdfInfo->funcType == FUNCTION_TYPE_AGG) {
pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadScriptFinalize;
pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadScriptMerge;
}
@ -55,7 +55,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
} else {
char path[PATH_MAX] = {0};
taosGetTmpfilePath("script", path);
taosGetTmpfilePath("script", path, tsTempDir);
FILE* file = fopen(path, "w+");
@ -72,7 +72,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
return TSDB_CODE_QRY_SYS_ERROR;
}
char funcname[TSDB_FUNCTIONS_NAME_MAX_LENGTH + 10] = {0};
char funcname[FUNCTIONS_NAME_MAX_LENGTH + 10] = {0};
pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_NORMAL));
if (NULL == pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) {
return TSDB_CODE_QRY_SYS_ERROR;
@ -80,7 +80,7 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_INIT));
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
if (pUdfInfo->funcType == FUNCTION_TYPE_AGG) {
pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_FINALIZE));
pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE] = taosLoadSym(pUdfInfo->handle, getUdfFuncName(funcname, pUdfInfo->name, TSDB_UDF_FUNC_MERGE));
}
@ -121,4 +121,75 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo) {
tfree(pUdfInfo);
}
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type) {
int32_t output = 0;
if (pUdfInfo == NULL || pUdfInfo->funcs[type] == NULL) {
//qError("empty udf function, type:%d", type);
return;
}
// //qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[type]);
switch (type) {
case TSDB_UDF_FUNC_NORMAL:
if (pUdfInfo->isScript) {
(*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx,
(char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->startTs, pCtx->pOutput,
(char *)pCtx->ptsOutputBuf, &output, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
} else {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList,
pCtx->pOutput, interBuf, (char *)pCtx->ptsOutputBuf, &output, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes, &pUdfInfo->init);
}
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
pCtx->resultInfo->numOfRes = output;
} else {
pCtx->resultInfo->numOfRes += output;
}
if (pCtx->resultInfo->numOfRes > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
break;
case TSDB_UDF_FUNC_MERGE:
if (pUdfInfo->isScript) {
(*(scriptMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pUdfInfo->pScriptCtx, pCtx->pInput, pCtx->size, pCtx->pOutput, &output);
} else {
(*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init);
}
// set the output value exist
pCtx->resultInfo->numOfRes = output;
if (output > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
break;
case TSDB_UDF_FUNC_FINALIZE: {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
if (pUdfInfo->isScript) {
(*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->startTs, pCtx->pOutput, &output);
} else {
(*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init);
}
// set the output value exist
pCtx->resultInfo->numOfRes = output;
if (output > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
break;
}
}
}
#endif