diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 9b6593e4b5..872785abe5 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -78,6 +78,8 @@ typedef struct SFsmCbMeta { int32_t code; ESyncState state; uint64_t seqNum; + SyncTerm term; + SyncTerm currentTerm; } SFsmCbMeta; typedef struct SSyncFSM { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 0b7ed04819..5eec46056b 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -139,14 +139,32 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; - pApplyMsg->info.node = pFsm->data; - mndProcessApplyMsg(pApplyMsg); - sdbUpdateVer(pMnode->pSdb, 1); + if (cbMeta.term < cbMeta.currentTerm) { + // restoring + + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); + //sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term); + + // mndTransPullup(pMnode); + + } else if (cbMeta.term == cbMeta.currentTerm) { + // restore finish + + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); + + //sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term); + + if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { + tsem_post(&pMgmt->syncSem); + } + } else { + ASSERT(0); + } - if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { - tsem_post(&pMgmt->syncSem); - } } } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1a5d418e75..bf1eeb4186 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -332,6 +332,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.code = 0; cbMeta.state = ths->state; cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 0f17cf267e..01e408e543 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -110,6 +110,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.code = 0; cbMeta.state = pSyncNode->state; cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); } @@ -162,4 +164,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { } } return false; -} \ No newline at end of file +}