From 767b768e5137e29bc1c14a850d52abbe13b16cad Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 11 Jun 2022 15:03:23 +0800 Subject: [PATCH] fix(stream): fix shuffle vg id not initialized --- source/dnode/mnode/impl/src/mndScheduler.c | 54 ++++++++++++---------- source/libs/stream/src/streamDispatch.c | 6 ++- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index c04e416896..a4ddf96630 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -151,33 +151,36 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* ASSERT(pDb); if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { - sdbRelease(pMnode->pSdb, pDb); + ASSERT(0); + return -1; + } + sdbRelease(pMnode->pSdb, pDb); - SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t sz = taosArrayGetSize(pVgs); - SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); - int32_t sinkLvSize = taosArrayGetSize(sinkLv); - for (int32_t i = 0; i < sz; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); - for (int32_t j = 0; j < sinkLvSize; j++) { - SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); - if (pLastLevelTask->nodeId == pVgInfo->vgId) { - pVgInfo->taskId = pLastLevelTask->taskId; - break; - } + SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(pVgs); + SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); + int32_t sinkLvSize = taosArrayGetSize(sinkLv); + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + for (int32_t j = 0; j < sinkLvSize; j++) { + SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); + if (pLastLevelTask->nodeId == pVgInfo->vgId) { + pVgInfo->taskId = pLastLevelTask->taskId; + ASSERT(pVgInfo->taskId != 0); + break; } } - } else { - pTask->dispatchType = TASK_DISPATCH__FIXED; - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; - SArray* pArray = taosArrayGetP(pStream->tasks, 0); - // one sink only - ASSERT(taosArrayGetSize(pArray) == 1); - SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; - pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; - pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } + } else { + pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + SArray* pArray = taosArrayGetP(pStream->tasks, 0); + // one sink only + ASSERT(taosArrayGetSize(pArray) == 1); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); + pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; + pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; + pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } return 0; } @@ -379,7 +382,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; // dispatch - mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask); + if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) { + qDestroyQueryPlan(pPlan); + return -1; + } // exec pFinalTask->execType = TASK_EXEC__PIPE; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 6fb5d75d86..d523374638 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -100,7 +100,6 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM .upstreamNodeId = pTask->nodeId, .blockNum = blockNum, }; - qInfo("dispatch from task %d (child id %d)", pTask->taskId, pTask->childId); req.data = taosArrayInit(blockNum, sizeof(void*)); req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); @@ -142,11 +141,14 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM break; } } - ASSERT(vgId != 0); } + ASSERT(vgId != 0); req.taskId = downstreamTaskId; + qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId, + downstreamTaskId, vgId); + // serialize int32_t tlen; tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);