fix(stream): disable the follower to execute the stop/resume instruction.

This commit is contained in:
Haojun Liao 2023-08-25 19:20:49 +08:00
parent ef3e2737f4
commit 7ceb8dc627
2 changed files with 5 additions and 3 deletions

View File

@ -2289,7 +2289,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
pStatusEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status));
mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status));
}
}
taosThreadMutexUnlock(&execNodeList.lock);

View File

@ -570,12 +570,14 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
}
} break;
case TDMT_STREAM_TASK_PAUSE: {
if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
if (pVnode->restored && vnodeIsLeader(pVnode) &&
tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_RESUME: {
if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
if (pVnode->restored && vnodeIsLeader(pVnode) &&
tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;