fix(stream): update the task status even if it is in stop status.

This commit is contained in:
Haojun Liao 2023-08-25 15:32:55 +08:00
parent 4c7ad54c9c
commit ab2aad2b2c
1 changed files with 26 additions and 24 deletions

View File

@ -1821,7 +1821,6 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
bool restartTasks = false;
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -1838,46 +1837,49 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
goto _end; goto _end;
} }
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); // update the nodeEpset when it exists
if (pTask == NULL) { taosWLockLatch(&pMeta->lock);
// when replaying the WAL, the task epset may need to be updated again and again; the task may be in stop status.
int64_t keys[2] = {req.streamId, req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (*ppTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
req.taskId); req.taskId);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
rsp.code = TSDB_CODE_SUCCESS; rsp.code = TSDB_CODE_SUCCESS;
taosWUnLockLatch(&pMeta->lock);
goto _end; goto _end;
} }
SStreamTask* pTask = *ppTask;
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
{ {
taosWLockLatch(&pMeta->lock);
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pTask);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
} }
taosWUnLockLatch(&pMeta->lock);
} }
streamTaskStop(pTask); streamTaskStop(pTask);
taosWLockLatch(&pMeta->lock);
int32_t numOfCount = streamMetaGetNumOfTasks(pMeta);
pMeta->closedTask += 1;
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
// all tasks are closed, now let's restart the stream meta bool allStopped = true;
if (pMeta->closedTask == numOfCount) { int32_t numOfCount = streamMetaGetNumOfTasks(pMeta);
tqDebug("vgId:%d all tasks are updated, commit the update nodeInfo", vgId); for(int32_t i = 0; i < numOfCount; ++i) {
// if (streamMetaCommit(pMeta) < 0) { SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
// persist to disk
// } int64_t keys1[2] = {pId->streamId, pId->taskId};
restartTasks = true; SStreamTask* p = taosHashGet(pMeta->pTasks, keys1, sizeof(keys1));
pMeta->closedTask = 0; // reset value if (p->status.taskStatus != TASK_STATUS__STOP) {
} else { allStopped = false;
tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfCount - pMeta->closedTask)); tqDebug("vgId:%d, s-task:0x%"PRIx64"-0x%x not updated yet", vgId, keys1[0], pId->taskId);
break;
}
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
@ -1885,7 +1887,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
_end: _end:
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (restartTasks) { if (allStopped) {
if (!pTq->pVnode->restored) { if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
} else { } else {