sync refactor

This commit is contained in:
Minghao Li 2022-03-18 17:23:48 +08:00
parent 59ae887e30
commit 21feed915f
2 changed files with 18 additions and 12 deletions

View File

@ -18,6 +18,7 @@
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
// \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses,
@ -50,7 +51,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex newCommitIndex = pSyncNode->commitIndex;
for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex;
++index) {
if (syncAgree(pSyncNode, index)) {
bool agree = syncAgree(pSyncNode, index);
if (agree) {
// term
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
assert(pEntry != NULL);
@ -97,14 +99,18 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
}
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId);
// b for debug
bool b = false;
if (matchIndex >= index) {
b = true;
// I am leader, I agree
if (syncUtilSameId(pRaftId, &(pSyncNode->myRaftId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
return true;
}
return b;
// follower agree
SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId);
if (matchIndex >= index) {
return true;
}
return false;
}
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {

View File

@ -195,7 +195,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
// init TLA+ log vars
pSyncNode->pLogStore = logStoreCreate(pSyncNode);
assert(pSyncNode->pLogStore != NULL);
pSyncNode->commitIndex = 0;
pSyncNode->commitIndex = SYNC_INDEX_INVALID;
// init ping timer
pSyncNode->pPingTimer = NULL;
@ -774,9 +774,6 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
if (ths->state == TAOS_SYNC_STATE_LEADER) {
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
// only myself, maybe commit
syncMaybeAdvanceCommitIndex(ths);
// start replicate right now!
syncNodeReplicate(ths);
@ -791,6 +788,9 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
}
rpcFreeCont(rpcMsg.pCont);
// only myself, maybe commit
syncMaybeAdvanceCommitIndex(ths);
} else {
// pre commit
SRpcMsg rpcMsg;