diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 34c96a5ec4..9882b0a9ae 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -118,12 +118,12 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream bool isShuffle = false; if (pStream->fixedSinkVgId == 0) { - pTask->dispatchType = TASK_DISPATCH__SHUFFLE; - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); ASSERT(pDb); if (pDb->cfg.numOfVgroups > 1) { isShuffle = true; + pTask->dispatchType = TASK_DISPATCH__SHUFFLE; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { ASSERT(0); return -1; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ecad1c96e0..2e33632f12 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -280,7 +280,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat taosArrayDestroy(req.dataLen); return code; - } else if (pTask->dispatchMsgType == TASK_DISPATCH__SHUFFLE) { + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; ASSERT(pTask->shuffleDispatcher.waitingRspCnt == 0); int32_t vgSz = taosArrayGetSize(vgInfo); @@ -318,6 +318,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; } + pReqs[j].taskId = pVgInfo->taskId; + pReqs[j].blockNum++; break; } } @@ -342,6 +344,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat taosMemoryFree(pReqs); } return code; + } else { + ASSERT(0); } return 0; }