From 0ea46585f444f429aab66cb89424bc9c416d6ff8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jan 2025 16:29:00 +0800 Subject: [PATCH] fix(stream): update checkpoint-info after checking the failed checkpointId, and update the consensus-checkpoint id in mnode. --- source/dnode/mnode/impl/inc/mndStream.h | 4 ++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 17 ++++++++++------- source/libs/stream/src/streamCheckpoint.c | 13 ++++++++++++- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index ec04aa3111..857fd5c99c 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -175,8 +175,8 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo); void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); -int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId); -int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId); +int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId); +int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId); int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index e3b8289e4c..d896434f3b 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -902,7 +902,8 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()}; memcpy(&info.req, pRestoreInfo, 
sizeof(info.req)); - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { + int32_t num = (int32_t) taosArrayGetSize(pInfo->pTaskList); + for (int32_t i = 0; i < num; ++i) { SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i); if (p == NULL) { continue; @@ -910,10 +911,12 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo if (p->req.taskId == info.req.taskId) { mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64 - "->%" PRId64 " total existed:%d", - pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, - (int32_t)taosArrayGetSize(pInfo->pTaskList)); + "->%" PRId64 " checkpointId:%" PRId64 " -> %" PRId64 " total existed:%d", + pRestoreInfo->taskId, pRestoreInfo->streamId, p->req.startTs, info.req.startTs, p->req.checkpointId, + info.req.checkpointId, num); p->req.startTs = info.req.startTs; + p->req.checkpointId = info.req.checkpointId; + p->req.transId = info.req.transId; return; } } @@ -922,7 +925,7 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo if (p == NULL) { mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId); } else { - int32_t num = taosArrayGetSize(pInfo->pTaskList); + num = taosArrayGetSize(pInfo->pTaskList); mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64 " waiting tasks:%d", pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num); @@ -934,7 +937,7 @@ void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo) { pInfo->pTaskList = NULL; } -int64_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) { +int32_t mndClearConsensusCheckpointId(SHashObj *pHash, int64_t streamId) { int32_t code = 0; int32_t numOfStreams = taosHashGetSize(pHash); if (numOfStreams == 0) { @@ -951,7 +954,7 @@ int64_t mndClearConsensusCheckpointId(SHashObj 
*pHash, int64_t streamId) { return code; } -int64_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) { +int32_t mndClearChkptReportInfo(SHashObj *pHash, int64_t streamId) { int32_t code = 0; int32_t numOfStreams = taosHashGetSize(pHash); if (numOfStreams == 0) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d3eba382c9..0765018c9c 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -604,6 +604,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV streamMutexLock(&pTask->lock); + // not update the checkpoint info if the checkpointId is less than the failed checkpointId + if (pReq->checkpointId < pInfo->pActiveInfo->failedId) { + stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64 + " is less than the failed checkpointId:%" PRId64 ", discard the update info", + id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId); + streamMutexUnlock(&pTask->lock); + + // always return true + return TSDB_CODE_SUCCESS; + } + if (pReq->checkpointId <= pInfo->checkpointId) { stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64 " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored", @@ -638,7 +649,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->checkpointTime, pReq->checkpointTs); } else { // not in restore status, must be in checkpoint status if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) { - stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, id, vgId, pStatus.name, pInfo->checkpointId, 
pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);