diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 3e24e43233..ab60acab53 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -821,7 +821,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, +SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 13ac74199e..e4c072a396 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4558,7 +4558,6 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond); -static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo); static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList); @@ -4725,12 +4724,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else { qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo)); } - SArray* tableIdList = extractTableIdList(pTableListInfo); + + SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys); + int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKyes); //todo for json + if (code){ + tsdbCleanupReadHandle(pDataReader); + return NULL; + } SOperatorInfo* pOperator = - createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo, &twSup); + createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup); - taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; @@ -5183,18 +5187,6 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa return code; } -SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { - SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); - - // Transfer the Array of STableKeyInfo into uid list. - for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) { - STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i); - taosArrayPush(tableIdList, &pkeyInfo->uid); - } - - return tableIdList; -} - tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { int32_t code = diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b6ee9843b4..d30e4ef6db 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -886,6 +886,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { pInfo->pRes->info.groupId = groupId; } + uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t)); + if (groupIdPre) { + pInfo->pRes->info.groupId = *groupIdPre; + } + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); if (!pColMatchInfo->output) { @@ -953,11 +958,24 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } } -SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, +static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { + SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); + + // Transfer the Array of STableKeyInfo into uid list. + for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) { + STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i); + taosArrayPush(tableIdList, &pkeyInfo->uid); + } + + return tableIdList; +} + +SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) { SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _error; @@ -988,10 +1006,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan // set the extract column id to streamHandle tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds); - int32_t code = tqReadHandleSetTbUidList(pHandle->reader, pTableIdList); + SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); + int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList); if (code != 0) { + taosArrayDestroy(tableIdList); goto _error; } + taosArrayDestroy(tableIdList); pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); if (pInfo->pBlockLists == NULL) {