From 03fe3e5430b486abb72851036f7be95308e775b5 Mon Sep 17 00:00:00 2001 From: dongming chen Date: Mon, 17 Mar 2025 17:13:54 +0800 Subject: [PATCH] fix: [TD-34000] remove lock when send heartbeat reply main (#30123) * fix(stream): reduce the consensus checkpoint id trans. * refactor(stream): add some logs. * refactor(stream): set the max checkpoint exec time 30min. * refactor(stream): add checkpoint-consensus trans conflict check. * refactor(stream): remove unused local variables. * fix(stream): fix syntax error. * fix(stream): 1. fix free memory error 2. continue if put result into dst hashmap failed. * fix issue * fix issue * fix(mnd): follower mnode not processes the timer event. * fix(stream): print correct error msg. * fix(stream): add some logs. * fix(stream): add some logs. * fix(stream): add some logs. * fix(stream): add some logs. * fix(stream): add some logs. * fix(stream): add some logs. * fix(stream): add some logs. * fix(stream): add some logs. * fix(stream): truncate long subtable name * fix(stream): add buffer len. * refactor(stream): update some logs. * fix issue * refactor(stream): update some logs. * refactor(stream): update some logs. * fix(stream): check return value. * fix(stream): fix syntax error. * fix(stream): check return value. * fix(stream): update the timer check in mnode. * fix: TD-34000-remove-lock-when-send-heartbeat-reply * fix: TD-34000-remove-lock-when-send-heartbeat-reply --------- Co-authored-by: Haojun Liao Co-authored-by: 54liuyao <54liuyao@163.com> Co-authored-by: Jinqing Kuang --- source/dnode/vnode/src/tqCommon/tqCommon.c | 11 ----------- source/libs/sync/src/syncMain.c | 15 +++++++++++---- source/libs/sync/src/syncRequestVote.c | 1 + 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 5ddbdc9f5c..d1a7831463 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1331,17 +1331,6 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { if (ret) { tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret)); } - -// STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; -// int32_t ret1 = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); -// if (ret1 == 0 && pTask != NULL) { -// SStreamTaskState s = streamTaskGetStatus(pTask); -// if (s.state == TASK_STATUS__STOP) { -// tqDebug("s-task:0x%x status:%s wait for it become init", req.taskId, s.name); -// streamMetaReleaseTask(pMeta, pTask); -// return TSDB_CODE_STREAM_TASK_IVLD_STATUS; -// } -// } } else { tqDebug("vgId:%d task:0x%x stopped in follower node, not set the consensus checkpointId:%" PRId64 " transId:%d", pMeta->vgId, req.taskId, req.checkpointId, req.transId); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0d112c5a23..de297ab4f7 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2411,6 +2411,7 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm); return; } + sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId); bool voted = raftStoreHasVoted(pSyncNode); if (voted) { sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId); @@ -3568,7 +3569,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SRpcMsg rpcMsg = {0}; TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId)); - SyncTerm currentTerm = raftStoreGetTerm(ths); + SyncTerm currentTerm = raftStoreTryGetTerm(ths); SyncHeartbeatReply* pMsgReply = rpcMsg.pCont; pMsgReply->destId = pMsg->srcId; @@ -3578,6 +3579,15 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->startTime = ths->startTime; pMsgReply->timeStamp = tsMs; + // reply + TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64()); + trace = &(rpcMsg.info.traceId); + sGTrace("vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64, ths->vgId, + DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp); + + TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg)); + + if (currentTerm == 0) currentTerm = raftStoreGetTerm(ths); sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm); @@ -3637,9 +3647,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } } - // reply - TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg)); - if (resetElect) syncNodeResetElectTimer(ths); return 0; } diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index c887846915..88a97e55f3 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -108,6 +108,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncTerm currentTerm = raftStoreGetTerm(ths); if (!(pMsg->term <= currentTerm)) return TSDB_CODE_SYN_INTERNAL_ERROR; + sTrace("vgId:%d, begin hasVoted", ths->vgId); bool grant = (pMsg->term == currentTerm) && logOK && ((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId))); if (grant) {