From 3a23a30a9c5a12797a451a4a3d6b74f6c02c94de Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Feb 2022 14:39:42 +0800 Subject: [PATCH] [td-11818] Refactor. --- include/common/tep.h | 2 +- include/libs/planner/plannerOp.h | 2 +- source/common/src/tep.c | 2 +- source/libs/executor/inc/executorimpl.h | 9 +- source/libs/executor/src/executorimpl.c | 109 ++++++++++++--------- source/libs/planner/src/physicalPlan.c | 4 +- source/libs/planner/src/physicalPlanJson.c | 6 +- 7 files changed, 77 insertions(+), 57 deletions(-) diff --git a/include/common/tep.h b/include/common/tep.h index 8194835223..d5ad2bca98 100644 --- a/include/common/tep.h +++ b/include/common/tep.h @@ -73,7 +73,7 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2); -int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock); +int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); void colDataTrim(SColumnInfoData* pColumnInfoData); diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h index 31f5457c90..9030ffc946 100644 --- a/include/libs/planner/plannerOp.h +++ b/include/libs/planner/plannerOp.h @@ -24,7 +24,7 @@ #endif OP_ENUM_MACRO(StreamScan) -OP_ENUM_MACRO(DataBlocksOptScan) +OP_ENUM_MACRO(TableScan) OP_ENUM_MACRO(TableSeqScan) OP_ENUM_MACRO(TagScan) OP_ENUM_MACRO(SystemTableScan) diff --git a/source/common/src/tep.c b/source/common/src/tep.c index 6ead90caee..c015f0ac21 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -234,7 +234,7 @@ size_t colDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } -int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock) { +int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { if (pDataBlock == NULL || pDataBlock->info.rows <= 0) { return 0; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 6b1aae6c0c..e013e7c6b1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -97,10 +97,11 @@ typedef struct SSingleColumnFilterInfo { } SSingleColumnFilterInfo; typedef struct STableQueryInfo { - TSKEY lastKey; + TSKEY lastKey; // last check ts + uint64_t uid; // table uid int32_t groupIndex; // group id in table list // SVariant tag; - SResultRowInfo resInfo; + SResultRowInfo resInfo; // result info } STableQueryInfo; typedef enum { @@ -596,8 +597,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 54917eb14c..831fd19710 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4015,7 +4015,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI return 0; } -static void toSDatablock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) { +static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); pBlock->info.rows = 0; @@ -4024,17 +4024,10 @@ static void toSDatablock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntime } int32_t orderType = TSDB_ORDER_ASC;//(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC; - doCopyToSDataBlock(NULL, pGroupResInfo, orderType, pBlock, 0); + doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity); - // refactor : extract method - SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); - - //add condition (pBlock->info.rows >= 1) just to runtime happy - if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) { - STimeWindow* w = &pBlock->info.window; - w->skey = *(int64_t*)pInfoData->pData; - w->ekey = *(int64_t*)(((char*)pInfoData->pData) + TSDB_KEYSIZE * (pBlock->info.rows - 1)); - } + // add condition (pBlock->info.rows >= 1) just to runtime happy + blockDataUpdateTsWindow(pBlock); } static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, @@ -5373,7 +5366,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pInfo->scanFlag = MAIN_SCAN; pOperator->name = "DataBlocksOptimizedScanOperator"; - pOperator->operatorType = OP_DataBlocksOptScan; + pOperator->operatorType = OP_TableScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -6188,7 +6181,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->status == OP_RES_TO_RETURN) { -// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); + toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity); if (pInfo->pRes->info.rows == 0 /*|| !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { pOperator->status = OP_EXEC_DONE; @@ -6212,7 +6205,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { } // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); -// if (downstream->operatorType == OP_DataBlocksOptScan) { +// if (downstream->operatorType == OP_TableScan) { // STableScanInfo* pScanInfo = downstream->info; // order = getTableScanOrder(pScanInfo); // } @@ -6429,7 +6422,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); +// toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -6469,7 +6462,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -6488,7 +6481,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -6529,7 +6522,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; @@ -6743,7 +6736,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; @@ -6781,7 +6774,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; @@ -6802,7 +6795,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; @@ -6841,7 +6834,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; @@ -6860,7 +6853,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; @@ -6904,8 +6897,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); } - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); - +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -7049,17 +7041,32 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { tfree(pOperator); } -static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows) { +static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows, const STableGroupInfo* pTableGroupInfo) { pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); + pInfo->binfo.capacity = 4096; - pInfo->pResultRowHashTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pInfo->pResultRowListSet = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo)); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + + pInfo->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); + pInfo->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); + pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo)); pInfo->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); - // TODO: number of tables - pInfo->pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); + pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); + + int32_t index = 0; + for(int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) { + SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i); + for(int32_t j = 0; j < taosArrayGetSize(pa); ++j) { + STableKeyInfo* pk = taosArrayGet(pa, j); + + STableQueryInfo* pTQueryInfo = &pInfo->pTableQueryInfo[index]; + pTQueryInfo->uid = pk->uid; + pTQueryInfo->lastKey = pk->lastKey; + pTQueryInfo->groupIndex = i; + } + } STimeWindow win = {0, INT64_MAX}; createTableQueryInfo(pInfo->pTableQueryInfo, false, win); @@ -7078,13 +7085,13 @@ static SExprInfo* exprArrayDup(SArray* pExprInfo) { return p; } -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); int32_t numOfRows = 1; //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); - initAggInfo(pInfo, pExprInfo, numOfRows); + initAggInfo(pInfo, pExprInfo, numOfRows, pTableGroupInfo); setDefaultOutputBuf_rv(pInfo, MAIN_SCAN, pTaskInfo); size_t numOfOutput = taosArrayGetSize(pExprInfo); @@ -7181,15 +7188,15 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - size_t tableGroup = 1; // GET_NUM_OF_TABLEGROUP(pRuntimeEnv); int32_t numOfRows = 1; size_t numOfOutput = taosArrayGetSize(pExprInfo); - initAggInfo(pInfo, pExprInfo, numOfRows); - SExprInfo* p = exprArrayDup(pExprInfo); - // initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); + initAggInfo(pInfo, pExprInfo, numOfRows, pTableGroupInfo); + + size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList); + initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "MultiTableAggregate"; @@ -7197,7 +7204,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->pExpr = p; + pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = numOfOutput; pOperator->exec = doMultiTableAggregate; @@ -8045,14 +8052,15 @@ static tsdbReaderT doCreateDataReader(STableScanPhyNode* pTableScanNode, SReadHa static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); -SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) { +SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) { - if (pPhyNode->info.type == OP_DataBlocksOptScan) { + if (pPhyNode->info.type == OP_TableScan) { SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; - size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); + size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhyNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId); + int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else if (pPhyNode->info.type == OP_Exchange) { SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; @@ -8086,10 +8094,20 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask size_t size = taosArrayGetSize(pPhyNode->pChildren); assert(size == 1); + // TODO single table agg for (int32_t i = 0; i < size; ++i) { SPhyNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId); - return createAggregateOperatorInfo(op, pPhyNode->pTargets, pTaskInfo); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + return createAggregateOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); + } + } else if (pPhyNode->info.type == OP_MultiTableAggregate) { + size_t size = taosArrayGetSize(pPhyNode->pChildren); + assert(size == 1); + + for (int32_t i = 0; i < size; ++i) { + SPhyNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); } } } @@ -8164,7 +8182,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } - (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId); + STableGroupInfo group = {0}; + (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group); if ((*pTaskInfo)->pRoot == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _complete; diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 20e8378c26..eadc95b98d 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -216,7 +216,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable } else if (needSeqScan(pPlanNode)) { return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan); } - int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan; + int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan; return createUserTableScanNode(pPlanNode, pTable, type); } @@ -288,7 +288,7 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) { SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList; vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode); - int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan; + int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan; return createUserTableScanNode(pPlanNode, pTableInfo, type); } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index e367f2e74b..b2109c0a4f 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -88,7 +88,7 @@ static const char* jkPnodeType = "Type"; static int32_t getPnodeTypeSize(cJSON* json) { switch (getNumber(json, jkPnodeType)) { case OP_StreamScan: - case OP_DataBlocksOptScan: + case OP_TableScan: case OP_TableSeqScan: return sizeof(STableScanPhyNode); case OP_TagScan: @@ -830,7 +830,7 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) { const SPhyNode* phyNode = (const SPhyNode*)obj; switch (phyNode->info.type) { case OP_StreamScan: - case OP_DataBlocksOptScan: + case OP_TableScan: case OP_TableSeqScan: return tableScanNodeToJson(obj, json); case OP_TagScan: @@ -868,7 +868,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { SPhyNode* phyNode = (SPhyNode*)obj; switch (phyNode->info.type) { case OP_StreamScan: - case OP_DataBlocksOptScan: + case OP_TableScan: case OP_TableSeqScan: return tableScanNodeFromJson(json, obj); case OP_TagScan: