sync refactor

This commit is contained in:
Minghao Li 2022-03-17 16:33:10 +08:00
parent 13aa59409d
commit e5b0e1465d
2 changed files with 30 additions and 6 deletions

View File

@ -15,6 +15,7 @@
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftLog.h"
// \* Leader i advances its commitIndex. // \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses, // \* This is done as a separate step from handling AppendEntries responses,
@ -42,4 +43,25 @@
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
// update commit index
if (pSyncNode->pFsm != NULL) {
SyncIndex beginIndex = SYNC_INDEX_INVALID;
SyncIndex endIndex = SYNC_INDEX_INVALID;
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (pSyncNode->pFsm->FpCommitCb != NULL) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
}
} }

View File

@ -749,9 +749,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
assert(ths->pFsm != NULL); if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
@ -760,9 +761,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
assert(ths->pFsm != NULL); if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1);
}
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
} }