diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index c75d23d96d..8f0b8e5a79 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -18,6 +18,7 @@ #include "syncInt.h" #include "syncRaftLog.h" #include "syncRaftStore.h" +#include "syncUtil.h" // \* Leader i advances its commitIndex. // \* This is done as a separate step from handling AppendEntries responses, @@ -50,7 +51,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex newCommitIndex = pSyncNode->commitIndex; for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex; ++index) { - if (syncAgree(pSyncNode, index)) { + bool agree = syncAgree(pSyncNode, index); + if (agree) { // term SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); assert(pEntry != NULL); @@ -97,14 +99,18 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) { - SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId); - - // b for debug - bool b = false; - if (matchIndex >= index) { - b = true; + // I am leader, I agree + if (syncUtilSameId(pRaftId, &(pSyncNode->myRaftId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + return true; } - return b; + + // follower agree + SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId); + if (matchIndex >= index) { + return true; + } + + return false; } bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6c2ef0c85b..de108086bc 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -195,7 +195,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init TLA+ log vars pSyncNode->pLogStore = logStoreCreate(pSyncNode); assert(pSyncNode->pLogStore != NULL); - pSyncNode->commitIndex = 0; + pSyncNode->commitIndex = SYNC_INDEX_INVALID; // init ping timer pSyncNode->pPingTimer = NULL; @@ -774,9 +774,6 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg if (ths->state == TAOS_SYNC_STATE_LEADER) { ths->pLogStore->appendEntry(ths->pLogStore, pEntry); - // only myself, maybe commit - syncMaybeAdvanceCommitIndex(ths); - // start replicate right now! syncNodeReplicate(ths); @@ -791,6 +788,9 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg } rpcFreeCont(rpcMsg.pCont); + // only myself, maybe commit + syncMaybeAdvanceCommitIndex(ths); + } else { // pre commit SRpcMsg rpcMsg;