fix(sync): reset commit index by snapshot when open sync
This commit is contained in:
parent
11b473fe9e
commit
90e7d794f3
|
@ -999,7 +999,18 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
||||||
// init TLA+ log vars
|
// init TLA+ log vars
|
||||||
pSyncNode->pLogStore = logStoreCreate(pSyncNode);
|
pSyncNode->pLogStore = logStoreCreate(pSyncNode);
|
||||||
ASSERT(pSyncNode->pLogStore != NULL);
|
ASSERT(pSyncNode->pLogStore != NULL);
|
||||||
pSyncNode->commitIndex = SYNC_INDEX_INVALID;
|
|
||||||
|
SyncIndex commitIndex = SYNC_INDEX_INVALID;
|
||||||
|
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
|
SSnapshot snapshot = {0};
|
||||||
|
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
if (snapshot.lastApplyIndex > commitIndex) {
|
||||||
|
commitIndex = snapshot.lastApplyIndex;
|
||||||
|
syncNodeEventLog(pSyncNode, "reset commit index by snapshot");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pSyncNode->commitIndex = commitIndex;
|
||||||
|
|
||||||
// timer ms init
|
// timer ms init
|
||||||
pSyncNode->pingBaseLine = PING_TIMER_MS;
|
pSyncNode->pingBaseLine = PING_TIMER_MS;
|
||||||
|
|
|
@ -46,8 +46,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, port,
|
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
|
||||||
pMsg->term, pMsg->voteGranted);
|
port, pMsg->term, pMsg->voteGranted);
|
||||||
syncNodeEventLog(ths, logBuf);
|
syncNodeEventLog(ths, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
"recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
|
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
|
||||||
pMsg->term, pMsg->voteGranted);
|
pMsg->term, pMsg->voteGranted);
|
||||||
syncNodeErrorLog(ths, logBuf);
|
syncNodeErrorLog(ths, logBuf);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -71,7 +71,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
"recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
|
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
|
||||||
pMsg->term, pMsg->voteGranted);
|
pMsg->term, pMsg->voteGranted);
|
||||||
syncNodeErrorLog(ths, logBuf);
|
syncNodeErrorLog(ths, logBuf);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -88,7 +88,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
|
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
|
||||||
host, port, pMsg->term, pMsg->voteGranted);
|
host, port, pMsg->term, pMsg->voteGranted);
|
||||||
syncNodeErrorLog(ths, logBuf);
|
syncNodeErrorLog(ths, logBuf);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -191,8 +191,8 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host, port,
|
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
|
||||||
pMsg->term, pMsg->voteGranted);
|
port, pMsg->term, pMsg->voteGranted);
|
||||||
syncNodeEventLog(ths, logBuf);
|
syncNodeEventLog(ths, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -203,7 +203,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
"recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
|
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
|
||||||
pMsg->term, pMsg->voteGranted);
|
pMsg->term, pMsg->voteGranted);
|
||||||
syncNodeErrorLog(ths, logBuf);
|
syncNodeErrorLog(ths, logBuf);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -216,7 +216,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
"recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
|
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
|
||||||
pMsg->term, pMsg->voteGranted);
|
pMsg->term, pMsg->voteGranted);
|
||||||
syncNodeErrorLog(ths, logBuf);
|
syncNodeErrorLog(ths, logBuf);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -233,7 +233,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
|
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
|
||||||
host, port, pMsg->term, pMsg->voteGranted);
|
host, port, pMsg->term, pMsg->voteGranted);
|
||||||
syncNodeErrorLog(ths, logBuf);
|
syncNodeErrorLog(ths, logBuf);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue