fix(sync): snapshot begin index
This commit is contained in:
parent
a760fb8071
commit
6f29fd4020
|
@ -119,7 +119,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
taosRealPath(tdir, NULL, sizeof(tdir));
|
taosRealPath(tdir, NULL, sizeof(tdir));
|
||||||
|
|
||||||
// for test tsdb snapshot
|
// for test tsdb snapshot
|
||||||
#if 0
|
#if 1
|
||||||
pVnode->config.walCfg.segSize = 200;
|
pVnode->config.walCfg.segSize = 200;
|
||||||
pVnode->config.walCfg.retentionSize = 2000;
|
pVnode->config.walCfg.retentionSize = 2000;
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -223,6 +223,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
// snapshot --------------
|
// snapshot --------------
|
||||||
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
|
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
|
||||||
|
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
|
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
|
||||||
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
|
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
|
||||||
|
|
|
@ -711,6 +711,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
syncNodeEventLog(ths, logBuf);
|
syncNodeEventLog(ths, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
// maybe update commit index by snapshot
|
||||||
|
syncNodeMaybeUpdateCommitBySnapshot(ths);
|
||||||
|
|
||||||
// prepare response msg
|
// prepare response msg
|
||||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
pReply->srcId = ths->myRaftId;
|
pReply->srcId = ths->myRaftId;
|
||||||
|
@ -718,7 +721,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
pReply->term = ths->pRaftStore->currentTerm;
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
|
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
|
||||||
pReply->success = false;
|
pReply->success = false;
|
||||||
pReply->matchIndex = SYNC_INDEX_INVALID;
|
pReply->matchIndex = ths->commitIndex;
|
||||||
|
|
||||||
// msg event log
|
// msg event log
|
||||||
do {
|
do {
|
||||||
|
|
|
@ -147,6 +147,15 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
|
||||||
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
// print log
|
||||||
|
do {
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, term:%lu, match:%ld, success:%d", pMsg->term,
|
||||||
|
pMsg->matchIndex, pMsg->success);
|
||||||
|
syncNodeEventLog(ths, logBuf);
|
||||||
|
|
||||||
|
} while (0);
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
|
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
|
||||||
|
@ -238,7 +247,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
|
||||||
SSnapshot oldSnapshot;
|
SSnapshot oldSnapshot;
|
||||||
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
|
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
|
||||||
SyncTerm newSnapshotTerm = oldSnapshot.lastApplyTerm;
|
SyncTerm newSnapshotTerm = oldSnapshot.lastApplyTerm;
|
||||||
syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, newSnapshotTerm, pMsg);
|
syncNodeStartSnapshotOnce(ths, pMsg->matchIndex + 1, nextIndex, newSnapshotTerm, pMsg);
|
||||||
|
|
||||||
// get sender
|
// get sender
|
||||||
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
|
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
|
||||||
|
@ -256,6 +265,11 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
|
||||||
}
|
}
|
||||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
|
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
|
||||||
|
|
||||||
|
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
|
if (pMsg->matchIndex > oldMatchIndex) {
|
||||||
|
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
|
||||||
|
}
|
||||||
|
|
||||||
// event log, update next-index
|
// event log, update next-index
|
||||||
do {
|
do {
|
||||||
char host[64];
|
char host[64];
|
||||||
|
|
|
@ -1083,6 +1083,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
||||||
return pSyncNode;
|
return pSyncNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
|
||||||
|
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
|
SSnapshot snapshot;
|
||||||
|
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
|
||||||
|
pSyncNode->commitIndex = snapshot.lastApplyIndex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void syncNodeStart(SSyncNode* pSyncNode) {
|
void syncNodeStart(SSyncNode* pSyncNode) {
|
||||||
// start raft
|
// start raft
|
||||||
if (pSyncNode->replicaNum == 1) {
|
if (pSyncNode->replicaNum == 1) {
|
||||||
|
|
|
@ -170,84 +170,3 @@ if $rows != 100 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
||||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
|
||||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
|
||||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
|
||||||
########################################################
|
|
||||||
|
|
||||||
|
|
||||||
########################################################
|
|
||||||
print ===> start dnode1 dnode3 dnode4
|
|
||||||
system sh/exec.sh -n dnode1 -s start
|
|
||||||
#system sh/exec.sh -n dnode2 -s start
|
|
||||||
system sh/exec.sh -n dnode3 -s start
|
|
||||||
system sh/exec.sh -n dnode4 -s start
|
|
||||||
|
|
||||||
sleep 7000
|
|
||||||
|
|
||||||
print =============== query data
|
|
||||||
sql connect
|
|
||||||
sql use db
|
|
||||||
sql select * from ct1
|
|
||||||
print rows: $rows
|
|
||||||
print $data00 $data01 $data02
|
|
||||||
if $rows != 100 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
||||||
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
|
||||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
|
||||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
|
||||||
########################################################
|
|
||||||
|
|
||||||
|
|
||||||
########################################################
|
|
||||||
print ===> start dnode1 dnode2 dnode4
|
|
||||||
system sh/exec.sh -n dnode1 -s start
|
|
||||||
system sh/exec.sh -n dnode2 -s start
|
|
||||||
#system sh/exec.sh -n dnode3 -s start
|
|
||||||
system sh/exec.sh -n dnode4 -s start
|
|
||||||
|
|
||||||
sleep 3000
|
|
||||||
|
|
||||||
print =============== query data
|
|
||||||
sql select * from ct1
|
|
||||||
print rows: $rows
|
|
||||||
print $data00 $data01 $data02
|
|
||||||
if $rows != 100 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
||||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
|
||||||
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
|
||||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
|
||||||
########################################################
|
|
||||||
|
|
||||||
|
|
||||||
########################################################
|
|
||||||
print ===> start dnode1 dnode2 dnode3
|
|
||||||
system sh/exec.sh -n dnode1 -s start
|
|
||||||
system sh/exec.sh -n dnode2 -s start
|
|
||||||
system sh/exec.sh -n dnode3 -s start
|
|
||||||
#system sh/exec.sh -n dnode4 -s start
|
|
||||||
|
|
||||||
sleep 3000
|
|
||||||
|
|
||||||
print =============== query data
|
|
||||||
sql select * from ct1
|
|
||||||
print rows: $rows
|
|
||||||
print $data00 $data01 $data02
|
|
||||||
if $rows != 100 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
||||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
|
||||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
|
||||||
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
|
||||||
########################################################
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue