fix(stream): dispatch
This commit is contained in:
parent
3fa5791889
commit
57f9d8b0b0
|
@ -118,12 +118,12 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
|
||||||
bool isShuffle = false;
|
bool isShuffle = false;
|
||||||
|
|
||||||
if (pStream->fixedSinkVgId == 0) {
|
if (pStream->fixedSinkVgId == 0) {
|
||||||
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
|
||||||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
||||||
ASSERT(pDb);
|
ASSERT(pDb);
|
||||||
if (pDb->cfg.numOfVgroups > 1) {
|
if (pDb->cfg.numOfVgroups > 1) {
|
||||||
isShuffle = true;
|
isShuffle = true;
|
||||||
|
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
||||||
|
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -280,7 +280,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
taosArrayDestroy(req.dataLen);
|
taosArrayDestroy(req.dataLen);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
} else if (pTask->dispatchMsgType == TASK_DISPATCH__SHUFFLE) {
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
ASSERT(pTask->shuffleDispatcher.waitingRspCnt == 0);
|
ASSERT(pTask->shuffleDispatcher.waitingRspCnt == 0);
|
||||||
int32_t vgSz = taosArrayGetSize(vgInfo);
|
int32_t vgSz = taosArrayGetSize(vgInfo);
|
||||||
|
@ -318,6 +318,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
|
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
|
||||||
goto FAIL_SHUFFLE_DISPATCH;
|
goto FAIL_SHUFFLE_DISPATCH;
|
||||||
}
|
}
|
||||||
|
pReqs[j].taskId = pVgInfo->taskId;
|
||||||
|
pReqs[j].blockNum++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -342,6 +344,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
taosMemoryFree(pReqs);
|
taosMemoryFree(pReqs);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue