enh(sync) sync/mnode integration, add term, currentTerm in cbMeta

This commit is contained in:
Minghao Li 2022-05-23 12:14:53 +08:00
parent 2b668e3b27
commit 97b1e95ad8
4 changed files with 32 additions and 8 deletions

View File

@ -78,6 +78,8 @@ typedef struct SFsmCbMeta {
int32_t code;
ESyncState state;
uint64_t seqNum;
SyncTerm term;
SyncTerm currentTerm;
} SFsmCbMeta;
typedef struct SSyncFSM {

View File

@ -139,14 +139,32 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg;
pApplyMsg->info.node = pFsm->data;
mndProcessApplyMsg(pApplyMsg);
sdbUpdateVer(pMnode->pSdb, 1);
if (cbMeta.term < cbMeta.currentTerm) {
// restoring
SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg;
pApplyMsg->info.node = pFsm->data;
mndProcessApplyMsg(pApplyMsg);
//sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term);
// mndTransPullup(pMnode);
} else if (cbMeta.term == cbMeta.currentTerm) {
// restore finish
SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg;
pApplyMsg->info.node = pFsm->data;
mndProcessApplyMsg(pApplyMsg);
//sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term);
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
tsem_post(&pMgmt->syncSem);
}
} else {
ASSERT(0);
}
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
tsem_post(&pMgmt->syncSem);
}
}
}

View File

@ -332,6 +332,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}

View File

@ -110,6 +110,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
cbMeta.code = 0;
cbMeta.state = pSyncNode->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
}
@ -162,4 +164,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
}
}
return false;
}
}