sync refactor
This commit is contained in:
parent
6a7f5c5fd2
commit
ef61d3ee94
|
@ -26,7 +26,32 @@ extern "C" {
|
|||
#include "syncInt.h"
|
||||
#include "taosdef.h"
|
||||
|
||||
// \* Leader i advances its commitIndex.
|
||||
// \* This is done as a separate step from handling AppendEntries responses,
|
||||
// \* in part to minimize atomic regions, and in part so that leaders of
|
||||
// \* single-server clusters are able to mark entries committed.
|
||||
// AdvanceCommitIndex(i) ==
|
||||
// /\ state[i] = Leader
|
||||
// /\ LET \* The set of servers that agree up through index.
|
||||
// Agree(index) == {i} \cup {k \in Server :
|
||||
// matchIndex[i][k] >= index}
|
||||
// \* The maximum indexes for which a quorum agrees
|
||||
// agreeIndexes == {index \in 1..Len(log[i]) :
|
||||
// Agree(index) \in Quorum}
|
||||
// \* New value for commitIndex'[i]
|
||||
// newCommitIndex ==
|
||||
// IF /\ agreeIndexes /= {}
|
||||
// /\ log[i][Max(agreeIndexes)].term = currentTerm[i]
|
||||
// THEN
|
||||
// Max(agreeIndexes)
|
||||
// ELSE
|
||||
// commitIndex[i]
|
||||
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
|
||||
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
|
||||
//
|
||||
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
|
||||
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index);
|
||||
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -46,25 +46,61 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
|
||||
|
||||
// update commit index
|
||||
SyncIndex newCommitIndex = pSyncNode->commitIndex;
|
||||
for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex;
|
||||
++index) {
|
||||
if (syncAgree(pSyncNode, index)) {
|
||||
newCommitIndex = index;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pSyncNode->pFsm != NULL) {
|
||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||
SyncIndex endIndex = SYNC_INDEX_INVALID;
|
||||
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
||||
if (i != SYNC_INDEX_INVALID) {
|
||||
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
|
||||
assert(pEntry != NULL);
|
||||
if (newCommitIndex > pSyncNode->commitIndex) {
|
||||
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
|
||||
SyncIndex endIndex = newCommitIndex;
|
||||
pSyncNode->commitIndex = newCommitIndex;
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
if (pSyncNode->pFsm != NULL) {
|
||||
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
||||
if (i != SYNC_INDEX_INVALID) {
|
||||
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
|
||||
assert(pEntry != NULL);
|
||||
|
||||
if (pSyncNode->pFsm->FpCommitCb != NULL) {
|
||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (pSyncNode->pFsm->FpCommitCb != NULL) {
|
||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
|
||||
SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId);
|
||||
|
||||
// b for debug
|
||||
bool b = false;
|
||||
if (matchIndex >= index) {
|
||||
b = true;
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
|
||||
int agreeCount = 0;
|
||||
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||
if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) {
|
||||
++agreeCount;
|
||||
}
|
||||
if (agreeCount >= pSyncNode->quorum) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
Loading…
Reference in New Issue