fix(stream): follower nodes should not restart tasks.

This commit is contained in:
Haojun Liao 2025-02-23 10:43:56 +08:00
parent e60321196b
commit 3e55f8edfe
4 changed files with 24 additions and 14 deletions

View File

@ -19,7 +19,7 @@
// message process
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart);
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId);
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored);
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader);
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);

View File

@ -157,7 +157,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
case TDMT_STREAM_TASK_DROP:
return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
case TDMT_VND_STREAM_TASK_UPDATE:
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true, true);
case TDMT_VND_STREAM_TASK_RESET:
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont);
case TDMT_STREAM_TASK_PAUSE:

View File

@ -1364,7 +1364,8 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored);
return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg,
pTq->pVnode->restored, (pTq->pStreamMeta->role == NODE_ROLE_LEADER));
}
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -139,7 +139,7 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
}
// this is to process request from transaction, always return true.
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
@ -298,14 +298,19 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
if (restored) {
if (restored && isLeader) {
tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
pMeta->startInfo.tasksWillRestart = 1;
}
if (updateTasks < numOfTasks) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks));
if (isLeader) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks));
} else {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks,
(numOfTasks - updateTasks));
}
} else {
if ((code = streamMetaCommit(pMeta)) < 0) {
// always return true
@ -316,17 +321,21 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamMetaClearSetUpdateTaskListComplete(pMeta);
if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
} else {
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
if (isLeader) {
if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
} else {
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
#if 0
taosMSleep(5000);// for test purpose, to trigger the leader election
#endif
code = tqStreamTaskStartAsync(pMeta, cb, true);
if (code) {
tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
code = tqStreamTaskStartAsync(pMeta, cb, true);
if (code) {
tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code));
}
}
} else {
tqDebug("vgId:%d follower nodes not restart tasks", vgId);
}
}