fix: [TD-34000] remove lock when send heartbeat reply main (#30123)

* fix(stream): reduce the consensus checkpoint id trans.

* refactor(stream): add some logs.

* refactor(stream): set the max checkpoint exec time 30min.

* refactor(stream): add checkpoint-consensus trans conflict check.

* refactor(stream): remove unused local variables.

* fix(stream): fix syntax error.

* fix(stream): 1. fix free memory error 2. continue if put result into dst hashmap failed.

* fix issue

* fix issue

* fix(mnd): follower mnode not processes the timer event.

* fix(stream): print correct error msg.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): add some logs.

* fix(stream): truncate long subtable name

* fix(stream): add buffer len.

* refactor(stream): update some logs.

* fix issue

* refactor(stream): update some logs.

* refactor(stream): update some logs.

* fix(stream): check return value.

* fix(stream): fix syntax error.

* fix(stream): check return value.

* fix(stream): update the timer check in mnode.

* fix: TD-34000-remove-lock-when-send-heartbeat-reply

* fix: TD-34000-remove-lock-when-send-heartbeat-reply

---------

Co-authored-by: Haojun Liao <hjliao@taosdata.com>
Co-authored-by: 54liuyao <54liuyao@163.com>
Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com>
This commit is contained in:
dongming chen 2025-03-17 17:13:54 +08:00 committed by GitHub
parent 40d32e16b0
commit 03fe3e5430
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 12 additions and 15 deletions

View File

@ -1331,17 +1331,6 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
if (ret) {
tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret));
}
// STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
// int32_t ret1 = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
// if (ret1 == 0 && pTask != NULL) {
// SStreamTaskState s = streamTaskGetStatus(pTask);
// if (s.state == TASK_STATUS__STOP) {
// tqDebug("s-task:0x%x status:%s wait for it become init", req.taskId, s.name);
// streamMetaReleaseTask(pMeta, pTask);
// return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
// }
// }
} else {
tqDebug("vgId:%d task:0x%x stopped in follower node, not set the consensus checkpointId:%" PRId64 " transId:%d",
pMeta->vgId, req.taskId, req.checkpointId, req.transId);

View File

@ -2411,6 +2411,7 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId)
sError("vgId:%d, failed to vote for term, term:%" PRId64 ", storeTerm:%" PRId64, pSyncNode->vgId, term, storeTerm);
return;
}
sTrace("vgId:%d, begin hasVoted", pSyncNode->vgId);
bool voted = raftStoreHasVoted(pSyncNode);
if (voted) {
sError("vgId:%d, failed to vote for term since not voted", pSyncNode->vgId);
@ -3568,7 +3569,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SRpcMsg rpcMsg = {0};
TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
SyncTerm currentTerm = raftStoreGetTerm(ths);
SyncTerm currentTerm = raftStoreTryGetTerm(ths);
SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
pMsgReply->destId = pMsg->srcId;
@ -3578,6 +3579,15 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply->startTime = ths->startTime;
pMsgReply->timeStamp = tsMs;
// reply
TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64());
trace = &(rpcMsg.info.traceId);
sGTrace("vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64, ths->vgId,
DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp);
TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));
if (currentTerm == 0) currentTerm = raftStoreGetTerm(ths);
sGTrace("vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64,
ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm);
@ -3637,9 +3647,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
}
// reply
TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg));
if (resetElect) syncNodeResetElectTimer(ths);
return 0;
}

View File

@ -108,6 +108,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncTerm currentTerm = raftStoreGetTerm(ths);
if (!(pMsg->term <= currentTerm)) return TSDB_CODE_SYN_INTERNAL_ERROR;
sTrace("vgId:%d, begin hasVoted", ths->vgId);
bool grant = (pMsg->term == currentTerm) && logOK &&
((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId)));
if (grant) {