From ab2aad2b2cff59fb0d2eb4f436bf5c7914ded47e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Aug 2023 15:32:55 +0800 Subject: [PATCH] fix(stream): update the task status even if it is in stop status. --- source/dnode/vnode/src/tq/tq.c | 50 ++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 044b353ddd..dac5838db0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1821,7 +1821,6 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { - bool restartTasks = false; SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = TD_VID(pTq->pVnode); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -1838,46 +1837,49 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { goto _end; } - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); - if (pTask == NULL) { + // update the nodeEpset when it exists + taosWLockLatch(&pMeta->lock); + + // when replaying the WAL, we should update the task epset again and again, since the task may be in stop status. 
+ int64_t keys[2] = {req.streamId, req.taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + + if (*ppTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); - // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active rsp.code = TSDB_CODE_SUCCESS; + taosWUnLockLatch(&pMeta->lock); goto _end; } + SStreamTask* pTask = *ppTask; + tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); streamTaskUpdateEpsetInfo(pTask, req.pNodeList); + { - taosWLockLatch(&pMeta->lock); streamSetStatusNormal(pTask); streamMetaSaveTask(pMeta, pTask); if (streamMetaCommit(pMeta) < 0) { // persist to disk } - taosWUnLockLatch(&pMeta->lock); } + streamTaskStop(pTask); - - taosWLockLatch(&pMeta->lock); - - int32_t numOfCount = streamMetaGetNumOfTasks(pMeta); - pMeta->closedTask += 1; - tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); - streamMetaReleaseTask(pMeta, pTask); - // all tasks are closed, now let's restart the stream meta - if (pMeta->closedTask == numOfCount) { - tqDebug("vgId:%d all tasks are updated, commit the update nodeInfo", vgId); - // if (streamMetaCommit(pMeta) < 0) { - // persist to disk - // } - restartTasks = true; - pMeta->closedTask = 0; // reset value - } else { - tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfCount - pMeta->closedTask)); + bool allStopped = true; + int32_t numOfCount = streamMetaGetNumOfTasks(pMeta); + for(int32_t i = 0; i < numOfCount; ++i) { + SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); + + int64_t keys1[2] = {pId->streamId, pId->taskId}; + SStreamTask* p = taosHashGet(pMeta->pTasks, keys1, sizeof(keys1)); + if (p->status.taskStatus != TASK_STATUS__STOP) { + allStopped = false; + tqDebug("vgId:%d, s-task:0x%"PRIx64"-0x%x not updated yet", vgId, keys1[0], pId->taskId); + break; + } } 
taosWUnLockLatch(&pMeta->lock); @@ -1885,7 +1887,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { _end: tDecoderClear(&decoder); - if (restartTasks) { + if (allStopped) { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); } else {