diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7bf3371006..b1b190c816 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5372,7 +5372,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt return pOperator; } -SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) { SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5382,16 +5382,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp return NULL; } - // todo dynamic set the value of 4096 - pInfo->pRes = createOutputBuf_rv(pExprInfo, 4096); - - int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo); - SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t)); - for(int32_t i = 0; i < numOfOutput; ++i) { - SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); - taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId); - } - // set the extract column id to streamHandle tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList); int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList); @@ -5401,15 +5391,16 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp return NULL; } - pInfo->readerHandle = streamReadHandle; + pInfo->readerHandle = streamReadHandle; + pInfo->pRes = pResBlock; pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; - pOperator->nextDataFn = doStreamBlockScan; + pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->nextDataFn = doStreamBlockScan; pOperator->pTaskInfo = pTaskInfo; return pOperator; } @@ -8097,6 +8088,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) { static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId); static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); +static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); +static SArray* extractScanColumnId(SNodeList* pNodeList); SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node @@ -8119,30 +8112,19 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. + STableGroupInfo groupInfo = {0}; int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); - SArray* idList = NULL; + SArray* tableIdList = extractTableIdList(&groupInfo); - if (groupInfo.numOfTables > 0) { - SArray* pa = taosArrayGetP(groupInfo.pGroupList, 0); - ASSERT(taosArrayGetSize(groupInfo.pGroupList) == 1); + SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); + SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols); - // Transfer the Array of STableKeyInfo into uid list. - size_t numOfTables = taosArrayGetSize(pa); - idList = taosArrayInit(numOfTables, sizeof(uint64_t)); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo); - for (int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pkeyInfo = taosArrayGet(pa, i); - taosArrayPush(idList, &pkeyInfo->uid); - } - } else { - idList = taosArrayInit(4, sizeof(uint64_t)); - } - -// SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pScanPhyNode->pScanCols, idList, pTaskInfo); - taosArrayDestroy(idList); -// return pOperator; + taosArrayDestroy(tableIdList); + return pOperator; } } @@ -8203,7 +8185,24 @@ static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STa return tsdbQueryTables(readHandle, &cond, pGroupInfo, queryId, taskId); } -static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) { +SArray* extractScanColumnId(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(int16_t)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for(int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i); + SColumnNode* pColNode = (SColumnNode*) pNode->pExpr; + taosArrayPush(pList, &pColNode->colId); + } + + return pList; +} + +int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) { int32_t code = 0; if (tableType == TSDB_SUPER_TABLE) { code = tsdbQuerySTableByTagCond(metaHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId); @@ -8214,7 +8213,25 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t return code; } -static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) { +SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) { + SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); + + if (pTableGroupInfo->numOfTables > 0) { + SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, 0); + ASSERT(taosArrayGetSize(pTableGroupInfo->pGroupList) == 1); + + // Transfer the Array of STableKeyInfo into uid list. + size_t numOfTables = taosArrayGetSize(pa); + for (int32_t i = 0; i < numOfTables; ++i) { + STableKeyInfo* pkeyInfo = taosArrayGet(pa, i); + taosArrayPush(tableIdList, &pkeyInfo->uid); + } + } + + return tableIdList; +} + +tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) { STableGroupInfo groupInfo = {0}; uint64_t uid = pTableScanNode->scan.uid;