From 3e13cd82808653f02248dda4e5f3ce86d9388ef5 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 16 Nov 2022 14:26:18 +0800 Subject: [PATCH] fix: check if timer triggered ahead of time in syncNodeElect --- include/libs/sync/syncTools.h | 2 + include/util/tdef.h | 2 +- source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncAppendEntriesReply.c | 6 +-- source/libs/sync/src/syncElection.c | 13 +++++- source/libs/sync/src/syncMain.c | 41 +++++++++++++++---- source/libs/sync/src/syncReplication.c | 7 ++++ 7 files changed, 58 insertions(+), 14 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index d5c015bfb2..d009ea0b4f 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -696,6 +696,8 @@ int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); +void syncNodePegLastMsgRecvTime(SSyncNode* ths); + // ----------------------------------------- typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); diff --git a/include/util/tdef.h b/include/util/tdef.h index c5776e8d87..742b6b1da9 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -281,7 +281,7 @@ typedef enum ELogicConditionType { #define TSDB_DNODE_ROLE_VNODE 2 #define TSDB_MAX_REPLICA 5 -#define TSDB_SYNC_LOG_BUFFER_SIZE 512 +#define TSDB_SYNC_LOG_BUFFER_SIZE 1024 #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 5b5f198436..b341587bbe 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -298,6 +298,7 @@ typedef struct SSyncNode { int64_t startTime; int64_t leaderTime; int64_t lastReplicateTime; + int64_t lastMsgRecvTime; } SSyncNode; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index eddd0d51c9..cbff83acf8 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -148,7 +148,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn *pBarrier = syncLogIsReplicationBarrier(pEntry); prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index); - if (prevLogTerm < 0 && terrno != TSDB_CODE_SUCCESS) { + if (prevLogTerm < 0) { sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index); goto _out; } @@ -163,8 +163,8 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn (void)syncNodeSendAppendEntries(pNode, pDestId, pMsgOut); ret = 0; - sInfo("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, - pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); + sDebug("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, + pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); _out: syncAppendEntriesDestroy(pMsgOut); diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index a287727a39..8d904ff934 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -34,6 +34,13 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { syncNodeEventLog(pSyncNode, "begin election"); + int64_t nowMs = taosGetMonoTimestampMs(); + if (nowMs < pSyncNode->lastMsgRecvTime + pSyncNode->electTimerMS) { + sError("vgId:%d, election timer triggered ahead of time for %" PRId64 "ms", pSyncNode->vgId, + pSyncNode->lastMsgRecvTime + pSyncNode->electTimerMS - nowMs); + return -1; + } + int32_t ret = 0; if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { syncNodeFollower2Candidate(pSyncNode); @@ -105,7 +112,11 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { int32_t ret = 0; - syncLogSendRequestVote(pSyncNode, pMsg, ""); + // syncLogSendRequestVote(pSyncNode, pMsg, ""); + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sInfo("vgId:%d, send request vote of term: %" PRId64 " to %s:%d", pSyncNode->vgId, pMsg->term, host, port); SRpcMsg rpcMsg; syncRequestVote2RpcMsg(pMsg, &rpcMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1e60b4950a..7811d3ca59 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1167,15 +1167,18 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod SSyncLogBuffer* pBuf = pNode->pLogBuf; SRaftId destId = pMsg->srcId; ASSERT(pMgr->restored == false); + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); if (pMgr->endIndex == 0) { ASSERT(pMgr->startIndex == 0); ASSERT(pMgr->matchIndex == 0); if (pMsg->matchIndex < 0) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr of the %d'th peer restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -1189,9 +1192,9 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod if (pMsg->matchIndex == pMsg->lastSendIndex) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr of the %d'th peer restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -1278,11 +1281,11 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode } SSyncLogBuffer* pBuf = pNode->pLogBuf; - sInfo("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 - ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 - ")", - pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + sDebug("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 + ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 + ")", + pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -1963,6 +1966,8 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { int32_t ret = 0; int32_t electMS; + syncNodePegLastMsgRecvTime(pSyncNode); + if (pSyncNode->pRaftCfg->isStandBy) { electMS = TIMER_MAX_MS; } else { @@ -3231,6 +3236,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } syncTimeoutDestroy(pSyncMsg); +#if 0 // reset timer ms if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) { pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); @@ -3239,6 +3245,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } else { sError("sync env is stop, syncNodeEqElectTimer"); } +#endif } static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { @@ -3246,6 +3253,10 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { syncNodeEventLog(pSyncNode, "eq hb timer"); +#if 0 + sInfo("vgId:%d, heartbeat timer tick.", pSyncNode->vgId); +#endif + if (pSyncNode->replicaNum > 1) { if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) { @@ -3484,6 +3495,11 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { return ret; } +void syncNodePegLastMsgRecvTime(SSyncNode* ths) { + int64_t nowMs = taosGetMonoTimestampMs(); + ths->lastMsgRecvTime = nowMs; +} + int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { syncLogRecvHeartbeat(ths, pMsg, ""); @@ -3497,6 +3513,13 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { SRpcMsg rpcMsg; syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg); +#if 0 + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sInfo("vgId:%d, recv heartbeat msg from %s:%d", ths->vgId, host, port); +#endif + #if 1 if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { syncNodeStepDown(ths, pMsg->term); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 1c4b997875..0c6290180f 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -239,6 +239,13 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, c int32_t ret = 0; syncLogSendHeartbeat(pSyncNode, pMsg, ""); +#if 0 + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sInfo("vgId:%d, send heartbeat msg to %s:%d", pSyncNode->vgId, host, port); +#endif + SRpcMsg rpcMsg; syncHeartbeat2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSyncNode, &rpcMsg);