refactor(sync): pre snapshot
This commit is contained in:
parent
352c2d8ec0
commit
58fd2228d9
|
@ -38,7 +38,7 @@ extern "C" {
|
||||||
#define SYNC_MNODE_LOG_RETENTION 10000
|
#define SYNC_MNODE_LOG_RETENTION 10000
|
||||||
#define SYNC_VNODE_LOG_RETENTION 100
|
#define SYNC_VNODE_LOG_RETENTION 100
|
||||||
#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10
|
#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10
|
||||||
#define SNAPSHOT_WAIT_MS 1000 * 60
|
#define SNAPSHOT_WAIT_MS 1000 * 30
|
||||||
|
|
||||||
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
|
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,7 @@ typedef struct SSyncSnapshotSender {
|
||||||
int64_t sendingMS;
|
int64_t sendingMS;
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
int64_t startTime;
|
int64_t startTime;
|
||||||
|
int64_t endTime;
|
||||||
bool finish;
|
bool finish;
|
||||||
|
|
||||||
// init when create
|
// init when create
|
||||||
|
|
|
@ -167,7 +167,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
|
||||||
if (pMsg->prevLogIndex >= startIndex) {
|
if (pMsg->prevLogIndex >= startIndex) {
|
||||||
SyncTerm myPreLogTerm = syncNodeGetPreTerm(ths, pMsg->prevLogIndex + 1);
|
SyncTerm myPreLogTerm = syncNodeGetPreTerm(ths, pMsg->prevLogIndex + 1);
|
||||||
ASSERT(myPreLogTerm != SYNC_TERM_INVALID);
|
// ASSERT(myPreLogTerm != SYNC_TERM_INVALID);
|
||||||
|
if (myPreLogTerm == SYNC_TERM_INVALID) {
|
||||||
|
syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term invalid");
|
||||||
|
goto _SEND_RESPONSE;
|
||||||
|
}
|
||||||
|
|
||||||
if (myPreLogTerm != pMsg->prevLogTerm) {
|
if (myPreLogTerm != pMsg->prevLogTerm) {
|
||||||
syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term not match");
|
syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term not match");
|
||||||
|
|
|
@ -45,6 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
|
||||||
pSender->replicaIndex = replicaIndex;
|
pSender->replicaIndex = replicaIndex;
|
||||||
pSender->term = pSyncNode->pRaftStore->currentTerm;
|
pSender->term = pSyncNode->pRaftStore->currentTerm;
|
||||||
pSender->startTime = 0;
|
pSender->startTime = 0;
|
||||||
|
pSender->endTime = 0;
|
||||||
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
|
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
|
||||||
pSender->finish = false;
|
pSender->finish = false;
|
||||||
} else {
|
} else {
|
||||||
|
@ -132,6 +133,7 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
||||||
// update flag
|
// update flag
|
||||||
pSender->start = false;
|
pSender->start = false;
|
||||||
pSender->finish = finish;
|
pSender->finish = finish;
|
||||||
|
pSender->endTime = taosGetTimestampMs();
|
||||||
|
|
||||||
// close reader
|
// close reader
|
||||||
if (pSender->pReader != NULL) {
|
if (pSender->pReader != NULL) {
|
||||||
|
@ -265,7 +267,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!snapshotSenderIsStart(pSender) && pSender->finish &&
|
if (!snapshotSenderIsStart(pSender) && pSender->finish &&
|
||||||
taosGetTimestampMs() - pSender->startTime < SNAPSHOT_WAIT_MS) {
|
taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
|
||||||
sNTrace(pSyncNode, "snapshot sender too frequently, ignore");
|
sNTrace(pSyncNode, "snapshot sender too frequently, ignore");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
@ -794,6 +796,9 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update next index
|
||||||
|
syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1);
|
||||||
|
|
||||||
// update seq
|
// update seq
|
||||||
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
|
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue