diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index beb5e1152f..10381f72fd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2522,8 +2522,8 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray } int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, - int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, - SArray* pColList) { + int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, + SArray* pColList) { if (pColList == NULL) { // data from other sources blockCompressDecode(pRes, numOfOutput, numOfRows, pData); } else { // extract data according to pColList @@ -2677,7 +2677,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; code = setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, - pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); + pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); if (code != 0) { taosMemoryFreeClear(pDataInfo->pRsp); goto _error; @@ -2790,7 +2790,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; int32_t code = setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, - pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); + pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 @@ -2856,7 +2856,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { return seqLoadRemoteData(pOperator); } else { return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); -// return concurrentlyLoadRemoteData(pOperator); + // return concurrentlyLoadRemoteData(pOperator); } } @@ -2911,18 +2911,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode goto _error; } - pInfo->seqLoadData = false; + pInfo->seqLoadData = false; pInfo->pTransporter = pTransporter; - pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); + pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); tsem_init(&pInfo->ready, 0, 0); - pOperator->name = "ExchangeOperator"; + pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pInfo->pResult->info.numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pInfo->pResult->info.numOfCols; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL, NULL, NULL); @@ -4389,7 +4389,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STimeWindowAggSupp twSup = { - .waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN}; + .waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN}; tsdbReaderT pDataReader = NULL; if (pHandle->vnode) { pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); @@ -4517,17 +4517,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) { + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) { SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode; SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; - SSDataBlock* pResBlock = createResDataBlock(pDescNode); + SSDataBlock* pResBlock = createResDataBlock(pDescNode); - SArray* sortInfo = createSortInfo(pMergePhyNode->pMergeKeys); + SArray* sortInfo = createSortInfo(pMergePhyNode->pMergeKeys); int32_t numOfOutputCols = 0; - SArray* pColList = NULL; - //extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); - + SArray* pColList = + extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index b2a675dbb1..991d6d66b8 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1191,6 +1191,9 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets, &pMerge->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc); + } if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pMerge;