Merge pull request #13718 from taosdata/feature/stream

fix(stream): fix shuffle vg id not initialized
This commit is contained in:
Liu Jicong 2022-06-11 15:21:13 +08:00 committed by GitHub
commit 824c3fbead
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 16 deletions

View File

@ -151,6 +151,9 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
ASSERT(0);
return -1;
}
sdbRelease(pMnode->pSdb, pDb);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
@ -163,11 +166,11 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
ASSERT(pVgInfo->taskId != 0);
break;
}
}
}
}
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
@ -379,7 +382,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
// dispatch
mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask);
if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) {
qDestroyQueryPlan(pPlan);
return -1;
}
// exec
pFinalTask->execType = TASK_EXEC__PIPE;

View File

@ -100,7 +100,6 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
.upstreamNodeId = pTask->nodeId,
.blockNum = blockNum,
};
qInfo("dispatch from task %d (child id %d)", pTask->taskId, pTask->childId);
req.data = taosArrayInit(blockNum, sizeof(void*));
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
@ -142,11 +141,14 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
break;
}
}
ASSERT(vgId != 0);
}
ASSERT(vgId != 0);
req.taskId = downstreamTaskId;
qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId,
downstreamTaskId, vgId);
// serialize
int32_t tlen;
tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);