[td-3188]refactor.
This commit is contained in:
parent
778270f435
commit
18cb7b36fc
|
@ -280,16 +280,6 @@ typedef struct SResRec {
|
|||
int numOfTotal;
|
||||
} SResRec;
|
||||
|
||||
typedef struct SUdfInfo {
|
||||
int32_t functionId; // system assigned function id
|
||||
char *name; // function name
|
||||
int16_t resType; // result type
|
||||
int16_t resBytes; // result byte
|
||||
int32_t funcType; // scalar function or aggregate function
|
||||
int32_t contLen; // content length
|
||||
char *content; // binary content
|
||||
} SUdfInfo;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfRows; // num of results in current retrieval
|
||||
int64_t numOfRowsGroup; // num of results of current group
|
||||
|
|
|
@ -1075,7 +1075,7 @@ static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool n
|
|||
pCtx->currentStage = MERGE_STAGE;
|
||||
|
||||
if (needInit) {
|
||||
aAggs[pCtx->functionId].init(pCtx);
|
||||
aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -322,8 +322,6 @@ int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
|
||||
|
||||
//TODO CHECK CODE
|
||||
|
||||
|
||||
if (len + sizeof(SCreateFuncMsg) > pSql->cmd.allocSize) {
|
||||
ret = tscAllocPayload(&pSql->cmd, len + sizeof(SCreateFuncMsg));
|
||||
if (ret) {
|
||||
|
|
|
@ -1063,12 +1063,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
}
|
||||
|
||||
// support only one udf
|
||||
if (pCmd->pUdfInfo != NULL) {
|
||||
assert(taosArrayGetSize(pCmd->pUdfInfo) == 1);
|
||||
|
||||
if (pCmd->pUdfInfo != NULL && taosArrayGetSize(pCmd->pUdfInfo) > 0) {
|
||||
pQueryMsg->udfContentOffset = htonl((int32_t) (pMsg - pCmd->payload));
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pCmd->pUdfInfo); ++i) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, i);
|
||||
*(int8_t*) pMsg = pUdfInfo->resType;
|
||||
pMsg += sizeof(pUdfInfo->resType);
|
||||
|
||||
*(int16_t*) pMsg = htons(pUdfInfo->resBytes);
|
||||
pMsg += sizeof(pUdfInfo->resBytes);
|
||||
|
||||
STR_TO_VARSTR(pMsg, pUdfInfo->name);
|
||||
|
||||
pMsg += varDataTLen(pMsg);
|
||||
|
@ -2115,25 +2119,41 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) {
|
|||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
SUdfFuncMsg* pFuncMsg = (SUdfFuncMsg *)pSql->res.pRsp;
|
||||
pFuncMsg->num = htonl(pFuncMsg->num);
|
||||
assert(pFuncMsg->num == taosArrayGetSize(pCmd->pUdfInfo));
|
||||
|
||||
char* pMsg = pFuncMsg->content;
|
||||
for(int32_t i = 0; i < pFuncMsg->num; ++i) {
|
||||
SFunctionInfoMsg* pFunc = (SFunctionInfoMsg*) pMsg;
|
||||
|
||||
SUdfInfo info = {0};
|
||||
info.name = strndup(pFunc->name, TSDB_FUNC_NAME_LEN);
|
||||
info.resBytes = htons(pFunc->resBytes);
|
||||
info.resType = htons(pFunc->resType);
|
||||
info.funcType = TSDB_UDF_TYPE_SCALAR;
|
||||
for(int32_t j = 0; j < pFuncMsg->num; ++j) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, j);
|
||||
if (strcmp(pUdfInfo->name, pFunc->name) != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
info.contLen = htonl(pFunc->len);
|
||||
info.content = malloc(pFunc->len);
|
||||
memcpy(info.content, pFunc->content, info.contLen);
|
||||
pUdfInfo->resBytes = htons(pFunc->resBytes);
|
||||
pUdfInfo->resType = pFunc->resType;
|
||||
pUdfInfo->funcType = TSDB_UDF_TYPE_SCALAR;
|
||||
pUdfInfo->contLen = htonl(pFunc->len);
|
||||
|
||||
taosArrayPush(pCmd->pUdfInfo, &info);
|
||||
pMsg += sizeof(SFunctionInfoMsg) + info.contLen;
|
||||
pUdfInfo->content = malloc(pUdfInfo->contLen);
|
||||
memcpy(pUdfInfo->content, pFunc->content, pUdfInfo->contLen);
|
||||
|
||||
pMsg += sizeof(SFunctionInfoMsg) + pUdfInfo->contLen;
|
||||
}
|
||||
}
|
||||
|
||||
// master sqlObj locates in param
|
||||
SSqlObj* parent = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param);
|
||||
if(parent == NULL) {
|
||||
return pSql->res.code;
|
||||
}
|
||||
|
||||
assert(parent->signature == parent && (int64_t)pSql->param == parent->self);
|
||||
taosArrayDestroy(parent->cmd.pUdfInfo);
|
||||
|
||||
parent->cmd.pUdfInfo = pCmd->pUdfInfo; // assigned to parent sql obj.
|
||||
pCmd->pUdfInfo = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -593,7 +593,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
char name[TSDB_FUNC_NAME_LEN];
|
||||
int16_t resType;
|
||||
int8_t resType;
|
||||
int16_t resBytes;
|
||||
int32_t len;
|
||||
char content[];
|
||||
|
|
|
@ -447,7 +447,8 @@ static int32_t mnodeProcessRetrieveFuncImplMsg(SMnodeMsg *pMsg) {
|
|||
pFuncInfo->len = htonl(pFuncObj->contLen);
|
||||
memcpy(pFuncInfo->content, pFuncObj->cont, pFuncObj->contLen);
|
||||
|
||||
pFuncInfo->resType = htons(pFuncObj->resType);
|
||||
pFuncInfo->resType = pFuncObj->resType;
|
||||
pFuncInfo->resBytes = htons(pFuncObj->resBytes);
|
||||
pOutput += sizeof(SFunctionInfoMsg) + pFuncObj->contLen;
|
||||
}
|
||||
|
||||
|
|
|
@ -205,7 +205,7 @@ typedef struct SAggFunctionInfo {
|
|||
int8_t stableFuncId; // transfer function for super table query
|
||||
uint16_t status;
|
||||
|
||||
bool (*init)(SQLFunctionCtx *pCtx); // setup the execute environment
|
||||
bool (*init)(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultCellInfo); // setup the execute environment
|
||||
|
||||
void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function
|
||||
void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function
|
||||
|
@ -269,9 +269,9 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha
|
|||
static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, int32_t bufLen) {
|
||||
pResInfo->initialized = true; // the this struct has been initialized flag
|
||||
|
||||
pResInfo->complete = false;
|
||||
pResInfo->complete = false;
|
||||
pResInfo->hasResult = false;
|
||||
pResInfo->numOfRes = 0;
|
||||
pResInfo->numOfRes = 0;
|
||||
|
||||
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "tarray.h"
|
||||
#include "tlockfree.h"
|
||||
#include "tsdb.h"
|
||||
#include "qUdf.h"
|
||||
|
||||
struct SColumnFilterElem;
|
||||
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);
|
||||
|
@ -339,6 +340,7 @@ typedef struct SQueryParam {
|
|||
SColIndex *pGroupColIndex;
|
||||
SColumnInfo *pTagColumnInfo;
|
||||
SSqlGroupbyExpr *pGroupbyExpr;
|
||||
SUdfInfo *pUdfInfo;
|
||||
} SQueryParam;
|
||||
|
||||
typedef struct STableScanInfo {
|
||||
|
@ -424,7 +426,7 @@ typedef struct SSWindowOperatorInfo {
|
|||
void freeParam(SQueryParam *param);
|
||||
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
|
||||
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
|
||||
SColumnInfo* pTagCols);
|
||||
SColumnInfo* pTagCols, SUdfInfo* pUdfInfo);
|
||||
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
|
||||
SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr);
|
||||
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_QUDF_H
|
||||
#define TDENGINE_QUDF_H
|
||||
|
||||
typedef struct SUdfInfo {
|
||||
int32_t functionId; // system assigned function id
|
||||
int8_t funcType; // scalar function or aggregate function
|
||||
int8_t resType; // result type
|
||||
int16_t resBytes; // result byte
|
||||
int32_t contLen; // content length
|
||||
char *name; // function name
|
||||
union { // file path or [in memory] binary content
|
||||
char *content;
|
||||
char *path;
|
||||
};
|
||||
} SUdfInfo;
|
||||
|
||||
#endif // TDENGINE_QUDF_H
|
|
@ -168,6 +168,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
qError("Illegal data type %d or data type length %d", dataType, dataBytes);
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
if (functionId < 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG_DUMMY ||
|
||||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ ||
|
||||
|
@ -381,14 +385,14 @@ static void no_next_step(SQLFunctionCtx *pCtx) {
|
|||
pResInfo->complete = true;
|
||||
}
|
||||
|
||||
static bool function_setup(SQLFunctionCtx *pCtx) {
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
if (pResInfo->initialized) {
|
||||
static bool function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
|
||||
// SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
if (pResultInfo->initialized) {
|
||||
return false;
|
||||
}
|
||||
|
||||
memset(pCtx->pOutput, 0, (size_t)pCtx->outputBytes);
|
||||
initResultInfo(pResInfo, pCtx->interBufBytes);
|
||||
initResultInfo(pResultInfo, pCtx->interBufBytes);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1088,8 +1092,8 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
|
|||
}
|
||||
}
|
||||
|
||||
static bool min_func_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool min_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
|
||||
if (!function_setup(pCtx, pResultInfo)) {
|
||||
return false; // not initialized since it has been initialized
|
||||
}
|
||||
|
||||
|
@ -1133,8 +1137,8 @@ static bool min_func_setup(SQLFunctionCtx *pCtx) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool max_func_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool max_func_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
|
||||
if (!function_setup(pCtx, pResultInfo)) {
|
||||
return false; // not initialized since it has been initialized
|
||||
}
|
||||
|
||||
|
@ -1809,8 +1813,8 @@ static void stddev_dst_finalizer(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////////////
|
||||
static bool first_last_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool first_last_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
|
||||
if (!function_setup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -2555,14 +2559,13 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
static bool top_bottom_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
|
||||
if (!function_setup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
STopBotInfo *pInfo = getTopBotOutputInfo(pCtx);
|
||||
buildTopBotStruct(pInfo, pCtx);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2749,14 +2752,13 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////
|
||||
static bool percentile_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool percentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
|
||||
if (!function_setup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// in the first round, get the min-max value of all involved data
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
SET_DOUBLE_VAL(&pInfo->minval, DBL_MAX);
|
||||
SET_DOUBLE_VAL(&pInfo->maxval, -DBL_MAX);
|
||||
pInfo->numOfElems = 0;
|
||||
|
@ -2945,8 +2947,8 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
|
|||
return pInfo;
|
||||
}
|
||||
|
||||
static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
|
||||
if (!function_setup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -3073,12 +3075,11 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
static bool leastsquares_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool leastsquares_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
|
||||
if (!function_setup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
|
||||
SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
// 2*3 matrix
|
||||
|
@ -3378,8 +3379,8 @@ enum {
|
|||
INITIAL_VALUE_NOT_ASSIGNED = 0,
|
||||
};
|
||||
|
||||
static bool diff_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (function_setup(pCtx)) {
|
||||
static bool diff_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
|
||||
if (function_setup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -3727,12 +3728,12 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
static bool spread_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool spread_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
|
||||
if (!function_setup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
// this is the server-side setup function in client-side, the secondary merge do not need this procedure
|
||||
if (pCtx->currentStage == MERGE_STAGE) {
|
||||
|
@ -3929,12 +3930,10 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
|
|||
* param[2]: end time
|
||||
* @param pCtx
|
||||
*/
|
||||
static bool twa_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool twa_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
|
||||
if (!function_setup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
pInfo->p.key = INT64_MIN;
|
||||
|
@ -4326,14 +4325,12 @@ static void interp_function(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
static bool ts_comp_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool ts_comp_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
|
||||
if (!function_setup(pCtx, pResInfo)) {
|
||||
return false; // not initialized since it has been initialized
|
||||
}
|
||||
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
STSCompInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
pInfo->pTSBuf = tsBufCreate(false, pCtx->order);
|
||||
pInfo->pTSBuf->tsOrder = pCtx->order;
|
||||
return true;
|
||||
|
@ -4435,13 +4432,12 @@ static double do_calc_rate(const SRateInfo* pRateInfo) {
|
|||
return resultVal;
|
||||
}
|
||||
|
||||
static bool rate_function_setup(SQLFunctionCtx *pCtx) {
|
||||
if (!function_setup(pCtx)) {
|
||||
static bool rate_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
|
||||
if (!function_setup(pCtx, pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); //->pOutput + pCtx->outputBytes;
|
||||
SRateInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
SRateInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
pInfo->CorrectionValue = 0;
|
||||
pInfo->firstKey = INT64_MIN;
|
||||
|
|
|
@ -35,13 +35,6 @@
|
|||
|
||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||
|
||||
#define CHECK_IF_QUERY_KILLED(_q) \
|
||||
do { \
|
||||
if (isQueryKilled((_q)->qinfo)) { \
|
||||
longjmp((_q)->env, TSDB_CODE_TSC_QUERY_CANCELLED); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
|
||||
|
||||
#define TIME_WINDOW_COPY(_dst, _src) do {\
|
||||
|
@ -116,6 +109,7 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
|
|||
if (pQuery->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
key /= 1000;
|
||||
}
|
||||
|
||||
if (pQuery->interval.intervalUnit == 'y') {
|
||||
interval *= 12;
|
||||
}
|
||||
|
@ -3066,7 +3060,11 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
|
|||
continue;
|
||||
}
|
||||
|
||||
aAggs[pCtx[j].functionId].init(&pCtx[j]);
|
||||
if (pCtx[j].functionId < 0) { // udf initialize
|
||||
|
||||
} else {
|
||||
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3205,7 +3203,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
|
|||
}
|
||||
|
||||
if (!pResInfo->initialized) {
|
||||
aAggs[functionId].init(&pCtx[i]);
|
||||
aAggs[functionId].init(&pCtx[i], pResInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5482,6 +5480,9 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
|
|||
pQueryMsg->prevResultLen = htonl(pQueryMsg->prevResultLen);
|
||||
pQueryMsg->sw.gap = htobe64(pQueryMsg->sw.gap);
|
||||
pQueryMsg->sw.primaryColId = htonl(pQueryMsg->sw.primaryColId);
|
||||
pQueryMsg->udfContentOffset = htonl(pQueryMsg->udfContentOffset);
|
||||
pQueryMsg->udfContentLen = htonl(pQueryMsg->udfContentLen);
|
||||
pQueryMsg->udfNum = htonl(pQueryMsg->udfNum);
|
||||
|
||||
// query msg safety check
|
||||
if (!validateQueryMsg(pQueryMsg)) {
|
||||
|
@ -5728,6 +5729,27 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
|
|||
pMsg = (char *)pQueryMsg + pQueryMsg->tsOffset + pQueryMsg->tsLen;
|
||||
}
|
||||
|
||||
if (pQueryMsg->udfContentLen > 0) {
|
||||
param->pUdfInfo = calloc(1, sizeof(SUdfInfo));
|
||||
param->pUdfInfo->contLen = pQueryMsg->udfContentLen;
|
||||
|
||||
pMsg = (char*)pQueryMsg + pQueryMsg->udfContentOffset;
|
||||
param->pUdfInfo->resType = *(int8_t*) pMsg;
|
||||
pMsg += sizeof(int8_t);
|
||||
|
||||
param->pUdfInfo->resBytes = htons(*(int16_t*)pMsg);
|
||||
pMsg += sizeof(int16_t);
|
||||
|
||||
tstr* name = (tstr*)(pMsg);
|
||||
param->pUdfInfo->name = strndup(name->data, name->len);
|
||||
|
||||
pMsg += varDataTLen(name);
|
||||
param->pUdfInfo->content = malloc(pQueryMsg->udfContentLen);
|
||||
memcpy(param->pUdfInfo->content, pMsg, pQueryMsg->udfContentLen);
|
||||
|
||||
pMsg += pQueryMsg->udfContentLen;
|
||||
}
|
||||
|
||||
param->sql = strndup(pMsg, pQueryMsg->sqlstrLen);
|
||||
|
||||
if (!validateQuerySourceCols(pQueryMsg, param->pExprMsg, param->pTagColumnInfo)) {
|
||||
|
@ -5790,12 +5812,34 @@ static int32_t updateOutputBufForTopBotQuery(SQueryTableMsg* pQueryMsg, SColumnI
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static UNUSED_FUNC int32_t flushUdfContentToDisk(SUdfInfo* pUdfInfo) {
|
||||
if (pUdfInfo == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char path[PATH_MAX] = {0};
|
||||
taosGetTmpfilePath("script", path);
|
||||
|
||||
FILE* file = fopen(path, "w+");
|
||||
|
||||
// TODO check for failure of flush to disk
|
||||
/*size_t t = */ fwrite(pUdfInfo->content, pUdfInfo->contLen, 1, file);
|
||||
fclose(file);
|
||||
|
||||
tfree(pUdfInfo->content);
|
||||
pUdfInfo->path = strdup(path);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// TODO tag length should be passed from client
|
||||
int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo,
|
||||
SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols) {
|
||||
SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols, SUdfInfo* pUdfInfo) {
|
||||
*pExprInfo = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
// save the udf script or so file
|
||||
// flushUdfContentToDisk(pUdfInfo);
|
||||
|
||||
SExprInfo *pExprs = (SExprInfo *)calloc(pQueryMsg->numOfOutput, sizeof(SExprInfo));
|
||||
if (pExprs == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
|
@ -5871,10 +5915,15 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu
|
|||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
|
||||
if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
|
||||
&pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
|
||||
tfree(pExprs);
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
if (pExprs[i].base.functionId < 0) {
|
||||
pExprs[i].type = pUdfInfo->resType;
|
||||
pExprs[i].bytes = pUdfInfo->resBytes;
|
||||
} else {
|
||||
if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
|
||||
&pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
|
||||
tfree(pExprs);
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
}
|
||||
|
||||
if (pExprs[i].base.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].base.functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
|
|
|
@ -91,7 +91,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
|||
goto _over;
|
||||
}
|
||||
|
||||
if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->numOfOutput, ¶m.pExprs, param.pExprMsg, param.pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->numOfOutput, ¶m.pExprs, param.pExprMsg, param.pTagColumnInfo, param.pUdfInfo)) != TSDB_CODE_SUCCESS) {
|
||||
goto _over;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue