fix(stream): follower nodes not restart tasks.

This commit is contained in:
Haojun Liao 2025-02-23 10:43:56 +08:00
parent 9018970135
commit f1c4d2734f
5 changed files with 27 additions and 15 deletions

View File

@ -19,7 +19,7 @@
// message process // message process
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart); int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart);
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId); int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId);
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored); int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader);
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);

View File

@ -157,7 +157,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
case TDMT_STREAM_TASK_DROP: case TDMT_STREAM_TASK_DROP:
return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
case TDMT_VND_STREAM_TASK_UPDATE: case TDMT_VND_STREAM_TASK_UPDATE:
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true, true);
case TDMT_VND_STREAM_TASK_RESET: case TDMT_VND_STREAM_TASK_RESET:
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont); return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont);
case TDMT_STREAM_TASK_PAUSE: case TDMT_STREAM_TASK_PAUSE:

View File

@ -1364,7 +1364,8 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored); return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg,
pTq->pVnode->restored, (pTq->pStreamMeta->role == NODE_ROLE_LEADER));
} }
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -139,7 +139,7 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
} }
// this is to process request from transaction, always return true. // this is to process request from transaction, always return true.
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
@ -298,14 +298,19 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
if (restored) { if (restored && isLeader) {
tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId); tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
pMeta->startInfo.tasksWillRestart = 1; pMeta->startInfo.tasksWillRestart = 1;
} }
if (updateTasks < numOfTasks) { if (updateTasks < numOfTasks) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, if (isLeader) {
updateTasks, (numOfTasks - updateTasks)); tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks));
} else {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
(numOfTasks - updateTasks));
}
} else { } else {
if ((code = streamMetaCommit(pMeta)) < 0) { if ((code = streamMetaCommit(pMeta)) < 0) {
// always return true // always return true
@ -316,17 +321,21 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamMetaClearSetUpdateTaskListComplete(pMeta); streamMetaClearSetUpdateTaskListComplete(pMeta);
if (!restored) { if (isLeader) {
tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId); if (!restored) {
} else { tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); } else {
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
#if 0 #if 0
taosMSleep(5000);// for test purpose, to trigger the leader election taosMSleep(5000);// for test purpose, to trigger the leader election
#endif #endif
code = tqStreamTaskStartAsync(pMeta, cb, true); code = tqStreamTaskStartAsync(pMeta, cb, true);
if (code) { if (code) {
tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code)); tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
}
} }
} else {
tqDebug("vgId:%d follower nodes not restart tasks", vgId);
} }
} }

View File

@ -931,7 +931,9 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg); vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg);
if (!syncIsReadyForRead(pVnode->sync)) { if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) &&
!syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno); vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0; return 0;
} }