diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 09cf147335..186bf4ea66 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7514,8 +7514,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); int32_t numOfRows = 1; - int32_t code = - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { goto _error; @@ -7523,18 +7522,18 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); - pOperator->name = "TableAggregate"; + pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = numOfCols; - pOperator->pTaskInfo = pTaskInfo; - pOperator->_openFn = doOpenAggregateOptr; - pOperator->getNextFn = getAggregateResult; - pOperator->closeFn = destroyAggOperatorInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->_openFn = doOpenAggregateOptr; + pOperator->getNextFn = getAggregateResult; + pOperator->closeFn = destroyAggOperatorInfo; pOperator->encodeResultRow = aggEncodeResultRow; pOperator->decodeResultRow = aggDecodeResultRow; @@ -7711,16 +7710,16 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p // initResultRowInfo(&pBInfo->resultRowInfo, 8); // setFunctionResultOutput(pBInfo, MAIN_SCAN); - pOperator->name = "ProjectOperator"; + pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = num; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doProjectOperation; - pOperator->closeFn = destroyProjectOperatorInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = num; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doProjectOperation; + pOperator->closeFn = destroyProjectOperatorInfo; pOperator->pTaskInfo = pTaskInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7735,39 +7734,6 @@ _error: return NULL; } -SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { -#if 0 - SColumnInfo* pCols = taosMemoryCalloc(numOfOutput, sizeof(SColumnInfo)); - - int32_t numOfFilter = 0; - for(int32_t i = 0; i < numOfOutput; ++i) { - if (pExpr[i].base.flist.numOfFilters > 0) { - numOfFilter += 1; - } - - pCols[i].type = pExpr[i].base.resSchema.type; - pCols[i].bytes = pExpr[i].base.resSchema.bytes; - pCols[i].colId = pExpr[i].base.resSchema.colId; - - pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters; - if (pCols[i].flist.numOfFilters != 0) { - pCols[i].flist.filterInfo = taosMemoryCalloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo)); - memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo)); - } else { - // avoid runtime error - pCols[i].flist.filterInfo = NULL; - } - } - - assert(numOfFilter > 0); - - *numOfFilterCols = numOfFilter; - return pCols; -#endif - - return 0; -} - SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { SLimitOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SLimitOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -7778,17 +7744,18 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit pInfo->limit = *pLimit; pInfo->currentOffset = pLimit->offset; - pOperator->name = "LimitOperator"; + pOperator->name = "LimitOperator"; // pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LIMIT; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doLimit; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + pOperator->status = OP_NOT_OPENED; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doLimit; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; + _error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -7805,17 +7772,15 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - pInfo->order = TSDB_ORDER_ASC; + pInfo->order = TSDB_ORDER_ASC; pInfo->precision = TSDB_TIME_PRECISION_MILLI; - pInfo->win = pTaskInfo->window; - pInfo->interval = *pInterval; - - pInfo->win.skey = INT64_MIN; - pInfo->win.ekey = INT64_MAX; + pInfo->win = pTaskInfo->window; + pInfo->interval = *pInterval; + pInfo->win.skey = INT64_MIN; + pInfo->win.ekey = INT64_MAX; int32_t numOfRows = 4096; - int32_t code = - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { goto _error; @@ -7826,14 +7791,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfOutput = numOfCols; - pOperator->info = pInfo; - pOperator->_openFn = doOpenIntervalAgg; - pOperator->getNextFn = doBuildIntervalResult; - pOperator->closeFn = destroyIntervalOperatorInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + pOperator->_openFn = doOpenIntervalAgg; + pOperator->getNextFn = doBuildIntervalResult; + pOperator->closeFn = destroyIntervalOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -8555,6 +8520,23 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i return s; } +static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) { + SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn)); + if (pCol == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pCol->slotId = slotId; + pCol->bytes = pType->bytes; + pCol->type = pType->type; + pCol->scale = pType->scale; + pCol->precision = pType->precision; + pCol->dataBlockId = blockId; + + return pCol; +} + SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) { int32_t numOfFuncs = LIST_LENGTH(pNodeList); int32_t numOfGroupKeys = 0; @@ -8586,18 +8568,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); pExp->base.numOfParams = 1; - pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn)); - pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; SDataType* pType = &pColNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); - - SColumn* pCol = pExp->base.pParam[0].pCol; - pCol->slotId = pColNode->slotId; // TODO refactor - pCol->bytes = pType->bytes; - pCol->type = pType->type; - pCol->scale = pType->scale; - pCol->precision = pType->precision; + pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType); + pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) { pExp->pExpr->nodeType = QUERY_NODE_FUNCTION; SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; @@ -8608,8 +8583,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->pExpr->_function.functionId = pFuncNode->funcId; pExp->pExpr->_function.pFunctNode = pFuncNode; - strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, - tListLen(pExp->pExpr->_function.functionName)); + strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); // TODO: value parameter needs to be handled int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); @@ -8620,21 +8594,12 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* for (int32_t j = 0; j < numOfParam; ++j) { SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); if (p1->type == QUERY_NODE_COLUMN) { - SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor + SColumnNode* pcn = (SColumnNode*) p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; - pExp->base.pParam[j].pCol = taosMemoryCalloc(1, sizeof(SColumn)); - SColumn* pCol = pExp->base.pParam[j].pCol; - - pCol->slotId = pcn->slotId; - pCol->bytes = pcn->node.resType.bytes; - pCol->type = pcn->node.resType.type; - pCol->scale = pcn->node.resType.scale; - pCol->precision = pcn->node.resType.precision; - pCol->dataBlockId = pcn->dataBlockId; + pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, &pcn->node.resType); } else if (p1->type == QUERY_NODE_VALUE) { SValueNode* pvn = (SValueNode*)p1; - pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; } } @@ -8644,21 +8609,14 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); pExp->base.numOfParams = 1; - pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn)); - pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; SDataType* pType = &pNode->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, - pType->precision, pNode->node.aliasName); + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName); pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr; - SColumn* pCol = pExp->base.pParam[0].pCol; - pCol->slotId = pTargetNode->slotId; // TODO refactor - pCol->bytes = pType->bytes; - pCol->type = pType->type; - pCol->scale = pType->scale; - pCol->precision = pType->precision; + pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; + pExp->base.pParam[0].pCol = createColumn(pTargetNode->dataBlockId, pTargetNode->slotId, pType); } else { ASSERT(0); } @@ -8692,17 +8650,15 @@ static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols); static SArray* createSortInfo(SNodeList* pNodeList); -SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, +SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; int32_t numOfCols = 0; - tsdbReaderT pDataReader = - doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); - SArray* pColList = - extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); + tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); + SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pTaskInfo); @@ -8745,7 +8701,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa assert(size == 1); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num); @@ -8757,7 +8713,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); int32_t num = 0; @@ -8778,7 +8734,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -8787,11 +8743,12 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - SInterval interval = {.interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, + SInterval interval = {.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset}; + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = TSDB_TIME_PRECISION_MILLI,}; return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { @@ -8799,7 +8756,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa assert(size == 1); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; @@ -8811,7 +8768,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa assert(size == 1); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; @@ -8827,7 +8784,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); } }*/ @@ -8972,11 +8929,11 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod for (int32_t i = 0; i < num; ++i) { SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i); SColMatchInfo* info = taosArrayGet(pList, pNode->slotId); - // if (pNode->output) { - (*numOfOutputCols) += 1; - // } else { - // info->output = false; - // } + if (pNode->output) { + (*numOfOutputCols) += 1; + } else { + info->output = false; + } } return pList; @@ -9045,7 +9002,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } STableGroupInfo group = {0}; - (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group); + (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group); if (NULL == (*pTaskInfo)->pRoot) { code = terrno; goto _complete; diff --git a/source/libs/function/src/tunaryoperator.c b/source/libs/function/src/tunaryoperator.c deleted file mode 100644 index 957f0799c5..0000000000 --- a/source/libs/function/src/tunaryoperator.c +++ /dev/null @@ -1,13 +0,0 @@ -#include "tunaryoperator.h" - - - - -// TODO dynamic define these functions -//_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t operator) { -// assert(0); -//} - -//bool isStringOperatorFn(int32_t op) { -// return op == FUNCTION_LENGTH; -//}