diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d6fb67d30a..3ef3347547 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -390,6 +390,7 @@ typedef struct SStreamMeta { tmr_h hbTmr; SMgmtInfo mgmtInfo; + int32_t closedTask; int32_t chkptNotReadyTasks; int64_t chkpId; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 38453ee81a..42ec7d320b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1855,13 +1855,12 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { + bool restartTasks = false; SStreamMeta* pMeta = pTq->pStreamMeta; - - int32_t vgId = TD_VID(pTq->pVnode); - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - bool startTask = vnodeIsRoleLeader(pTq->pVnode); // in case of follower, do not launch task - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + int32_t vgId = TD_VID(pTq->pVnode); + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; SStreamTaskNodeUpdateMsg req = {0}; @@ -1884,34 +1883,52 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); streamTaskUpdateEpsetInfo(pTask, req.pNodeList); + streamTaskStop(pTask); - SStreamTask* pHistoryTask = NULL; - if (pTask->historyTaskId.taskId != 0) { - pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); - if (pHistoryTask != NULL) { - tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr); - streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList); + taosWLockLatch(&pMeta->lock); - streamTaskRestart(pHistoryTask, NULL, startTask); - streamMetaReleaseTask(pMeta, pHistoryTask); - } else { - tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped", - pMeta->vgId, pTask->historyTaskId.taskId); - streamMetaReleaseTask(pMeta, pTask); - } - } + int32_t numOfCount = streamMetaGetNumOfTasks(pMeta); + pMeta->closedTask += 1; - streamTaskRestart(pTask, NULL, startTask); + tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); streamMetaReleaseTask(pMeta, pTask); - int32_t code = rsp.code; + // all tasks are closed, now let's restart the stream meta + if (pMeta->closedTask == numOfCount) { + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + restartTasks = true; + pMeta->closedTask = 0; // reset value + } + + taosWUnLockLatch(&pMeta->lock); _end: tDecoderClear(&decoder); tmsgSendRsp(&rsp); - tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); - return code; + if (restartTasks) { + tqDebug("vgId:%d all tasks are stopped, restart them", vgId); + + streamMetaClose(pTask->pMeta); + + pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, -1); + if (pTq->pStreamMeta == NULL) { + return -1; + } + + if (streamLoadTasks(pTq->pStreamMeta) < 0) { + return -1; + } + + if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { + vInfo("vgId:%d, restart to all stream tasks", vgId); + tqCheckStreamStatus(pTq); + } + } + + return rsp.code; } int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) {