From 39036ea512f1b4c88a98ee74fc4ee52d8306aa76 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 30 Mar 2022 13:41:15 +0800 Subject: [PATCH] [td-13039] support pseudo column in interval query. --- include/common/tdatablock.h | 16 +-- include/libs/function/function.h | 26 +---- include/libs/scalar/scalar.h | 8 ++ source/common/src/tdatablock.c | 17 ---- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 93 ++++++++++++----- source/libs/function/src/builtins.c | 43 ++++---- source/libs/function/src/functionMgt.c | 1 + source/libs/function/src/texpr.c | 127 ------------------------ source/libs/scalar/src/scalar.c | 13 +-- source/libs/scalar/src/sclfunc.c | 31 ++++++ 11 files changed, 145 insertions(+), 231 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 4a47acfa50..4bbe42bc50 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -133,7 +133,8 @@ static FORCE_INLINE int32_t colDataAppendInt32(SColumnInfoData* pColumnInfoData, } static FORCE_INLINE int32_t colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_BIGINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UBIGINT); + int32_t type = pColumnInfoData->info.type; + ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; *(int64_t*)p = *(int64_t*)v; } @@ -175,17 +176,16 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock); double blockDataGetSerialRowSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); -SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols); - int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); -int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); -int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); -void blockDataCleanup(SSDataBlock* pDataBlock); +int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); +int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); +void blockDataCleanup(SSDataBlock* pDataBlock); +size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); +void* blockDataDestroy(SSDataBlock* pBlock); + SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); -size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); -void* blockDataDestroy(SSDataBlock* pBlock); void blockDebugShowData(const SArray* dataBlocks); diff --git a/include/libs/function/function.h b/include/libs/function/function.h index e7895bd972..278d9d8b7c 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -41,6 +41,7 @@ typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef struct SScalarFuncExecFuncs { + FExecGetEnv getEnv; FScalarExecProcess process; } SScalarFuncExecFuncs; @@ -241,7 +242,6 @@ typedef struct tExprNode { }; } tExprNode; -void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree); void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)); typedef struct SAggFunctionInfo { @@ -267,28 +267,6 @@ struct SScalarParam { int32_t numOfRows; }; -typedef struct SMultiFunctionsDesc { - bool stableQuery; - bool groupbyColumn; - bool agg; - bool arithmeticOnAgg; - bool projectionQuery; - bool hasFilter; - bool onlyTagQuery; - bool orderProjectQuery; - bool globalMerge; - bool multigroupResult; - bool blockDistribution; - bool stateWindow; - bool timewindow; - bool sessionWindow; - bool topbotQuery; - bool interpQuery; - bool distinct; - bool join; - bool continueQuery; -} SMultiFunctionsDesc; - int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, bool isSuperTable); @@ -296,8 +274,6 @@ bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* funct tExprNode* exprTreeFromBinary(const void* data, size_t size); -void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc); - tExprNode* exprdup(tExprNode* pTree); void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num); diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index c6d17ef65c..7bc0ee42e9 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -58,6 +58,14 @@ int32_t ceilFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); + +int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7a958136e7..7f25fd1e80 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -331,7 +331,6 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { return 0; } - ASSERT(pColInfoData->nullbitmap == NULL); pDataBlock->info.window.skey = *(TSKEY*)colDataGetData(pColInfoData, 0); pDataBlock->info.window.ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); return 0; @@ -609,22 +608,6 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t); } -SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols) { - SSchema* pSchema = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(SSchema)); - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - pSchema[i].bytes = pColInfoData->info.bytes; - pSchema[i].type = pColInfoData->info.type; - pSchema[i].colId = pColInfoData->info.colId; - } - - if (numOfCols != NULL) { - *numOfCols = pBlock->info.numOfCols; - } - - return pSchema; -} - double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { ASSERT(pBlock != NULL); double rowSize = 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 99bab32157..bc55f46986 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -483,6 +483,7 @@ typedef struct STableIntervalOperatorInfo { int32_t order; // current SSDataBlock scan order OPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator. + SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3475df0e87..55def79958 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1014,8 +1014,35 @@ static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* p return num; } -static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol, +// query_range_start, query_range_end, window_duration, window_start, window_end +static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) { + pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; + pColData->info.bytes = sizeof(int64_t); + + blockDataEnsureColumnCapacity(pColData, 5); + colDataAppendInt64(pColData, 0, &pQueryWindow->skey); + colDataAppendInt64(pColData, 1, &pQueryWindow->ekey); + + int64_t interval = 0; + colDataAppendInt64(pColData, 2, &interval); // this value may be variable in case of 'n' and 'y'. + colDataAppendInt64(pColData, 3, &pQueryWindow->skey); + colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); +} + +static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin) { + int64_t* ts = (int64_t*)pColData->pData; + + int64_t duration = pWin->ekey - pWin->skey + 1; + ts[2] = duration; // set the duration + ts[3] = pWin->skey; // window start key + ts[4] = pWin->ekey + 1; // window end key +} + +static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { + SScalarParam intervalParam = {.numOfRows = 5, .columnData = pTimeWindowData}; //TODO move out of this function + updateTimeWindowInfo(pTimeWindowData, pWin); + for (int32_t k = 0; k < numOfOutput; ++k) { pCtx[k].startTs = pWin->skey; @@ -1038,6 +1065,21 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of pCtx[k].isAggSet = false; } + if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); + char* p = GET_ROWCELL_INTERBUF(pEntryInfo); + + SScalarParam out = {.columnData = NULL}; + out.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData)); + out.columnData->info.type = TSDB_DATA_TYPE_BIGINT; + out.columnData->info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + out.columnData->pData = p; + pCtx[k].sfp.process(&intervalParam, 1, &out); + pEntryInfo->numOfRes = 1; + pEntryInfo->hasResult = ','; + continue; + } + if (functionNeedToExecute(&pCtx[k])) { pCtx[k].fpSet.process(&pCtx[k]); } @@ -1489,8 +1531,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc } } -static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, - int32_t tableGroupId) { +static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -1563,7 +1604,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); - doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } // restore current time window @@ -1578,8 +1619,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // window start key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false); - doApplyFunctions(pInfo->binfo.pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, - TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); STimeWindow nextWin = win; while (1) { @@ -1609,8 +1649,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // window start(end) key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); - doApplyFunctions(pInfo->binfo.pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, - TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } if (pInfo->timeWindowInterpo) { @@ -1855,7 +1894,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = j - num; - doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); @@ -1873,7 +1912,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); } } @@ -1924,8 +1963,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } // pInfo->numOfRows data belong to the current session window - doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, - TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.ekey = tsList[j]; @@ -1945,8 +1983,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, - TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -2013,11 +2050,7 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return false; } - if (functionId == FUNCTION_TS) { - return true; - } - - if (isRowEntryCompleted(pResInfo) || functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { + if (isRowEntryCompleted(pResInfo)) { return false; } @@ -2132,6 +2165,9 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); } else { fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp); + if (pCtx->sfp.getEnv != NULL) { + pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env); + } } pCtx->resDataInfo.interBufSize = env.calcMemSize; } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) { @@ -3730,7 +3766,6 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pRes void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { - // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); @@ -3738,6 +3773,11 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { continue; } + + if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) { + continue; + } + // int32_t functionId = pCtx[i].functionId; // if (functionId < 0) { // continue; @@ -4082,8 +4122,7 @@ static void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDa return; } - int32_t orderType = - TSDB_ORDER_ASC; //(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC; + int32_t orderType = TSDB_ORDER_ASC; doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity, rowCellOffset); // add condition (pBlock->info.rows >= 1) just to runtime happy @@ -7845,14 +7884,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->order = TSDB_ORDER_ASC; pInfo->win = pTaskInfo->window; pInfo->interval = *pInterval; - pInfo->execModel = OPTR_EXEC_MODEL_BATCH; - - pInfo->win.skey = INT64_MIN; - pInfo->win.ekey = INT64_MAX; + pInfo->win.skey = 0; + pInfo->win.ekey = INT64_MAX; int32_t numOfRows = 4096; int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); + initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win); + // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { goto _error; @@ -7860,7 +7899,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); - pOperator->name = "TimeIntervalAggOperator"; + pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0a5a90d2d9..024da4e04b 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -315,31 +315,31 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_qstartts", .type = FUNCTION_TYPE_QSTARTTS, - .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = qStartTsFunction, .finalizeFunc = NULL }, { .name = "_qendts", .type = FUNCTION_TYPE_QENDTS, - .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = qEndTsFunction, .finalizeFunc = NULL }, { .name = "_wstartts", - .type = FUNCTION_TYPE_QSTARTTS, + .type = FUNCTION_TYPE_WSTARTTS, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = winStartTsFunction, .finalizeFunc = NULL }, { @@ -347,9 +347,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_QENDTS, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = winEndTsFunction, .finalizeFunc = NULL }, { @@ -357,9 +357,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_WDURATION, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = winDurFunction, .finalizeFunc = NULL } }; @@ -368,6 +368,7 @@ const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFun int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { switch(pFunc->funcType) { + case FUNCTION_TYPE_WDURATION: case FUNCTION_TYPE_COUNT: pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; break; @@ -400,14 +401,18 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { } case FUNCTION_TYPE_CONCAT: case FUNCTION_TYPE_ROWTS: - case FUNCTION_TYPE_TBNAME: - case FUNCTION_TYPE_QSTARTTS: - case FUNCTION_TYPE_QENDTS: - case FUNCTION_TYPE_WSTARTTS: - case FUNCTION_TYPE_WENDTS: - case FUNCTION_TYPE_WDURATION: + case FUNCTION_TYPE_TBNAME: { // todo break; + } + + case FUNCTION_TYPE_QENDTS: + case FUNCTION_TYPE_QSTARTTS: + case FUNCTION_TYPE_WENDTS: + case FUNCTION_TYPE_WSTARTTS: { + pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP}; + break; + } case FUNCTION_TYPE_ABS: case FUNCTION_TYPE_CEIL: diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 3858258374..4034a0eb0f 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -92,6 +92,7 @@ int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) { return TSDB_CODE_FAILED; } pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc; + pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/texpr.c b/source/libs/function/src/texpr.c index 814aa48b55..61ff6bb825 100644 --- a/source/libs/function/src/texpr.c +++ b/source/libs/function/src/texpr.c @@ -116,42 +116,6 @@ bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp return param->nodeFilterFn(pItem, pExpr->_node.info); } - - -static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) { - tbufWriteUint8(bw, expr->nodeType); - - if (expr->nodeType == TEXPR_VALUE_NODE) { - SVariant* pVal = expr->pVal; - - tbufWriteUint32(bw, pVal->nType); - if (pVal->nType == TSDB_DATA_TYPE_BINARY) { - tbufWriteInt32(bw, pVal->nLen); - tbufWrite(bw, pVal->pz, pVal->nLen); - } else { - tbufWriteInt64(bw, pVal->i); - } - - } else if (expr->nodeType == TEXPR_COL_NODE) { - SSchema* pSchema = expr->pSchema; - tbufWriteInt16(bw, pSchema->colId); - tbufWriteInt16(bw, pSchema->bytes); - tbufWriteUint8(bw, pSchema->type); - tbufWriteString(bw, pSchema->name); - - } else if (expr->nodeType == TEXPR_BINARYEXPR_NODE) { - tbufWriteUint8(bw, expr->_node.optr); - exprTreeToBinaryImpl(bw, expr->_node.pLeft); - exprTreeToBinaryImpl(bw, expr->_node.pRight); - } -} - -void exprTreeToBinary(SBufferWriter* bw, tExprNode* expr) { - if (expr != NULL) { - exprTreeToBinaryImpl(bw, expr); - } -} - // TODO: these three functions should be made global static void* exception_calloc(size_t nmemb, size_t size) { void* p = taosMemoryCalloc(nmemb, size); @@ -230,97 +194,6 @@ tExprNode* exprTreeFromBinary(const void* data, size_t size) { return exprTreeFromBinaryImpl(&br); } -tExprNode* exprTreeFromTableName(const char* tbnameCond) { - if (!tbnameCond) { - return NULL; - } - - int32_t anchor = CLEANUP_GET_ANCHOR(); - - tExprNode* expr = exception_calloc(1, sizeof(tExprNode)); - CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL); - - expr->nodeType = TEXPR_BINARYEXPR_NODE; - - tExprNode* left = exception_calloc(1, sizeof(tExprNode)); - expr->_node.pLeft = left; - - left->nodeType = TEXPR_COL_NODE; - SSchema* pSchema = exception_calloc(1, sizeof(SSchema)); - left->pSchema = pSchema; - -// *pSchema = NULL;//*tGetTbnameColumnSchema(); - - tExprNode* right = exception_calloc(1, sizeof(tExprNode)); - expr->_node.pRight = right; - - if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN) == 0) { - right->nodeType = TEXPR_VALUE_NODE; - expr->_node.optr = OP_TYPE_LIKE; - SVariant* pVal = exception_calloc(1, sizeof(SVariant)); - right->pVal = pVal; - size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN) + 1; - pVal->pz = exception_malloc(len); - memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN, len); - pVal->nType = TSDB_DATA_TYPE_BINARY; - pVal->nLen = (int32_t)len; - - } else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_MATCH, QUERY_COND_REL_PREFIX_MATCH_LEN) == 0) { - right->nodeType = TEXPR_VALUE_NODE; - expr->_node.optr = OP_TYPE_MATCH; - SVariant* pVal = exception_calloc(1, sizeof(SVariant)); - right->pVal = pVal; - size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_MATCH_LEN) + 1; - pVal->pz = exception_malloc(len); - memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_MATCH_LEN, len); - pVal->nType = TSDB_DATA_TYPE_BINARY; - pVal->nLen = (int32_t)len; - } else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_NMATCH, QUERY_COND_REL_PREFIX_NMATCH_LEN) == 0) { - right->nodeType = TEXPR_VALUE_NODE; - expr->_node.optr = OP_TYPE_NMATCH; - SVariant* pVal = exception_calloc(1, sizeof(SVariant)); - right->pVal = pVal; - size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_NMATCH_LEN) + 1; - pVal->pz = exception_malloc(len); - memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_NMATCH_LEN, len); - pVal->nType = TSDB_DATA_TYPE_BINARY; - pVal->nLen = (int32_t)len; - } else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN) == 0) { - right->nodeType = TEXPR_VALUE_NODE; - expr->_node.optr = OP_TYPE_IN; - SVariant* pVal = exception_calloc(1, sizeof(SVariant)); - right->pVal = pVal; - pVal->nType = TSDB_DATA_TYPE_POINTER_ARRAY; - pVal->arr = taosArrayInit(2, POINTER_BYTES); - - const char* cond = tbnameCond + QUERY_COND_REL_PREFIX_IN_LEN; - for (const char *e = cond; *e != 0; e++) { - if (*e == TS_PATH_DELIMITER[0]) { - cond = e + 1; - } else if (*e == ',') { - size_t len = e - cond; - char* p = exception_malloc(len + VARSTR_HEADER_SIZE); - STR_WITH_SIZE_TO_VARSTR(p, cond, (VarDataLenT)len); - cond += len; - taosArrayPush(pVal->arr, &p); - } - } - - if (*cond != 0) { - size_t len = strlen(cond) + VARSTR_HEADER_SIZE; - - char* p = exception_malloc(len); - STR_WITH_SIZE_TO_VARSTR(p, cond, (VarDataLenT)(len - VARSTR_HEADER_SIZE)); - taosArrayPush(pVal->arr, &p); - } - - taosArraySortString(pVal->arr, taosArrayCompareString); - } - - CLEANUP_EXECUTE_TO(anchor, false); - return expr; -} - void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) { SBufferReader br = tbufInitReader(buf, len, false); uint32_t type = tbufReadUint32(&br); diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 7b2cb9ca67..1cc259d4da 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -288,7 +288,7 @@ _return: SCL_RET(code); } -int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { +int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { if (NULL == node->pParameterList || node->pParameterList->length <= 0) { sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -420,7 +420,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { SFunctionNode *node = (SFunctionNode *)*pNode; SScalarParam output = {0}; - ctx->code = sclExecFuncion(node, ctx, &output); + ctx->code = sclExecFunction(node, ctx, &output); if (ctx->code) { return DEAL_RES_ERROR; } @@ -547,7 +547,7 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) { SFunctionNode *node = (SFunctionNode *)pNode; SScalarParam output = {0}; - ctx->code = sclExecFuncion(node, ctx, &output); + ctx->code = sclExecFunction(node, ctx, &output); if (ctx->code) { return DEAL_RES_ERROR; } @@ -667,7 +667,7 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { int32_t code = 0; SScalarCtx ctx = {0}; - ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (NULL == ctx.pRes) { sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -689,7 +689,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { int32_t code = 0; SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList}; - + // TODO: OPT performance ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == ctx.pRes) { sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); @@ -716,6 +716,3 @@ _return: sclFreeRes(ctx.pRes); return code; } - - - diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index db0aa21f42..d8e97e7e12 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -377,3 +377,34 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf } } +bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(int64_t); + return true; +} + +int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + ASSERT(inputNum == 1); + colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0)); +} + +int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + ASSERT(inputNum == 1); + colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 1)); +} + +int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + ASSERT(inputNum == 1); + colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 2)); +} + +int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + ASSERT(inputNum == 1); + colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 3)); + return TSDB_CODE_SUCCESS; +} + +int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + ASSERT(inputNum == 1); + colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4)); + return TSDB_CODE_SUCCESS; +} \ No newline at end of file