[td-13039] support pseudo column in interval query.
This commit is contained in:
parent
dba4a72dac
commit
39036ea512
|
@ -133,7 +133,8 @@ static FORCE_INLINE int32_t colDataAppendInt32(SColumnInfoData* pColumnInfoData,
|
|||
}
|
||||
|
||||
static FORCE_INLINE int32_t colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) {
|
||||
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_BIGINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UBIGINT);
|
||||
int32_t type = pColumnInfoData->info.type;
|
||||
ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
|
||||
*(int64_t*)p = *(int64_t*)v;
|
||||
}
|
||||
|
@ -175,18 +176,17 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock);
|
|||
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
|
||||
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
|
||||
|
||||
SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols);
|
||||
|
||||
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
|
||||
|
||||
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||
void blockDataCleanup(SSDataBlock* pDataBlock);
|
||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
|
||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||
|
||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
|
||||
|
||||
void blockDebugShowData(const SArray* dataBlocks);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -41,6 +41,7 @@ typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
|
|||
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
||||
typedef struct SScalarFuncExecFuncs {
|
||||
FExecGetEnv getEnv;
|
||||
FScalarExecProcess process;
|
||||
} SScalarFuncExecFuncs;
|
||||
|
||||
|
@ -241,7 +242,6 @@ typedef struct tExprNode {
|
|||
};
|
||||
} tExprNode;
|
||||
|
||||
void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
|
||||
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
|
||||
|
||||
typedef struct SAggFunctionInfo {
|
||||
|
@ -267,28 +267,6 @@ struct SScalarParam {
|
|||
int32_t numOfRows;
|
||||
};
|
||||
|
||||
typedef struct SMultiFunctionsDesc {
|
||||
bool stableQuery;
|
||||
bool groupbyColumn;
|
||||
bool agg;
|
||||
bool arithmeticOnAgg;
|
||||
bool projectionQuery;
|
||||
bool hasFilter;
|
||||
bool onlyTagQuery;
|
||||
bool orderProjectQuery;
|
||||
bool globalMerge;
|
||||
bool multigroupResult;
|
||||
bool blockDistribution;
|
||||
bool stateWindow;
|
||||
bool timewindow;
|
||||
bool sessionWindow;
|
||||
bool topbotQuery;
|
||||
bool interpQuery;
|
||||
bool distinct;
|
||||
bool join;
|
||||
bool continueQuery;
|
||||
} SMultiFunctionsDesc;
|
||||
|
||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
|
||||
bool isSuperTable);
|
||||
|
||||
|
@ -296,8 +274,6 @@ bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* funct
|
|||
|
||||
tExprNode* exprTreeFromBinary(const void* data, size_t size);
|
||||
|
||||
void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc);
|
||||
|
||||
tExprNode* exprdup(tExprNode* pTree);
|
||||
|
||||
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
|
||||
|
|
|
@ -58,6 +58,14 @@ int32_t ceilFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
|||
int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
||||
bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
|
||||
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -331,7 +331,6 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
ASSERT(pColInfoData->nullbitmap == NULL);
|
||||
pDataBlock->info.window.skey = *(TSKEY*)colDataGetData(pColInfoData, 0);
|
||||
pDataBlock->info.window.ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1));
|
||||
return 0;
|
||||
|
@ -609,22 +608,6 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
|
|||
return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t);
|
||||
}
|
||||
|
||||
SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols) {
|
||||
SSchema* pSchema = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(SSchema));
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
pSchema[i].bytes = pColInfoData->info.bytes;
|
||||
pSchema[i].type = pColInfoData->info.type;
|
||||
pSchema[i].colId = pColInfoData->info.colId;
|
||||
}
|
||||
|
||||
if (numOfCols != NULL) {
|
||||
*numOfCols = pBlock->info.numOfCols;
|
||||
}
|
||||
|
||||
return pSchema;
|
||||
}
|
||||
|
||||
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
||||
ASSERT(pBlock != NULL);
|
||||
double rowSize = 0;
|
||||
|
|
|
@ -483,6 +483,7 @@ typedef struct STableIntervalOperatorInfo {
|
|||
int32_t order; // current SSDataBlock scan order
|
||||
OPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||
SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
|
||||
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
||||
} STableIntervalOperatorInfo;
|
||||
|
||||
typedef struct SAggOperatorInfo {
|
||||
|
|
|
@ -1014,8 +1014,35 @@ static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* p
|
|||
return num;
|
||||
}
|
||||
|
||||
static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
|
||||
// query_range_start, query_range_end, window_duration, window_start, window_end
|
||||
static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
|
||||
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
pColData->info.bytes = sizeof(int64_t);
|
||||
|
||||
blockDataEnsureColumnCapacity(pColData, 5);
|
||||
colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
|
||||
colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);
|
||||
|
||||
int64_t interval = 0;
|
||||
colDataAppendInt64(pColData, 2, &interval); // this value may be variable in case of 'n' and 'y'.
|
||||
colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
|
||||
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
|
||||
}
|
||||
|
||||
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin) {
|
||||
int64_t* ts = (int64_t*)pColData->pData;
|
||||
|
||||
int64_t duration = pWin->ekey - pWin->skey + 1;
|
||||
ts[2] = duration; // set the duration
|
||||
ts[3] = pWin->skey; // window start key
|
||||
ts[4] = pWin->ekey + 1; // window end key
|
||||
}
|
||||
|
||||
static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
|
||||
int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
|
||||
SScalarParam intervalParam = {.numOfRows = 5, .columnData = pTimeWindowData}; //TODO move out of this function
|
||||
updateTimeWindowInfo(pTimeWindowData, pWin);
|
||||
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
pCtx[k].startTs = pWin->skey;
|
||||
|
||||
|
@ -1038,6 +1065,21 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of
|
|||
pCtx[k].isAggSet = false;
|
||||
}
|
||||
|
||||
if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
|
||||
char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
|
||||
SScalarParam out = {.columnData = NULL};
|
||||
out.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
||||
out.columnData->info.type = TSDB_DATA_TYPE_BIGINT;
|
||||
out.columnData->info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
|
||||
out.columnData->pData = p;
|
||||
pCtx[k].sfp.process(&intervalParam, 1, &out);
|
||||
pEntryInfo->numOfRes = 1;
|
||||
pEntryInfo->hasResult = ',';
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionNeedToExecute(&pCtx[k])) {
|
||||
pCtx[k].fpSet.process(&pCtx[k]);
|
||||
}
|
||||
|
@ -1489,8 +1531,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
|
|||
}
|
||||
}
|
||||
|
||||
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
|
||||
int32_t tableGroupId) {
|
||||
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
|
||||
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info;
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
|
@ -1563,7 +1604,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||
setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
|
||||
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
||||
// restore current time window
|
||||
|
@ -1578,8 +1619,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
// window start key interpolation
|
||||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep,
|
||||
pInfo->order, false);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput,
|
||||
TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
|
||||
STimeWindow nextWin = win;
|
||||
while (1) {
|
||||
|
@ -1609,8 +1649,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
// window start(end) key interpolation
|
||||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
|
||||
pInfo->order, false);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput,
|
||||
TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
||||
if (pInfo->timeWindowInterpo) {
|
||||
|
@ -1855,7 +1894,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
int32_t rowIndex = j - num;
|
||||
doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
||||
|
||||
// assign the group keys or user input constant values if required
|
||||
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||
|
@ -1873,7 +1912,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
int32_t rowIndex = pBlock->info.rows - num;
|
||||
doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
||||
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||
}
|
||||
}
|
||||
|
@ -1924,8 +1963,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
}
|
||||
|
||||
// pInfo->numOfRows data belong to the current session window
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput,
|
||||
TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
|
||||
pInfo->curWindow.skey = tsList[j];
|
||||
pInfo->curWindow.ekey = tsList[j];
|
||||
|
@ -1945,8 +1983,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput,
|
||||
TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
||||
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
|
||||
|
@ -2013,11 +2050,7 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (functionId == FUNCTION_TS) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (isRowEntryCompleted(pResInfo) || functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
|
||||
if (isRowEntryCompleted(pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -2132,6 +2165,9 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
|
|||
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
|
||||
} else {
|
||||
fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
|
||||
if (pCtx->sfp.getEnv != NULL) {
|
||||
pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
|
||||
}
|
||||
}
|
||||
pCtx->resDataInfo.interBufSize = env.calcMemSize;
|
||||
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) {
|
||||
|
@ -3730,7 +3766,6 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pRes
|
|||
|
||||
void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
||||
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
|
||||
|
||||
|
@ -3738,6 +3773,11 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S
|
|||
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// int32_t functionId = pCtx[i].functionId;
|
||||
// if (functionId < 0) {
|
||||
// continue;
|
||||
|
@ -4082,8 +4122,7 @@ static void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDa
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t orderType =
|
||||
TSDB_ORDER_ASC; //(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
|
||||
int32_t orderType = TSDB_ORDER_ASC;
|
||||
doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity, rowCellOffset);
|
||||
|
||||
// add condition (pBlock->info.rows >= 1) just to runtime happy
|
||||
|
@ -7845,14 +7884,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->interval = *pInterval;
|
||||
|
||||
pInfo->execModel = OPTR_EXEC_MODEL_BATCH;
|
||||
|
||||
pInfo->win.skey = INT64_MIN;
|
||||
pInfo->win.skey = 0;
|
||||
pInfo->win.ekey = INT64_MAX;
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
|
||||
initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win);
|
||||
|
||||
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
|
||||
goto _error;
|
||||
|
|
|
@ -315,31 +315,31 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_qstartts",
|
||||
.type = FUNCTION_TYPE_QSTARTTS,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
|
||||
.checkFunc = stubCheckAndGetResultType,
|
||||
.getEnvFunc = NULL,
|
||||
.getEnvFunc = getTimePseudoFuncEnv,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = NULL,
|
||||
.sprocessFunc = qStartTsFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "_qendts",
|
||||
.type = FUNCTION_TYPE_QENDTS,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
|
||||
.checkFunc = stubCheckAndGetResultType,
|
||||
.getEnvFunc = NULL,
|
||||
.getEnvFunc = getTimePseudoFuncEnv,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = NULL,
|
||||
.sprocessFunc = qEndTsFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "_wstartts",
|
||||
.type = FUNCTION_TYPE_QSTARTTS,
|
||||
.type = FUNCTION_TYPE_WSTARTTS,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
|
||||
.checkFunc = stubCheckAndGetResultType,
|
||||
.getEnvFunc = NULL,
|
||||
.getEnvFunc = getTimePseudoFuncEnv,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = NULL,
|
||||
.sprocessFunc = winStartTsFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
|
@ -347,9 +347,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.type = FUNCTION_TYPE_QENDTS,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
|
||||
.checkFunc = stubCheckAndGetResultType,
|
||||
.getEnvFunc = NULL,
|
||||
.getEnvFunc = getTimePseudoFuncEnv,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = NULL,
|
||||
.sprocessFunc = winEndTsFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
|
@ -357,9 +357,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.type = FUNCTION_TYPE_WDURATION,
|
||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
|
||||
.checkFunc = stubCheckAndGetResultType,
|
||||
.getEnvFunc = NULL,
|
||||
.getEnvFunc = getTimePseudoFuncEnv,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = NULL,
|
||||
.sprocessFunc = winDurFunction,
|
||||
.finalizeFunc = NULL
|
||||
}
|
||||
};
|
||||
|
@ -368,6 +368,7 @@ const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFun
|
|||
|
||||
int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
||||
switch(pFunc->funcType) {
|
||||
case FUNCTION_TYPE_WDURATION:
|
||||
case FUNCTION_TYPE_COUNT:
|
||||
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||
break;
|
||||
|
@ -400,14 +401,18 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
|||
}
|
||||
case FUNCTION_TYPE_CONCAT:
|
||||
case FUNCTION_TYPE_ROWTS:
|
||||
case FUNCTION_TYPE_TBNAME:
|
||||
case FUNCTION_TYPE_QSTARTTS:
|
||||
case FUNCTION_TYPE_QENDTS:
|
||||
case FUNCTION_TYPE_WSTARTTS:
|
||||
case FUNCTION_TYPE_WENDTS:
|
||||
case FUNCTION_TYPE_WDURATION:
|
||||
case FUNCTION_TYPE_TBNAME: {
|
||||
// todo
|
||||
break;
|
||||
}
|
||||
|
||||
case FUNCTION_TYPE_QENDTS:
|
||||
case FUNCTION_TYPE_QSTARTTS:
|
||||
case FUNCTION_TYPE_WENDTS:
|
||||
case FUNCTION_TYPE_WSTARTTS: {
|
||||
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
|
||||
break;
|
||||
}
|
||||
|
||||
case FUNCTION_TYPE_ABS:
|
||||
case FUNCTION_TYPE_CEIL:
|
||||
|
|
|
@ -92,6 +92,7 @@ int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc;
|
||||
pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -116,42 +116,6 @@ bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp
|
|||
return param->nodeFilterFn(pItem, pExpr->_node.info);
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) {
|
||||
tbufWriteUint8(bw, expr->nodeType);
|
||||
|
||||
if (expr->nodeType == TEXPR_VALUE_NODE) {
|
||||
SVariant* pVal = expr->pVal;
|
||||
|
||||
tbufWriteUint32(bw, pVal->nType);
|
||||
if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
|
||||
tbufWriteInt32(bw, pVal->nLen);
|
||||
tbufWrite(bw, pVal->pz, pVal->nLen);
|
||||
} else {
|
||||
tbufWriteInt64(bw, pVal->i);
|
||||
}
|
||||
|
||||
} else if (expr->nodeType == TEXPR_COL_NODE) {
|
||||
SSchema* pSchema = expr->pSchema;
|
||||
tbufWriteInt16(bw, pSchema->colId);
|
||||
tbufWriteInt16(bw, pSchema->bytes);
|
||||
tbufWriteUint8(bw, pSchema->type);
|
||||
tbufWriteString(bw, pSchema->name);
|
||||
|
||||
} else if (expr->nodeType == TEXPR_BINARYEXPR_NODE) {
|
||||
tbufWriteUint8(bw, expr->_node.optr);
|
||||
exprTreeToBinaryImpl(bw, expr->_node.pLeft);
|
||||
exprTreeToBinaryImpl(bw, expr->_node.pRight);
|
||||
}
|
||||
}
|
||||
|
||||
void exprTreeToBinary(SBufferWriter* bw, tExprNode* expr) {
|
||||
if (expr != NULL) {
|
||||
exprTreeToBinaryImpl(bw, expr);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: these three functions should be made global
|
||||
static void* exception_calloc(size_t nmemb, size_t size) {
|
||||
void* p = taosMemoryCalloc(nmemb, size);
|
||||
|
@ -230,97 +194,6 @@ tExprNode* exprTreeFromBinary(const void* data, size_t size) {
|
|||
return exprTreeFromBinaryImpl(&br);
|
||||
}
|
||||
|
||||
tExprNode* exprTreeFromTableName(const char* tbnameCond) {
|
||||
if (!tbnameCond) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t anchor = CLEANUP_GET_ANCHOR();
|
||||
|
||||
tExprNode* expr = exception_calloc(1, sizeof(tExprNode));
|
||||
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
|
||||
|
||||
expr->nodeType = TEXPR_BINARYEXPR_NODE;
|
||||
|
||||
tExprNode* left = exception_calloc(1, sizeof(tExprNode));
|
||||
expr->_node.pLeft = left;
|
||||
|
||||
left->nodeType = TEXPR_COL_NODE;
|
||||
SSchema* pSchema = exception_calloc(1, sizeof(SSchema));
|
||||
left->pSchema = pSchema;
|
||||
|
||||
// *pSchema = NULL;//*tGetTbnameColumnSchema();
|
||||
|
||||
tExprNode* right = exception_calloc(1, sizeof(tExprNode));
|
||||
expr->_node.pRight = right;
|
||||
|
||||
if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN) == 0) {
|
||||
right->nodeType = TEXPR_VALUE_NODE;
|
||||
expr->_node.optr = OP_TYPE_LIKE;
|
||||
SVariant* pVal = exception_calloc(1, sizeof(SVariant));
|
||||
right->pVal = pVal;
|
||||
size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN) + 1;
|
||||
pVal->pz = exception_malloc(len);
|
||||
memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN, len);
|
||||
pVal->nType = TSDB_DATA_TYPE_BINARY;
|
||||
pVal->nLen = (int32_t)len;
|
||||
|
||||
} else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_MATCH, QUERY_COND_REL_PREFIX_MATCH_LEN) == 0) {
|
||||
right->nodeType = TEXPR_VALUE_NODE;
|
||||
expr->_node.optr = OP_TYPE_MATCH;
|
||||
SVariant* pVal = exception_calloc(1, sizeof(SVariant));
|
||||
right->pVal = pVal;
|
||||
size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_MATCH_LEN) + 1;
|
||||
pVal->pz = exception_malloc(len);
|
||||
memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_MATCH_LEN, len);
|
||||
pVal->nType = TSDB_DATA_TYPE_BINARY;
|
||||
pVal->nLen = (int32_t)len;
|
||||
} else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_NMATCH, QUERY_COND_REL_PREFIX_NMATCH_LEN) == 0) {
|
||||
right->nodeType = TEXPR_VALUE_NODE;
|
||||
expr->_node.optr = OP_TYPE_NMATCH;
|
||||
SVariant* pVal = exception_calloc(1, sizeof(SVariant));
|
||||
right->pVal = pVal;
|
||||
size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_NMATCH_LEN) + 1;
|
||||
pVal->pz = exception_malloc(len);
|
||||
memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_NMATCH_LEN, len);
|
||||
pVal->nType = TSDB_DATA_TYPE_BINARY;
|
||||
pVal->nLen = (int32_t)len;
|
||||
} else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN) == 0) {
|
||||
right->nodeType = TEXPR_VALUE_NODE;
|
||||
expr->_node.optr = OP_TYPE_IN;
|
||||
SVariant* pVal = exception_calloc(1, sizeof(SVariant));
|
||||
right->pVal = pVal;
|
||||
pVal->nType = TSDB_DATA_TYPE_POINTER_ARRAY;
|
||||
pVal->arr = taosArrayInit(2, POINTER_BYTES);
|
||||
|
||||
const char* cond = tbnameCond + QUERY_COND_REL_PREFIX_IN_LEN;
|
||||
for (const char *e = cond; *e != 0; e++) {
|
||||
if (*e == TS_PATH_DELIMITER[0]) {
|
||||
cond = e + 1;
|
||||
} else if (*e == ',') {
|
||||
size_t len = e - cond;
|
||||
char* p = exception_malloc(len + VARSTR_HEADER_SIZE);
|
||||
STR_WITH_SIZE_TO_VARSTR(p, cond, (VarDataLenT)len);
|
||||
cond += len;
|
||||
taosArrayPush(pVal->arr, &p);
|
||||
}
|
||||
}
|
||||
|
||||
if (*cond != 0) {
|
||||
size_t len = strlen(cond) + VARSTR_HEADER_SIZE;
|
||||
|
||||
char* p = exception_malloc(len);
|
||||
STR_WITH_SIZE_TO_VARSTR(p, cond, (VarDataLenT)(len - VARSTR_HEADER_SIZE));
|
||||
taosArrayPush(pVal->arr, &p);
|
||||
}
|
||||
|
||||
taosArraySortString(pVal->arr, taosArrayCompareString);
|
||||
}
|
||||
|
||||
CLEANUP_EXECUTE_TO(anchor, false);
|
||||
return expr;
|
||||
}
|
||||
|
||||
void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) {
|
||||
SBufferReader br = tbufInitReader(buf, len, false);
|
||||
uint32_t type = tbufReadUint32(&br);
|
||||
|
|
|
@ -288,7 +288,7 @@ _return:
|
|||
SCL_RET(code);
|
||||
}
|
||||
|
||||
int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
|
||||
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
|
||||
if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
|
||||
sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
|
||||
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
|
@ -420,7 +420,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
|
|||
SFunctionNode *node = (SFunctionNode *)*pNode;
|
||||
SScalarParam output = {0};
|
||||
|
||||
ctx->code = sclExecFuncion(node, ctx, &output);
|
||||
ctx->code = sclExecFunction(node, ctx, &output);
|
||||
if (ctx->code) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
|
@ -547,7 +547,7 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
|
|||
SFunctionNode *node = (SFunctionNode *)pNode;
|
||||
SScalarParam output = {0};
|
||||
|
||||
ctx->code = sclExecFuncion(node, ctx, &output);
|
||||
ctx->code = sclExecFunction(node, ctx, &output);
|
||||
if (ctx->code) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
|
@ -667,7 +667,7 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
|
|||
|
||||
int32_t code = 0;
|
||||
SScalarCtx ctx = {0};
|
||||
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
if (NULL == ctx.pRes) {
|
||||
sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
|
||||
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -689,7 +689,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
|
|||
|
||||
int32_t code = 0;
|
||||
SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList};
|
||||
|
||||
// TODO: OPT performance
|
||||
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
if (NULL == ctx.pRes) {
|
||||
sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
|
||||
|
@ -716,6 +716,3 @@ _return:
|
|||
sclFreeRes(ctx.pRes);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -377,3 +377,34 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf
|
|||
}
|
||||
}
|
||||
|
||||
bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(int64_t);
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
ASSERT(inputNum == 1);
|
||||
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
|
||||
}
|
||||
|
||||
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
ASSERT(inputNum == 1);
|
||||
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 1));
|
||||
}
|
||||
|
||||
int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
ASSERT(inputNum == 1);
|
||||
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 2));
|
||||
}
|
||||
|
||||
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
ASSERT(inputNum == 1);
|
||||
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 3));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
ASSERT(inputNum == 1);
|
||||
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
Loading…
Reference in New Issue