From c35c634977390d64165a9f4bd03b0ccfc1855f8a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 4 Jul 2024 17:08:32 +0800 Subject: [PATCH] fix(stream): add flag to disable the concurrently started consensus-checkpointId procedure. --- include/libs/stream/tstream.h | 4 ++-- source/libs/stream/src/streamCheckpoint.c | 11 +++++++++++ source/libs/stream/src/streamMeta.c | 23 +++++++++++++++-------- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f24f7da7c3..093a21c999 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -272,9 +272,8 @@ typedef struct SCheckpointInfo { int64_t checkpointTime; // latest checkpoint time int64_t processedVer; int64_t nextProcessVer; // current offset in WAL, not serialize it - + int64_t msgVer; SActiveCheckpointInfo* pActiveInfo; - int64_t msgVer; } SCheckpointInfo; typedef struct SStreamStatus { @@ -289,6 +288,7 @@ typedef struct SStreamStatus { int32_t inScanHistorySentinel; bool appendTranstateBlock; // has append the transfer state data block already bool removeBackendFiles; // remove backend files on disk when free stream tasks + bool sendConsensusChkptId; } SStreamStatus; typedef struct SDataRange { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 969c2e1795..0f39ca7213 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1107,6 +1107,17 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { const char* id = pTask->id.idStr; SCheckpointInfo* pInfo = &pTask->chkInfo; + taosThreadMutexLock(&pTask->lock); + if (pTask->status.sendConsensusChkptId == true) { + stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); + taosThreadMutexUnlock(&pTask->lock); + return TSDB_CODE_SUCCESS; + } else { + pTask->status.sendConsensusChkptId = true; + } + + taosThreadMutexUnlock(&pTask->lock); + ASSERT(pTask->pBackend == NULL); SRestoreCheckpointInfo req = { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7b94f642e2..19cb2f7854 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1198,7 +1198,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { continue; } - if ((pTask->pBackend == NULL) && (pTask->info.fillHistory == 1 || HAS_RELATED_FILLHISTORY_TASK(pTask))) { + if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) { code = pMeta->expandTaskFn(pTask); if (code != TSDB_CODE_SUCCESS) { stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); @@ -1392,17 +1392,24 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 streamMetaWLock(pMeta); - if (pStartInfo->startAllTasks != 1) { - int64_t el = endTs - startTs; - stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", - pMeta->vgId, taskId, ready, el); + SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (p == NULL) { // task does not exists in current vnode, not record the complete info + stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); streamMetaWUnLock(pMeta); return 0; } - void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (p == NULL) { // task does not exists in current vnode, not record the complete info - stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); + // clear the send consensus-checkpointId flag + taosThreadMutexLock(&(*p)->lock); + (*p)->status.sendConsensusChkptId = false; + taosThreadMutexUnlock(&(*p)->lock); + + if (pStartInfo->startAllTasks != 1) { + int64_t el = endTs - startTs; + stDebug( + "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed " + "time:%" PRId64 "ms", + pMeta->vgId, taskId, ready, el); streamMetaWUnLock(pMeta); return 0; }