From ce4153b6fcb125059c5d96a5ba364a0732ba90e2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 8 Jul 2024 09:05:33 +0800 Subject: [PATCH] fix(stream): use hb to send the consens-checkpointid req. --- include/libs/stream/tstream.h | 18 ++++++----- source/dnode/mnode/impl/inc/mndStream.h | 3 +- source/dnode/mnode/impl/src/mndStream.c | 6 ++-- source/dnode/mnode/impl/src/mndStreamHb.c | 35 +++++++++++++++++++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 15 +++++++-- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 +-- source/libs/stream/src/streamCheckpoint.c | 4 ++- source/libs/stream/src/streamHb.c | 6 ++++ source/libs/stream/src/streamMeta.c | 3 +- source/libs/stream/src/streamMsg.c | 2 ++ source/libs/stream/src/streamTask.c | 1 + 11 files changed, 76 insertions(+), 22 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d0feebf814..5c61265c01 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -289,6 +289,7 @@ typedef struct SStreamStatus { bool appendTranstateBlock; // has append the transfer state data block already bool removeBackendFiles; // remove backend files on disk when free stream tasks bool sendConsensusChkptId; + bool requireConsensusChkptId; } SStreamStatus; typedef struct SDataRange { @@ -568,14 +569,15 @@ typedef struct { } SStreamScanHistoryReq; typedef struct STaskCkptInfo { - int64_t latestId; // saved checkpoint id - int64_t latestVer; // saved checkpoint ver - int64_t latestTime; // latest checkpoint time - int64_t latestSize; // latest checkpoint size - int8_t remoteBackup; // latest checkpoint backup done - int64_t activeId; // current active checkpoint id - int32_t activeTransId; // checkpoint trans id - int8_t failed; // denote if the checkpoint is failed or not + int64_t latestId; // saved checkpoint id + int64_t latestVer; // saved checkpoint ver + int64_t latestTime; // latest checkpoint time + int64_t latestSize; // latest checkpoint size + int8_t remoteBackup; // latest checkpoint backup done + int64_t activeId; // current active checkpoint id + int32_t activeTransId; // checkpoint trans id + int8_t failed; // denote if the checkpoint is failed or not + int8_t consensusChkptId; // required the consensus-checkpointId } STaskCkptInfo; typedef struct STaskStatusEntry { diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 7b4270462f..0b6b6a9ef2 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -147,8 +147,7 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks); -void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, - SRpcHandleInfo *pRpcInfo); +void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 00909cb36d..cd99714395 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2718,7 +2718,7 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { mDebug("not all(%d/%d) task(s) send hbMsg yet, wait for a while and check again, s-task:0x%x", req.taskId, num, numOfTasks); SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); - mndAddConsensusTasks(pInfo, &req, &pMsg->info); + mndAddConsensusTasks(pInfo, &req); taosThreadMutexUnlock(&execInfo.lock); doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); @@ -2737,7 +2737,7 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { // wait for 5s and check again SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); - mndAddConsensusTasks(pInfo, &req, &pMsg->info); + mndAddConsensusTasks(pInfo, &req); if (pStream != NULL) { mndReleaseStream(pMnode, pStream); @@ -2789,7 +2789,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { taosArrayPush(pList, &pe->req.taskId); streamId = pe->req.streamId; } else { - mDebug("s-task:0x%x sendTs:%" PRId64 " wait %2.fs already, wait for next round to check", pe->req.taskId, + mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs already, wait for next round to check", pe->req.taskId, (now - pe->ts) / 1000.0, pe->ts); } } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index c7f97b4a62..8a374c99ef 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -246,7 +246,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId); + mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId); pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask)); @@ -284,6 +284,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { continue; } + STaskCkptInfo *pChkInfo = &p->checkpointInfo; + if (pChkInfo->consensusChkptId != 0) { + SRestoreCheckpointInfo cp = { + .streamId = p->id.streamId, + .taskId = p->id.taskId, + .checkpointId = p->checkpointInfo.latestId, + .startTs = pTaskEntry->startTime, + }; + + SStreamObj *pStream = mndGetStreamObj(pMnode, p->id.streamId); + int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + + SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks); + mndAddConsensusTasks(pInfo, &cp); + mndReleaseStream(pMnode, pStream); + } + if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { updateStageInfo(pTaskEntry, p->stage); if (pTaskEntry->nodeId == SNODE_HANDLE) { @@ -292,7 +309,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } else { streamTaskStatusCopy(pTaskEntry, p); - STaskCkptInfo *pChkInfo = &p->checkpointInfo; if ((pChkInfo->activeId != 0) && pChkInfo->failed) { mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, pChkInfo->activeId, pChkInfo->activeTransId); @@ -304,6 +320,21 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // remove failed trans from pChkptStreams taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId)); } + +/* if (pChkInfo->consensusChkptId != 0) { + SRestoreCheckpointInfo cp = { + .streamId = p->id.streamId, + .taskId = p->id.taskId, + .checkpointId = p->checkpointInfo.latestId, + .startTs = pTaskEntry->startTime, + }; + + SStreamObj* pStream = mndGetStreamObj(pMnode, p->id.streamId); + int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks); + mndAddConsensusTasks(pInfo, &cp, NULL); + mndReleaseStream(pMnode, pStream); + }*/ } if (p->status == pTaskEntry->status) { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 7d9b9e4571..c4797957c2 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -953,12 +953,21 @@ SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, // no matter existed or not, add the request into info list anyway, since we need to send rsp mannually // discard the msg may lead to the lost of connections. -void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcHandleInfo* pRpcInfo) { - SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs(), .rspInfo = *pRpcInfo}; +void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo) { + SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs()}; memcpy(&info.req, pRestoreInfo, sizeof(info.req)); - taosArrayPush(pInfo->pTaskList, &info); + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { + SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i); + if (p->req.taskId == info.req.taskId) { + mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 + ", ignore this, total existed:%d", + pRestoreInfo->taskId, pRestoreInfo->streamId, (int32_t)taosArrayGetSize(pInfo->pTaskList)); + return; + } + } + taosArrayPush(pInfo->pTaskList, &info); int32_t num = taosArrayGetSize(pInfo->pTaskList); mDebug("s-task:0x%x added into consensus-checkpointId list, stream:0x%" PRIx64 " total waiting:%d", pRestoreInfo->taskId, pRestoreInfo->streamId, num); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index cb480d09bb..dc55acbb5c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -742,6 +742,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { streamMetaStartAllTasks(pMeta); } else { streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId); + pMeta->startInfo.restartCount = 0; streamMetaWUnLock(pMeta); tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId); } @@ -1160,7 +1161,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return TSDB_CODE_SUCCESS; } - +#if 0 // discard the rsp, since it is expired. if (req.startTs < pTask->execInfo.created) { tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64 @@ -1170,7 +1171,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } - +#endif tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode", pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 5cd084e6a2..cdb5bf0b50 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1121,7 +1121,8 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { taosThreadMutexUnlock(&pTask->lock); ASSERT(pTask->pBackend == NULL); - + pTask->status.requireConsensusChkptId = true; +#if 0 SRestoreCheckpointInfo req = { .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, @@ -1158,6 +1159,7 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { pInfo->checkpointId); tmsgSendReq(&pTask->info.mnodeEpset, &msg); +#endif return 0; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 691ec44672..6b1a1aca92 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -194,6 +194,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } } + if ((*pTask)->status.requireConsensusChkptId) { + entry.checkpointInfo.consensusChkptId = 1; + (*pTask)->status.requireConsensusChkptId = false; + stDebug("s-task:%s vgId:%d set the require consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId); + } + if ((*pTask)->exec.pWalReader != NULL) { entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1; if (entry.processedVer < 0) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 19cb2f7854..e7fdb7ae2a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1031,10 +1031,11 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) { taosHashClear(pStartInfo->pFailedTaskSet); pStartInfo->tasksWillRestart = 0; pStartInfo->readyTs = 0; + pStartInfo->elapsedTime = 0; // reset the sentinel flag value to be 0 pStartInfo->startAllTasks = 0; - stDebug("vgId:%d clear all start-all-task info", vgId); + stDebug("vgId:%d clear start-all-task info", vgId); } void streamMetaRLock(SStreamMeta* pMeta) { diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index f02f661143..f10296f6ff 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -349,6 +349,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1; if (tEncodeI64(pEncoder, ps->checkpointInfo.latestSize) < 0) return -1; if (tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup) < 0) return -1; + if (tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId) < 0) return -1; if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; @@ -403,6 +404,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1; if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize) < 0) return -1; if (tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup) < 0) return -1; + if (tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1decfe198a..5506ed2d45 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -848,6 +848,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) { .checkpointInfo.latestTime = pTask->chkInfo.checkpointTime, .checkpointInfo.latestSize = 0, .checkpointInfo.remoteBackup = 0, + .checkpointInfo.consensusChkptId = 0, .hTaskId = pTask->hTaskInfo.id.taskId, .procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize), .outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),