diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 513ba8cb34..b8a4726be0 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -41,6 +41,7 @@ extern "C" { #define SNAPSHOT_WAIT_MS 1000 * 30 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 +#define SYNC_HEART_TIMEOUT_MS 1000 * 8 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 16fe6c1b91..ceb452c551 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -196,7 +196,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index e15d7ac3df..743beb7f82 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -464,7 +464,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 57f52c7d88..1e72b4eb26 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -232,6 +232,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, S int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h); +bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 92e7b555a4..acaf499cbb 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -137,6 +137,7 @@ typedef struct SyncHeartbeatReply { SyncTerm term; SyncTerm privateTerm; int64_t startTime; + int64_t timeStamp; } SyncHeartbeatReply; typedef struct SyncPreSnapshot { diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index ca5e531528..830e50fc4f 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -50,9 +50,10 @@ void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) { memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm)); // int64_t timeNow = taosGetMonotonicMs(); + int64_t timeNow = taosGetTimestampMs(); for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { pSyncIndexMgr->startTimeArr[i] = 0; - pSyncIndexMgr->recvTimeArr[i] = 0; + pSyncIndexMgr->recvTimeArr[i] = timeNow; } /* @@ -147,7 +148,7 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa return recvTime; } } - ASSERT(0); + return -1; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 88b8ba7e25..d9f6753dac 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -639,6 +639,14 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { return -1; } + // heartbeat timeout + if (syncNodeHeartbeatTimeout(pSyncNode)) { + terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; + sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64, + TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); + return -1; + } + // optimized one replica if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) { SyncIndex retIndex; @@ -2086,6 +2094,29 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand return code; } +bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode) { + if (pSyncNode->replicaNum == 1) { + return false; + } + + int32_t toCount = 0; + int64_t tsNow = taosGetTimestampMs(); + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { + int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); + if (recvTime == 0 || recvTime == -1) { + continue; + } + + if (tsNow - recvTime > SYNC_HEART_TIMEOUT_MS) { + toCount++; + } + } + + bool b = (toCount >= pSyncNode->quorum ? true : false); + + return b; +} + static int32_t syncNodeAppendNoop(SSyncNode* ths) { int32_t ret = 0; @@ -2127,6 +2158,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->srcId = ths->myRaftId; pMsgReply->term = ths->pRaftStore->currentTerm; pMsgReply->privateTerm = 8864; // magic number + pMsgReply->timeStamp = taosGetTimestampMs(); if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { syncNodeResetElectTimer(ths); @@ -2191,7 +2223,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncLogRecvHeartbeatReply(ths, pMsg, ""); // update last reply time, make decision whether the other node is alive or not - syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->destId, pMsg->startTime); + syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, pMsg->timeStamp); return 0; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index da6f8a52d9..4c84786286 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -424,8 +424,8 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, - pMsg->term, pMsg->privateTerm, s); + sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->timeStamp, s); } void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {