Merge pull request #18403 from taosdata/feature/3.0_mhli
refactor(sync): can not propose when heartbeat timeout
commit dbf270a0c4
@@ -41,6 +41,7 @@ extern "C" {
 #define SNAPSHOT_WAIT_MS 1000 * 30
 
 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
+#define SYNC_HEART_TIMEOUT_MS 1000 * 8
 
 #define SYNC_MAX_BATCH_SIZE 1
 #define SYNC_INDEX_BEGIN 0
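Note on the new define: like the neighbouring SNAPSHOT_WAIT_MS, it is left unparenthesized, which is safe in the comparison the rest of this change uses (tsNow - recvTime > SYNC_HEART_TIMEOUT_MS) because > binds more loosely than *. A standalone illustration of that expansion (not TDengine code, values are made up):

#include <stdio.h>

#define SYNC_HEART_TIMEOUT_MS 1000 * 8  /* as defined in the hunk above */

int main(void) {
  long long elapsedMs = 9000;
  /* expands to (elapsedMs) > (1000 * 8): '>' has lower precedence than '*' */
  printf("timed out: %d\n", elapsedMs > SYNC_HEART_TIMEOUT_MS);
  return 0;
}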
@@ -196,7 +196,7 @@ SArray *mmGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
-  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
+  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
 
   code = 0;
@@ -464,7 +464,7 @@ SArray *vmGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
 
   if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
-  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
+  if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
 
   code = 0;
@@ -232,6 +232,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, S
 int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
 SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
 int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
+bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode);
 
 // raft state change --------------
 void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
@@ -137,6 +137,7 @@ typedef struct SyncHeartbeatReply {
   SyncTerm term;
   SyncTerm privateTerm;
   int64_t startTime;
+  int64_t timeStamp;
 } SyncHeartbeatReply;
 
 typedef struct SyncPreSnapshot {
@@ -50,9 +50,10 @@ void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) {
   memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm));
 
   // int64_t timeNow = taosGetMonotonicMs();
+  int64_t timeNow = taosGetTimestampMs();
   for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
     pSyncIndexMgr->startTimeArr[i] = 0;
-    pSyncIndexMgr->recvTimeArr[i] = 0;
+    pSyncIndexMgr->recvTimeArr[i] = timeNow;
   }
 
   /*
@@ -147,7 +148,7 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa
       return recvTime;
     }
   }
   ASSERT(0);
 
   return -1;
 }
@@ -639,6 +639,14 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
     return -1;
   }
 
+  // heartbeat timeout
+  if (syncNodeHeartbeatTimeout(pSyncNode)) {
+    terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
+    sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
+            TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
+    return -1;
+  }
+
   // optimized one replica
   if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
     SyncIndex retIndex;
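With this guard in place, syncNodePropose fails fast and sets terrno to TSDB_CODE_SYN_PROPOSE_NOT_READY while heartbeat replies are stale. A hypothetical caller-side sketch of reacting to that error follows; the wrapper name, retry count and 100 ms backoff are illustrative assumptions, not code from this PR, and it presumes the usual TDengine headers for SSyncNode, SRpcMsg, terrno and taosMsleep:

// Hypothetical sketch only: proposeWithRetry and its retry policy are assumptions;
// syncNodePropose, terrno and TSDB_CODE_SYN_PROPOSE_NOT_READY come from the hunk above.
static int32_t proposeWithRetry(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
  for (int32_t attempt = 0; attempt < 3; ++attempt) {
    if (syncNodePropose(pSyncNode, pMsg, isWeak) == 0) {
      return 0;  // accepted by the leader
    }
    if (terrno != TSDB_CODE_SYN_PROPOSE_NOT_READY) {
      return -1;  // some other failure: give up immediately
    }
    taosMsleep(100);  // peers' heartbeat replies are stale: back off briefly, then retry
  }
  return -1;
}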
@@ -2086,6 +2094,29 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
   return code;
 }
 
+bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode) {
+  if (pSyncNode->replicaNum == 1) {
+    return false;
+  }
+
+  int32_t toCount = 0;
+  int64_t tsNow = taosGetTimestampMs();
+  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
+    int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
+    if (recvTime == 0 || recvTime == -1) {
+      continue;
+    }
+
+    if (tsNow - recvTime > SYNC_HEART_TIMEOUT_MS) {
+      toCount++;
+    }
+  }
+
+  bool b = (toCount >= pSyncNode->quorum ? true : false);
+
+  return b;
+}
+
 static int32_t syncNodeAppendNoop(SSyncNode* ths) {
   int32_t ret = 0;
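To make the quorum rule above concrete: in a 3-replica group the leader has peersNum = 2 and (typically) quorum = 2, so proposals are blocked only when both peers' last heartbeat replies are older than SYNC_HEART_TIMEOUT_MS; a single slow peer is not enough. A minimal standalone model of the same arithmetic (plain C with made-up timestamps, not TDengine code):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define SYNC_HEART_TIMEOUT_MS (1000 * 8)

/* Same decision as syncNodeHeartbeatTimeout: count peers whose last reply is stale,
 * then compare the count against the quorum. */
static bool heartbeatTimeout(const int64_t* recvTimeMs, int peers, int quorum, int64_t nowMs) {
  int toCount = 0;
  for (int i = 0; i < peers; ++i) {
    if (recvTimeMs[i] == 0 || recvTimeMs[i] == -1) continue;  // no usable record for this peer
    if (nowMs - recvTimeMs[i] > SYNC_HEART_TIMEOUT_MS) toCount++;
  }
  return toCount >= quorum;
}

int main(void) {
  int64_t now = 100000;
  int64_t recv[2] = {now - 9000, now - 500};  // one stale peer, one fresh
  printf("%d\n", heartbeatTimeout(recv, 2, 2, now));  // 0: still allowed to propose
  recv[1] = now - 20000;                              // both peers stale
  printf("%d\n", heartbeatTimeout(recv, 2, 2, now));  // 1: propose would be rejected
  return 0;
}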
@@ -2127,6 +2158,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
   pMsgReply->srcId = ths->myRaftId;
   pMsgReply->term = ths->pRaftStore->currentTerm;
   pMsgReply->privateTerm = 8864;  // magic number
+  pMsgReply->timeStamp = taosGetTimestampMs();
 
   if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
     syncNodeResetElectTimer(ths);
@@ -2191,7 +2223,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
   syncLogRecvHeartbeatReply(ths, pMsg, "");
 
   // update last reply time, make decision whether the other node is alive or not
-  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->destId, pMsg->startTime);
+  syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, pMsg->timeStamp);
   return 0;
 }
@@ -424,8 +424,8 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
   char host[64];
   uint16_t port;
   syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
-  sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
-          pMsg->term, pMsg->privateTerm, s);
+  sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port,
+          pMsg->term, pMsg->timeStamp, s);
 }
 
 void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {