opti:build task logic in stream

This commit is contained in:
wangmm0220 2023-12-08 15:51:01 +08:00
parent b36665c9d6
commit acfcfdc8b6
1 changed files with 2 additions and 2 deletions

View File

@ -559,7 +559,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
if (numOfPlanLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
// add extra sink
hasExtraSink = true;
int32_t code = addSinkTask(pMnode, pStream, pEpset);
@ -596,7 +596,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
bindAggSink(pStream, pMnode, pStream->pHTasksList);
}
plan = getScanSubPlan(pPlan, 1);
plan = getScanSubPlan(pPlan, numOfPlanLevel - 1);
if(plan == NULL){
return terrno;
}