Merge pull request #18783 from taosdata/feature/3.0_mhli
refactor(sync): resend snapshot in timer routine when the response times out
This commit is contained in:
commit
be227bd6f6
|
@ -47,6 +47,7 @@ extern "C" {
|
||||||
|
|
||||||
#define SYNC_HEARTBEAT_SLOW_MS 1500
|
#define SYNC_HEARTBEAT_SLOW_MS 1500
|
||||||
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
|
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
|
||||||
|
// Threshold in milliseconds (60 seconds) compared against the time elapsed since a snapshot sender's last send.
// Parenthesized so the expansion stays a single operand when used next to /, %, or unary operators.
#define SYNC_SNAP_RESEND_MS (1000 * 60)
|
||||||
|
|
||||||
#define SYNC_MAX_BATCH_SIZE 1
|
#define SYNC_MAX_BATCH_SIZE 1
|
||||||
#define SYNC_INDEX_BEGIN 0
|
#define SYNC_INDEX_BEGIN 0
|
||||||
|
|
|
@ -44,6 +44,7 @@ typedef struct SSyncSnapshotSender {
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
int64_t startTime;
|
int64_t startTime;
|
||||||
int64_t endTime;
|
int64_t endTime;
|
||||||
|
int64_t lastSendTime;
|
||||||
bool finish;
|
bool finish;
|
||||||
|
|
||||||
// init when create
|
// init when create
|
||||||
|
|
|
@ -103,6 +103,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
||||||
pSender->sendingMS = 0;
|
pSender->sendingMS = 0;
|
||||||
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
||||||
pSender->startTime = taosGetTimestampMs();
|
pSender->startTime = taosGetTimestampMs();
|
||||||
|
pSender->lastSendTime = pSender->startTime;
|
||||||
pSender->finish = false;
|
pSender->finish = false;
|
||||||
|
|
||||||
// build begin msg
|
// build begin msg
|
||||||
|
@ -201,6 +202,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
||||||
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
|
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
|
||||||
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
|
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
|
||||||
|
|
||||||
|
pSender->lastSendTime = taosGetTimestampMs();
|
||||||
|
|
||||||
// event log
|
// event log
|
||||||
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
|
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
sSTrace(pSender, "snapshot sender finish");
|
sSTrace(pSender, "snapshot sender finish");
|
||||||
|
@ -213,33 +216,36 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
||||||
// send snapshot data from cache
|
// send snapshot data from cache
|
||||||
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
||||||
// send current block data
|
// send current block data
|
||||||
|
|
||||||
|
// build msg
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId);
|
||||||
|
|
||||||
|
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||||
|
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||||
|
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
|
||||||
|
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
||||||
|
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||||
|
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||||
|
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||||
|
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||||
|
pMsg->lastConfig = pSender->lastConfig;
|
||||||
|
pMsg->seq = pSender->seq;
|
||||||
|
|
||||||
if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
|
if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
|
||||||
// build msg
|
|
||||||
SRpcMsg rpcMsg = {0};
|
|
||||||
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId);
|
|
||||||
|
|
||||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
|
||||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
|
||||||
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
|
|
||||||
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
|
||||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
|
||||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
|
||||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
|
||||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
|
||||||
pMsg->lastConfig = pSender->lastConfig;
|
|
||||||
pMsg->seq = pSender->seq;
|
|
||||||
|
|
||||||
// pMsg->privateTerm = pSender->privateTerm;
|
// pMsg->privateTerm = pSender->privateTerm;
|
||||||
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
|
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
|
||||||
|
|
||||||
// send msg
|
|
||||||
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
|
|
||||||
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
|
|
||||||
|
|
||||||
// event log
|
|
||||||
sSTrace(pSender, "snapshot sender resend");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// send msg
|
||||||
|
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
|
||||||
|
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
|
||||||
|
|
||||||
|
pSender->lastSendTime = taosGetTimestampMs();
|
||||||
|
|
||||||
|
// event log
|
||||||
|
sSTrace(pSender, "snapshot sender resend");
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "syncRaftLog.h"
|
#include "syncRaftLog.h"
|
||||||
#include "syncReplication.h"
|
#include "syncReplication.h"
|
||||||
#include "syncRespMgr.h"
|
#include "syncRespMgr.h"
|
||||||
|
#include "syncSnapshot.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
|
|
||||||
static void syncNodeCleanConfigIndex(SSyncNode* ths) {
|
static void syncNodeCleanConfigIndex(SSyncNode* ths) {
|
||||||
|
@ -70,6 +71,20 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t timeNow = taosGetTimestampMs();
|
int64_t timeNow = taosGetTimestampMs();
|
||||||
|
|
||||||
|
for (int i = 0; i < ths->peersNum; ++i) {
|
||||||
|
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i]));
|
||||||
|
if (pSender != NULL) {
|
||||||
|
if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start &&
|
||||||
|
timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) {
|
||||||
|
snapshotReSend(pSender);
|
||||||
|
} else {
|
||||||
|
sTrace("vgId:%d, do not resend: nstart%d, now:%" PRId64 ", lstsend:%" PRId64 ", diff:%" PRId64, ths->vgId,
|
||||||
|
ths->isStart, timeNow, pSender->lastSendTime, timeNow - pSender->lastSendTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) {
|
if (atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) {
|
||||||
// end timeout wal snapshot
|
// end timeout wal snapshot
|
||||||
if (timeNow - ths->snapshottingTime > SYNC_DEL_WAL_MS &&
|
if (timeNow - ths->snapshottingTime > SYNC_DEL_WAL_MS &&
|
||||||
|
|
|
@ -566,7 +566,7 @@ void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
|
||||||
sNTrace(pSyncNode,
|
sNTrace(pSyncNode,
|
||||||
"send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
|
"send sync-snapshot-send to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
|
||||||
", stime:%" PRId64 ", seq:%d}, %s",
|
", stime:%" PRId64 ", seq:%d}, %s",
|
||||||
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s);
|
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s);
|
||||||
}
|
}
|
||||||
|
@ -593,7 +593,7 @@ void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
|
||||||
sNTrace(pSyncNode,
|
sNTrace(pSyncNode,
|
||||||
"send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
|
"send sync-snapshot-rsp to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
|
||||||
", stime:%" PRId64 ", ack:%d}, %s",
|
", stime:%" PRId64 ", ack:%d}, %s",
|
||||||
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
|
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue