From dd66affd0ae034e894a90c7dbf9342ae2f399fba Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 29 Mar 2022 20:05:04 +0800 Subject: [PATCH 1/3] [td-13039] merge 3.0. --- source/libs/executor/src/executorimpl.c | 213 +++++++++------------- source/libs/function/src/tunaryoperator.c | 13 -- 2 files changed, 85 insertions(+), 141 deletions(-) delete mode 100644 source/libs/function/src/tunaryoperator.c diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 09cf147335..186bf4ea66 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7514,8 +7514,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); int32_t numOfRows = 1; - int32_t code = - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { goto _error; @@ -7523,18 +7522,18 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); - pOperator->name = "TableAggregate"; + pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = numOfCols; - pOperator->pTaskInfo = pTaskInfo; - pOperator->_openFn = doOpenAggregateOptr; - pOperator->getNextFn = getAggregateResult; - pOperator->closeFn = destroyAggOperatorInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->_openFn = doOpenAggregateOptr; + pOperator->getNextFn = getAggregateResult; + pOperator->closeFn = destroyAggOperatorInfo; pOperator->encodeResultRow = aggEncodeResultRow; pOperator->decodeResultRow = aggDecodeResultRow; @@ -7711,16 +7710,16 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p // initResultRowInfo(&pBInfo->resultRowInfo, 8); // setFunctionResultOutput(pBInfo, MAIN_SCAN); - pOperator->name = "ProjectOperator"; + pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = num; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doProjectOperation; - pOperator->closeFn = destroyProjectOperatorInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = num; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doProjectOperation; + pOperator->closeFn = destroyProjectOperatorInfo; pOperator->pTaskInfo = pTaskInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7735,39 +7734,6 @@ _error: return NULL; } -SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { -#if 0 - SColumnInfo* pCols = taosMemoryCalloc(numOfOutput, sizeof(SColumnInfo)); - - int32_t numOfFilter = 0; - for(int32_t i = 0; i < numOfOutput; ++i) { - if (pExpr[i].base.flist.numOfFilters > 0) { - numOfFilter += 1; - } - - pCols[i].type = pExpr[i].base.resSchema.type; - pCols[i].bytes = pExpr[i].base.resSchema.bytes; - pCols[i].colId = pExpr[i].base.resSchema.colId; - - pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters; - if (pCols[i].flist.numOfFilters != 0) { - pCols[i].flist.filterInfo = taosMemoryCalloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo)); - memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo)); - } else { - // avoid runtime error - pCols[i].flist.filterInfo = NULL; - } - } - - assert(numOfFilter > 0); - - *numOfFilterCols = numOfFilter; - return pCols; -#endif - - return 0; -} - SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { SLimitOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SLimitOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -7778,17 +7744,18 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit pInfo->limit = *pLimit; pInfo->currentOffset = pLimit->offset; - pOperator->name = "LimitOperator"; + pOperator->name = "LimitOperator"; // pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LIMIT; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doLimit; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + pOperator->status = OP_NOT_OPENED; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doLimit; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; + _error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -7805,17 +7772,15 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - pInfo->order = TSDB_ORDER_ASC; + pInfo->order = TSDB_ORDER_ASC; pInfo->precision = TSDB_TIME_PRECISION_MILLI; - pInfo->win = pTaskInfo->window; - pInfo->interval = *pInterval; - - pInfo->win.skey = INT64_MIN; - pInfo->win.ekey = INT64_MAX; + pInfo->win = pTaskInfo->window; + pInfo->interval = *pInterval; + pInfo->win.skey = INT64_MIN; + pInfo->win.ekey = INT64_MAX; int32_t numOfRows = 4096; - int32_t code = - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { goto _error; @@ -7826,14 +7791,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfOutput = numOfCols; - pOperator->info = pInfo; - pOperator->_openFn = doOpenIntervalAgg; - pOperator->getNextFn = doBuildIntervalResult; - pOperator->closeFn = destroyIntervalOperatorInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + pOperator->_openFn = doOpenIntervalAgg; + pOperator->getNextFn = doBuildIntervalResult; + pOperator->closeFn = destroyIntervalOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -8555,6 +8520,23 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i return s; } +static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) { + SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn)); + if (pCol == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pCol->slotId = slotId; + pCol->bytes = pType->bytes; + pCol->type = pType->type; + pCol->scale = pType->scale; + pCol->precision = pType->precision; + pCol->dataBlockId = blockId; + + return pCol; +} + SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) { int32_t numOfFuncs = LIST_LENGTH(pNodeList); int32_t numOfGroupKeys = 0; @@ -8586,18 +8568,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); pExp->base.numOfParams = 1; - pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn)); - pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; SDataType* pType = &pColNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); - - SColumn* pCol = pExp->base.pParam[0].pCol; - pCol->slotId = pColNode->slotId; // TODO refactor - pCol->bytes = pType->bytes; - pCol->type = pType->type; - pCol->scale = pType->scale; - pCol->precision = pType->precision; + pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType); + pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) { pExp->pExpr->nodeType = QUERY_NODE_FUNCTION; SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; @@ -8608,8 +8583,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->pExpr->_function.functionId = pFuncNode->funcId; pExp->pExpr->_function.pFunctNode = pFuncNode; - strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, - tListLen(pExp->pExpr->_function.functionName)); + strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); // TODO: value parameter needs to be handled int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); @@ -8620,21 +8594,12 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* for (int32_t j = 0; j < numOfParam; ++j) { SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); if (p1->type == QUERY_NODE_COLUMN) { - SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor + SColumnNode* pcn = (SColumnNode*) p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; - pExp->base.pParam[j].pCol = taosMemoryCalloc(1, sizeof(SColumn)); - SColumn* pCol = pExp->base.pParam[j].pCol; - - pCol->slotId = pcn->slotId; - pCol->bytes = pcn->node.resType.bytes; - pCol->type = pcn->node.resType.type; - pCol->scale = pcn->node.resType.scale; - pCol->precision = pcn->node.resType.precision; - pCol->dataBlockId = pcn->dataBlockId; + pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, &pcn->node.resType); } else if (p1->type == QUERY_NODE_VALUE) { SValueNode* pvn = (SValueNode*)p1; - pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; } } @@ -8644,21 +8609,14 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); pExp->base.numOfParams = 1; - pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn)); - pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; SDataType* pType = &pNode->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, - pType->precision, pNode->node.aliasName); + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName); pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr; - SColumn* pCol = pExp->base.pParam[0].pCol; - pCol->slotId = pTargetNode->slotId; // TODO refactor - pCol->bytes = pType->bytes; - pCol->type = pType->type; - pCol->scale = pType->scale; - pCol->precision = pType->precision; + pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; + pExp->base.pParam[0].pCol = createColumn(pTargetNode->dataBlockId, pTargetNode->slotId, pType); } else { ASSERT(0); } @@ -8692,17 +8650,15 @@ static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols); static SArray* createSortInfo(SNodeList* pNodeList); -SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, +SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; int32_t numOfCols = 0; - tsdbReaderT pDataReader = - doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); - SArray* pColList = - extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); + tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); + SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pTaskInfo); @@ -8745,7 +8701,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa assert(size == 1); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num); @@ -8757,7 +8713,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); int32_t num = 0; @@ -8778,7 +8734,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -8787,11 +8743,12 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - SInterval interval = {.interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, + SInterval interval = {.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset}; + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = TSDB_TIME_PRECISION_MILLI,}; return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { @@ -8799,7 +8756,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa assert(size == 1); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; @@ -8811,7 +8768,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa assert(size == 1); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; @@ -8827,7 +8784,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); } }*/ @@ -8972,11 +8929,11 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod for (int32_t i = 0; i < num; ++i) { SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i); SColMatchInfo* info = taosArrayGet(pList, pNode->slotId); - // if (pNode->output) { - (*numOfOutputCols) += 1; - // } else { - // info->output = false; - // } + if (pNode->output) { + (*numOfOutputCols) += 1; + } else { + info->output = false; + } } return pList; @@ -9045,7 +9002,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } STableGroupInfo group = {0}; - (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group); + (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group); if (NULL == (*pTaskInfo)->pRoot) { code = terrno; goto _complete; diff --git a/source/libs/function/src/tunaryoperator.c b/source/libs/function/src/tunaryoperator.c deleted file mode 100644 index 957f0799c5..0000000000 --- a/source/libs/function/src/tunaryoperator.c +++ /dev/null @@ -1,13 +0,0 @@ -#include "tunaryoperator.h" - - - - -// TODO dynamic define these functions -//_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t operator) { -// assert(0); -//} - -//bool isStringOperatorFn(int32_t op) { -// return op == FUNCTION_LENGTH; -//} From dba4a72dac8e5ca1374390058043de6f88dff5c4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 29 Mar 2022 20:07:38 +0800 Subject: [PATCH 2/3] [td-13039] support stream execution interval query. --- include/common/tmsg.h | 3 +- source/libs/executor/inc/executorimpl.h | 79 ++++------ source/libs/executor/src/executorimpl.c | 190 ++++++++++++++++-------- 3 files changed, 163 insertions(+), 109 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index de21638179..ca46657473 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -469,8 +469,7 @@ typedef struct { int32_t tz; // query client timezone char intervalUnit; char slidingUnit; - char - offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. + char offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. int8_t precision; int64_t interval; int64_t sliding; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2bbd16fbd0..99bab32157 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -161,20 +161,8 @@ typedef struct STaskCostInfo { typedef struct SOperatorCostInfo { uint64_t openCost; uint64_t execCost; -// uint64_t totalRows; -// uint64_t totalBytes; } SOperatorCostInfo; -typedef struct { - int64_t vgroupLimit; - int64_t ts; -} SOrderedPrjQueryInfo; - -typedef struct { - char* tags; - SArray* pResult; // SArray -} SInterResult; - // The basic query information extracted from the SQueryInfo tree to support the // execution of query in a data node. typedef struct STaskAttr { @@ -230,7 +218,6 @@ typedef struct STaskAttr { SColumnInfo* tagColList; int32_t numOfFilterCols; int64_t* fillVal; - SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SSingleColumnFilterInfo* pFilterInfo; // SFilterInfo *pFilters; @@ -245,6 +232,7 @@ struct SOperatorInfo; typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length); typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length); + typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup); typedef void (*__optr_close_fn_t)(void* param, int32_t num); @@ -330,11 +318,12 @@ typedef struct SOperatorInfo { SResultInfo resultInfo; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator - __optr_fn_t getNextFn; - __optr_fn_t cleanupFn; - __optr_close_fn_t closeFn; __optr_open_fn_t _openFn; // DO NOT invoke this function directly - __optr_encode_fn_t encodeResultRow; // + __optr_fn_t getNextFn; + __optr_fn_t getStreamResFn; // execute the aggregate in the stream model. + __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP + __optr_close_fn_t closeFn; + __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; } SOperatorInfo; @@ -363,18 +352,18 @@ typedef struct SQInfo { STaskCostInfo summary; } SQInfo; -enum { - DATA_NOT_READY = 0x1, - DATA_READY = 0x2, - DATA_EXHAUSTED = 0x3, -}; +typedef enum { + EX_SOURCE_DATA_NOT_READY = 0x1, + EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_EXHAUSTED = 0x3, +} EX_SOURCE_STATUS; typedef struct SSourceDataInfo { struct SExchangeInfo *pEx; - int32_t index; - SRetrieveTableRsp *pRsp; - uint64_t totalRows; - int32_t status; + int32_t index; + SRetrieveTableRsp *pRsp; + uint64_t totalRows; + EX_SOURCE_STATUS status; } SSourceDataInfo; typedef struct SLoadRemoteDataInfo { @@ -383,12 +372,6 @@ typedef struct SLoadRemoteDataInfo { uint64_t totalElapsed; // total elapsed time } SLoadRemoteDataInfo; -enum { - EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, - EX_SOURCE_DATA_EXHAUSTED = 0x3, -}; - typedef struct SExchangeInfo { SArray* pSources; SArray* pSourceDataInfo; @@ -483,17 +466,23 @@ typedef struct SAggSupporter { int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; +typedef enum { + OPTR_EXEC_MODEL_BATCH = 0x1, + OPTR_EXEC_MODEL_STREAM = 0x2, +} OPTR_EXEC_MODEL; + typedef struct STableIntervalOperatorInfo { - SOptrBasicInfo binfo; - SGroupResInfo groupResInfo; - SInterval interval; - STimeWindow win; - int32_t precision; - bool timeWindowInterpo; - char **pRow; - SAggSupporter aggSup; - STableQueryInfo *pCurrent; - int32_t order; + SOptrBasicInfo binfo; // basic info + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + STimeWindow win; // query time range + bool timeWindowInterpo; // interpolation needed or not + char **pRow; // previous row/tuple of already processed datablock + SAggSupporter aggSup; // aggregate supporter + STableQueryInfo *pCurrent; // current tableQueryInfo struct + int32_t order; // current SSDataBlock scan order + OPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator. } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -695,12 +684,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); -void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); -bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p); -void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); - -SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); - void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 186bf4ea66..3475df0e87 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1054,7 +1054,7 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, int32_t order = pInfo->order; bool ascQuery = (order == TSDB_ORDER_ASC); - int32_t precision = pInfo->precision; + int32_t precision = pInterval->precision; getNextTimeWindow(pInterval, precision, order, pNext); // next time window is not in current block @@ -1489,15 +1489,20 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc } } -static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, +static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; int32_t numOfOutput = pOperatorInfo->numOfOutput; + SArray* pUpdated = NULL; + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + pUpdated = taosArrayInit(4, sizeof(SResultRowPosition)); + } + int32_t step = 1; - bool ascQuery = true; + bool ascScan = true; int32_t prevIndex = pResultRowInfo->curPos; @@ -1509,10 +1514,10 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); } - int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1); - TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery); + int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1); + TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan); - STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, &pInfo->interval, pInfo->precision, &pInfo->win); + STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, &pInfo->interval, pInfo->interval.precision, &pInfo->win); bool masterScan = true; SResultRow* pResult = NULL; @@ -1523,6 +1528,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; + taosArrayPush(pUpdated, &pos); + } + int32_t forwardStep = 0; TSKEY ekey = win.ekey; forwardStep = @@ -1534,8 +1544,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. SResultRow* pRes = getResultRow(pResultRowInfo, j); if (pRes->closed) { - assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && - resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); + assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); continue; } @@ -1548,7 +1557,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); @@ -1589,6 +1597,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; + taosArrayPush(pUpdated, &pos); + } + ekey = nextWin.ekey; // reviseWindowEkey(pQueryAttr, &nextWin); forwardStep = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); @@ -1601,10 +1614,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } if (pInfo->timeWindowInterpo) { - int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0; + int32_t rowIndex = ascScan ? (pSDataBlock->info.rows - 1) : 0; saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols); } + return pUpdated; // updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false); } @@ -3590,6 +3604,42 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD } } +void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList, + int32_t* rowCellInfoOffset) { + size_t num = taosArrayGetSize(pUpdateList); + + for (int32_t i = 0; i < num; ++i) { + SResultRowPosition* pPos = taosArrayGet(pUpdateList, i); + + SFilePage* bufPage = getBufPage(pBuf, pPos->pageId); + SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset); + + for (int32_t j = 0; j < numOfOutput; ++j) { + pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); + + struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; + if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { + continue; + } + + if (pCtx[j].fpSet.process) { // TODO set the dummy function. + pCtx[j].fpSet.finalize(&pCtx[j]); + } + + if (pRow->numOfRows < pResInfo->numOfRes) { + pRow->numOfRows = pResInfo->numOfRes; + } + } + + releaseBufPage(pBuf, bufPage); + /* + * set the number of output results for group by normal columns, the number of output rows usually is 1 except + * the top and bottom query + */ + // buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput); + } +} + static bool hasMainOutput(STaskAttr* pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); @@ -5074,12 +5124,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); - if (pDataInfo->status == DATA_EXHAUSTED) { + if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { completed += 1; continue; } - if (pDataInfo->status != DATA_READY) { + if (pDataInfo->status != EX_SOURCE_DATA_READY) { continue; } @@ -5093,7 +5143,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; completed += 1; continue; } @@ -5111,16 +5161,15 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 - ", totalBytes:%" PRIu64, + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, pLoadInfo->totalSize); } - if (pDataInfo->status != DATA_EXHAUSTED) { - pDataInfo->status = DATA_NOT_READY; + if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) { + pDataInfo->status = EX_SOURCE_DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5223,7 +5272,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; continue; } @@ -5240,7 +5289,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 @@ -6798,37 +6847,6 @@ static SSDataBlock* doLimit(SOperatorInfo* pOperator, bool* newgroup) { return pBlock; } -static SSDataBlock* doFilter(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*)param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SFilterOperatorInfo* pCondInfo = pOperator->info; - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - - while (1) { - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - - if (pBlock == NULL) { - break; - } - - doSetFilterColumnInfo(pCondInfo->pFilterInfo, pCondInfo->numOfFilterCols, pBlock); - assert(pRuntimeEnv->pTsBuf == NULL); - filterRowsInDataBlock(pRuntimeEnv, pCondInfo->pFilterInfo, pCondInfo->numOfFilterCols, pBlock, true); - - if (pBlock->info.rows > 0) { - return pBlock; - } - } - - doSetOperatorCompleted(pOperator); - return NULL; -} - static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { if (OPTR_IS_OPENED(pOperator)) { return TSDB_CODE_SUCCESS; @@ -6837,8 +6855,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { STableIntervalOperatorInfo* pInfo = pOperator->info; int32_t order = TSDB_ORDER_ASC; - // STimeWindow win = pQueryAttr->window; - bool newgroup = false; + // STimeWindow win = {0}; + bool newgroup = false; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -6851,7 +6869,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); - // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); @@ -6890,7 +6907,60 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } -static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) { + STableIntervalOperatorInfo* pInfo = pOperator->info; + int32_t order = TSDB_ORDER_ASC; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + if (pOperator->status == OP_RES_TO_RETURN) { + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, + pInfo->binfo.rowCellInfoOffset); + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + return pInfo->binfo.pRes; + } + + // STimeWindow win = {0}; + bool newgroup = false; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + + SArray* pUpdated = NULL; + + while (1) { + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + + if (pBlock == NULL) { + break; + } + + // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the caller. + // Note that all the time window are not close till now. + + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); + pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); + } + + finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); + + blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, + pInfo->binfo.rowCellInfoOffset); + + ASSERT(pInfo->binfo.pRes->info.rows > 0); + pOperator->status = OP_RES_TO_RETURN; + + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; +} + +static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -7773,11 +7843,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* } pInfo->order = TSDB_ORDER_ASC; - pInfo->precision = TSDB_TIME_PRECISION_MILLI; pInfo->win = pTaskInfo->window; pInfo->interval = *pInterval; - pInfo->win.skey = INT64_MIN; - pInfo->win.ekey = INT64_MAX; + + pInfo->execModel = OPTR_EXEC_MODEL_BATCH; + + pInfo->win.skey = INT64_MIN; + pInfo->win.ekey = INT64_MAX; int32_t numOfRows = 4096; int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); @@ -8748,7 +8820,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .intervalUnit = pIntervalPhyNode->intervalUnit, .slidingUnit = pIntervalPhyNode->slidingUnit, .offset = pIntervalPhyNode->offset, - .precision = TSDB_TIME_PRECISION_MILLI,}; + .precision = TSDB_TIME_PRECISION_MILLI}; return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { From 39036ea512f1b4c88a98ee74fc4ee52d8306aa76 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 30 Mar 2022 13:41:15 +0800 Subject: [PATCH 3/3] [td-13039] support pseudo column in interval query. --- include/common/tdatablock.h | 16 +-- include/libs/function/function.h | 26 +---- include/libs/scalar/scalar.h | 8 ++ source/common/src/tdatablock.c | 17 ---- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 93 ++++++++++++----- source/libs/function/src/builtins.c | 43 ++++---- source/libs/function/src/functionMgt.c | 1 + source/libs/function/src/texpr.c | 127 ------------------------ source/libs/scalar/src/scalar.c | 13 +-- source/libs/scalar/src/sclfunc.c | 31 ++++++ 11 files changed, 145 insertions(+), 231 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 4a47acfa50..4bbe42bc50 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -133,7 +133,8 @@ static FORCE_INLINE int32_t colDataAppendInt32(SColumnInfoData* pColumnInfoData, } static FORCE_INLINE int32_t colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_BIGINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UBIGINT); + int32_t type = pColumnInfoData->info.type; + ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; *(int64_t*)p = *(int64_t*)v; } @@ -175,17 +176,16 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock); double blockDataGetSerialRowSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); -SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols); - int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); -int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); -int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); -void blockDataCleanup(SSDataBlock* pDataBlock); +int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); +int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); +void blockDataCleanup(SSDataBlock* pDataBlock); +size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); +void* blockDataDestroy(SSDataBlock* pBlock); + SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); -size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); -void* blockDataDestroy(SSDataBlock* pBlock); void blockDebugShowData(const SArray* dataBlocks); diff --git a/include/libs/function/function.h b/include/libs/function/function.h index e7895bd972..278d9d8b7c 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -41,6 +41,7 @@ typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef struct SScalarFuncExecFuncs { + FExecGetEnv getEnv; FScalarExecProcess process; } SScalarFuncExecFuncs; @@ -241,7 +242,6 @@ typedef struct tExprNode { }; } tExprNode; -void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree); void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)); typedef struct SAggFunctionInfo { @@ -267,28 +267,6 @@ struct SScalarParam { int32_t numOfRows; }; -typedef struct SMultiFunctionsDesc { - bool stableQuery; - bool groupbyColumn; - bool agg; - bool arithmeticOnAgg; - bool projectionQuery; - bool hasFilter; - bool onlyTagQuery; - bool orderProjectQuery; - bool globalMerge; - bool multigroupResult; - bool blockDistribution; - bool stateWindow; - bool timewindow; - bool sessionWindow; - bool topbotQuery; - bool interpQuery; - bool distinct; - bool join; - bool continueQuery; -} SMultiFunctionsDesc; - int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, bool isSuperTable); @@ -296,8 +274,6 @@ bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* funct tExprNode* exprTreeFromBinary(const void* data, size_t size); -void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc); - tExprNode* exprdup(tExprNode* pTree); void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num); diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index c6d17ef65c..7bc0ee42e9 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -58,6 +58,14 @@ int32_t ceilFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); + +int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7a958136e7..7f25fd1e80 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -331,7 +331,6 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { return 0; } - ASSERT(pColInfoData->nullbitmap == NULL); pDataBlock->info.window.skey = *(TSKEY*)colDataGetData(pColInfoData, 0); pDataBlock->info.window.ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); return 0; @@ -609,22 +608,6 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t); } -SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols) { - SSchema* pSchema = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(SSchema)); - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - pSchema[i].bytes = pColInfoData->info.bytes; - pSchema[i].type = pColInfoData->info.type; - pSchema[i].colId = pColInfoData->info.colId; - } - - if (numOfCols != NULL) { - *numOfCols = pBlock->info.numOfCols; - } - - return pSchema; -} - double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { ASSERT(pBlock != NULL); double rowSize = 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 99bab32157..bc55f46986 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -483,6 +483,7 @@ typedef struct STableIntervalOperatorInfo { int32_t order; // current SSDataBlock scan order OPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator. + SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3475df0e87..55def79958 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1014,8 +1014,35 @@ static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* p return num; } -static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol, +// query_range_start, query_range_end, window_duration, window_start, window_end +static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) { + pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; + pColData->info.bytes = sizeof(int64_t); + + blockDataEnsureColumnCapacity(pColData, 5); + colDataAppendInt64(pColData, 0, &pQueryWindow->skey); + colDataAppendInt64(pColData, 1, &pQueryWindow->ekey); + + int64_t interval = 0; + colDataAppendInt64(pColData, 2, &interval); // this value may be variable in case of 'n' and 'y'. + colDataAppendInt64(pColData, 3, &pQueryWindow->skey); + colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); +} + +static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin) { + int64_t* ts = (int64_t*)pColData->pData; + + int64_t duration = pWin->ekey - pWin->skey + 1; + ts[2] = duration; // set the duration + ts[3] = pWin->skey; // window start key + ts[4] = pWin->ekey + 1; // window end key +} + +static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { + SScalarParam intervalParam = {.numOfRows = 5, .columnData = pTimeWindowData}; //TODO move out of this function + updateTimeWindowInfo(pTimeWindowData, pWin); + for (int32_t k = 0; k < numOfOutput; ++k) { pCtx[k].startTs = pWin->skey; @@ -1038,6 +1065,21 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of pCtx[k].isAggSet = false; } + if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); + char* p = GET_ROWCELL_INTERBUF(pEntryInfo); + + SScalarParam out = {.columnData = NULL}; + out.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData)); + out.columnData->info.type = TSDB_DATA_TYPE_BIGINT; + out.columnData->info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + out.columnData->pData = p; + pCtx[k].sfp.process(&intervalParam, 1, &out); + pEntryInfo->numOfRes = 1; + pEntryInfo->hasResult = ','; + continue; + } + if (functionNeedToExecute(&pCtx[k])) { pCtx[k].fpSet.process(&pCtx[k]); } @@ -1489,8 +1531,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc } } -static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, - int32_t tableGroupId) { +static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -1563,7 +1604,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); - doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } // restore current time window @@ -1578,8 +1619,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // window start key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false); - doApplyFunctions(pInfo->binfo.pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, - TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); STimeWindow nextWin = win; while (1) { @@ -1609,8 +1649,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // window start(end) key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); - doApplyFunctions(pInfo->binfo.pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, - TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } if (pInfo->timeWindowInterpo) { @@ -1855,7 +1894,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = j - num; - doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); @@ -1873,7 +1912,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); } } @@ -1924,8 +1963,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } // pInfo->numOfRows data belong to the current session window - doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, - TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.ekey = tsList[j]; @@ -1945,8 +1983,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, - TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -2013,11 +2050,7 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return false; } - if (functionId == FUNCTION_TS) { - return true; - } - - if (isRowEntryCompleted(pResInfo) || functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { + if (isRowEntryCompleted(pResInfo)) { return false; } @@ -2132,6 +2165,9 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); } else { fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp); + if (pCtx->sfp.getEnv != NULL) { + pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env); + } } pCtx->resDataInfo.interBufSize = env.calcMemSize; } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) { @@ -3730,7 +3766,6 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pRes void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { - // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); @@ -3738,6 +3773,11 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { continue; } + + if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) { + continue; + } + // int32_t functionId = pCtx[i].functionId; // if (functionId < 0) { // continue; @@ -4082,8 +4122,7 @@ static void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDa return; } - int32_t orderType = - TSDB_ORDER_ASC; //(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC; + int32_t orderType = TSDB_ORDER_ASC; doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity, rowCellOffset); // add condition (pBlock->info.rows >= 1) just to runtime happy @@ -7845,14 +7884,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->order = TSDB_ORDER_ASC; pInfo->win = pTaskInfo->window; pInfo->interval = *pInterval; - pInfo->execModel = OPTR_EXEC_MODEL_BATCH; - - pInfo->win.skey = INT64_MIN; - pInfo->win.ekey = INT64_MAX; + pInfo->win.skey = 0; + pInfo->win.ekey = INT64_MAX; int32_t numOfRows = 4096; int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); + initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win); + // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { goto _error; @@ -7860,7 +7899,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); - pOperator->name = "TimeIntervalAggOperator"; + pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0a5a90d2d9..024da4e04b 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -315,31 +315,31 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_qstartts", .type = FUNCTION_TYPE_QSTARTTS, - .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = qStartTsFunction, .finalizeFunc = NULL }, { .name = "_qendts", .type = FUNCTION_TYPE_QENDTS, - .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = qEndTsFunction, .finalizeFunc = NULL }, { .name = "_wstartts", - .type = FUNCTION_TYPE_QSTARTTS, + .type = FUNCTION_TYPE_WSTARTTS, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = winStartTsFunction, .finalizeFunc = NULL }, { @@ -347,9 +347,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_QENDTS, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = winEndTsFunction, .finalizeFunc = NULL }, { @@ -357,9 +357,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_WDURATION, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, + .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, - .sprocessFunc = NULL, + .sprocessFunc = winDurFunction, .finalizeFunc = NULL } }; @@ -368,6 +368,7 @@ const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFun int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { switch(pFunc->funcType) { + case FUNCTION_TYPE_WDURATION: case FUNCTION_TYPE_COUNT: pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; break; @@ -400,14 +401,18 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { } case FUNCTION_TYPE_CONCAT: case FUNCTION_TYPE_ROWTS: - case FUNCTION_TYPE_TBNAME: - case FUNCTION_TYPE_QSTARTTS: - case FUNCTION_TYPE_QENDTS: - case FUNCTION_TYPE_WSTARTTS: - case FUNCTION_TYPE_WENDTS: - case FUNCTION_TYPE_WDURATION: + case FUNCTION_TYPE_TBNAME: { // todo break; + } + + case FUNCTION_TYPE_QENDTS: + case FUNCTION_TYPE_QSTARTTS: + case FUNCTION_TYPE_WENDTS: + case FUNCTION_TYPE_WSTARTTS: { + pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP}; + break; + } case FUNCTION_TYPE_ABS: case FUNCTION_TYPE_CEIL: diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 3858258374..4034a0eb0f 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -92,6 +92,7 @@ int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) { return TSDB_CODE_FAILED; } pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc; + pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/texpr.c b/source/libs/function/src/texpr.c index 814aa48b55..61ff6bb825 100644 --- a/source/libs/function/src/texpr.c +++ b/source/libs/function/src/texpr.c @@ -116,42 +116,6 @@ bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp return param->nodeFilterFn(pItem, pExpr->_node.info); } - - -static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) { - tbufWriteUint8(bw, expr->nodeType); - - if (expr->nodeType == TEXPR_VALUE_NODE) { - SVariant* pVal = expr->pVal; - - tbufWriteUint32(bw, pVal->nType); - if (pVal->nType == TSDB_DATA_TYPE_BINARY) { - tbufWriteInt32(bw, pVal->nLen); - tbufWrite(bw, pVal->pz, pVal->nLen); - } else { - tbufWriteInt64(bw, pVal->i); - } - - } else if (expr->nodeType == TEXPR_COL_NODE) { - SSchema* pSchema = expr->pSchema; - tbufWriteInt16(bw, pSchema->colId); - tbufWriteInt16(bw, pSchema->bytes); - tbufWriteUint8(bw, pSchema->type); - tbufWriteString(bw, pSchema->name); - - } else if (expr->nodeType == TEXPR_BINARYEXPR_NODE) { - tbufWriteUint8(bw, expr->_node.optr); - exprTreeToBinaryImpl(bw, expr->_node.pLeft); - exprTreeToBinaryImpl(bw, expr->_node.pRight); - } -} - -void exprTreeToBinary(SBufferWriter* bw, tExprNode* expr) { - if (expr != NULL) { - exprTreeToBinaryImpl(bw, expr); - } -} - // TODO: these three functions should be made global static void* exception_calloc(size_t nmemb, size_t size) { void* p = taosMemoryCalloc(nmemb, size); @@ -230,97 +194,6 @@ tExprNode* exprTreeFromBinary(const void* data, size_t size) { return exprTreeFromBinaryImpl(&br); } -tExprNode* exprTreeFromTableName(const char* tbnameCond) { - if (!tbnameCond) { - return NULL; - } - - int32_t anchor = CLEANUP_GET_ANCHOR(); - - tExprNode* expr = exception_calloc(1, sizeof(tExprNode)); - CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL); - - expr->nodeType = TEXPR_BINARYEXPR_NODE; - - tExprNode* left = exception_calloc(1, sizeof(tExprNode)); - expr->_node.pLeft = left; - - left->nodeType = TEXPR_COL_NODE; - SSchema* pSchema = exception_calloc(1, sizeof(SSchema)); - left->pSchema = pSchema; - -// *pSchema = NULL;//*tGetTbnameColumnSchema(); - - tExprNode* right = exception_calloc(1, sizeof(tExprNode)); - expr->_node.pRight = right; - - if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN) == 0) { - right->nodeType = TEXPR_VALUE_NODE; - expr->_node.optr = OP_TYPE_LIKE; - SVariant* pVal = exception_calloc(1, sizeof(SVariant)); - right->pVal = pVal; - size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN) + 1; - pVal->pz = exception_malloc(len); - memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN, len); - pVal->nType = TSDB_DATA_TYPE_BINARY; - pVal->nLen = (int32_t)len; - - } else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_MATCH, QUERY_COND_REL_PREFIX_MATCH_LEN) == 0) { - right->nodeType = TEXPR_VALUE_NODE; - expr->_node.optr = OP_TYPE_MATCH; - SVariant* pVal = exception_calloc(1, sizeof(SVariant)); - right->pVal = pVal; - size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_MATCH_LEN) + 1; - pVal->pz = exception_malloc(len); - memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_MATCH_LEN, len); - pVal->nType = TSDB_DATA_TYPE_BINARY; - pVal->nLen = (int32_t)len; - } else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_NMATCH, QUERY_COND_REL_PREFIX_NMATCH_LEN) == 0) { - right->nodeType = TEXPR_VALUE_NODE; - expr->_node.optr = OP_TYPE_NMATCH; - SVariant* pVal = exception_calloc(1, sizeof(SVariant)); - right->pVal = pVal; - size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_NMATCH_LEN) + 1; - pVal->pz = exception_malloc(len); - memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_NMATCH_LEN, len); - pVal->nType = TSDB_DATA_TYPE_BINARY; - pVal->nLen = (int32_t)len; - } else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN) == 0) { - right->nodeType = TEXPR_VALUE_NODE; - expr->_node.optr = OP_TYPE_IN; - SVariant* pVal = exception_calloc(1, sizeof(SVariant)); - right->pVal = pVal; - pVal->nType = TSDB_DATA_TYPE_POINTER_ARRAY; - pVal->arr = taosArrayInit(2, POINTER_BYTES); - - const char* cond = tbnameCond + QUERY_COND_REL_PREFIX_IN_LEN; - for (const char *e = cond; *e != 0; e++) { - if (*e == TS_PATH_DELIMITER[0]) { - cond = e + 1; - } else if (*e == ',') { - size_t len = e - cond; - char* p = exception_malloc(len + VARSTR_HEADER_SIZE); - STR_WITH_SIZE_TO_VARSTR(p, cond, (VarDataLenT)len); - cond += len; - taosArrayPush(pVal->arr, &p); - } - } - - if (*cond != 0) { - size_t len = strlen(cond) + VARSTR_HEADER_SIZE; - - char* p = exception_malloc(len); - STR_WITH_SIZE_TO_VARSTR(p, cond, (VarDataLenT)(len - VARSTR_HEADER_SIZE)); - taosArrayPush(pVal->arr, &p); - } - - taosArraySortString(pVal->arr, taosArrayCompareString); - } - - CLEANUP_EXECUTE_TO(anchor, false); - return expr; -} - void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) { SBufferReader br = tbufInitReader(buf, len, false); uint32_t type = tbufReadUint32(&br); diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 7b2cb9ca67..1cc259d4da 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -288,7 +288,7 @@ _return: SCL_RET(code); } -int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { +int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { if (NULL == node->pParameterList || node->pParameterList->length <= 0) { sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -420,7 +420,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { SFunctionNode *node = (SFunctionNode *)*pNode; SScalarParam output = {0}; - ctx->code = sclExecFuncion(node, ctx, &output); + ctx->code = sclExecFunction(node, ctx, &output); if (ctx->code) { return DEAL_RES_ERROR; } @@ -547,7 +547,7 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) { SFunctionNode *node = (SFunctionNode *)pNode; SScalarParam output = {0}; - ctx->code = sclExecFuncion(node, ctx, &output); + ctx->code = sclExecFunction(node, ctx, &output); if (ctx->code) { return DEAL_RES_ERROR; } @@ -667,7 +667,7 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { int32_t code = 0; SScalarCtx ctx = {0}; - ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (NULL == ctx.pRes) { sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -689,7 +689,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { int32_t code = 0; SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList}; - + // TODO: OPT performance ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == ctx.pRes) { sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); @@ -716,6 +716,3 @@ _return: sclFreeRes(ctx.pRes); return code; } - - - diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index db0aa21f42..d8e97e7e12 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -377,3 +377,34 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf } } +bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(int64_t); + return true; +} + +int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + ASSERT(inputNum == 1); + colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0)); +} + +int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + ASSERT(inputNum == 1); + colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 1)); +} + +int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + ASSERT(inputNum == 1); + colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 2)); +} + +int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + ASSERT(inputNum == 1); + colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 3)); + return TSDB_CODE_SUCCESS; +} + +int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + ASSERT(inputNum == 1); + colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4)); + return TSDB_CODE_SUCCESS; +} \ No newline at end of file