From 6c7a8e97743424900d976bf8d5e98e12c8f055c3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 10 Feb 2025 16:08:18 +0800 Subject: [PATCH] refactor(stream): allow ready stream to start checkpoint procedure. --- source/dnode/mnode/impl/src/mndStream.c | 64 ++++++++++--------------- 1 file changed, 25 insertions(+), 39 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e78996c231..1089fa1bd3 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1158,51 +1158,22 @@ int32_t extractStreamNodeList(SMnode *pMnode) { } static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { - bool ready = true; + int32_t code = 0; if (mndStreamNodeIsUpdated(pMnode)) { - TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } streamMutexLock(&execInfo.lock); if (taosArrayGetSize(execInfo.pNodeList) == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); if (taosArrayGetSize(execInfo.pTaskList) != 0) { - streamMutexUnlock(&execInfo.lock); mError("stream task node change checking done, no vgroups exist, but task list is not empty"); - return TSDB_CODE_FAILED; - } - } - - for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { - STaskId *p = taosArrayGet(execInfo.pTaskList, i); - if (p == NULL) { - continue; - } - - STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); - if (pEntry == NULL) { - continue; - } - - if (pEntry->status != TASK_STATUS__READY) { - mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId, - (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); - ready = false; - break; - } - - if (pEntry->hTaskId != 0) { - mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64 - " exists, checkpoint not issued", - pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status), - pEntry->hTaskId); - ready = false; - break; + code = TSDB_CODE_STREAM_TASK_IVLD_STATUS; } } streamMutexUnlock(&execInfo.lock); - return ready ? 0 : -1; + return code; } int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) { @@ -1216,7 +1187,22 @@ int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) { continue; } - if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) { + // -1 denote not ready now or never ready till now + if (pEntry->hTaskId != 0) { + mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64 + " exists, checkpoint not issued", + pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status), + pEntry->hTaskId); + return -1; + } + + if (pEntry->status != TASK_STATUS__READY) { + mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId, + (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); + return -1; + } + + if (ts < pEntry->startTime) { ts = pEntry->startTime; taskId = pEntry->id.taskId; } @@ -1249,11 +1235,11 @@ static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) { int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) { + if (lastReadyTs != -1) { - mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold", - pStream->uid, lastReadyTs, now - lastReadyTs); - } else { - mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid); + mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64 + "ms less than threshold", + pStream->uid, lastReadyTs, (now - lastReadyTs)); } ready = false; @@ -1274,7 +1260,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t numOfCheckpointTrans = 0; if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { - TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));