diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index e4df93ca47..9922357134 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -120,6 +120,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // reject request if ((pMsg->term < ths->pRaftStore->currentTerm) || ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { + sTrace( + "syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, " + "logOK:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; @@ -137,6 +142,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // return to follower state if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { + sTrace( + "syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " + "ths->state:%d, logOK:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); + syncNodeBecomeFollower(ths); // ret or reply? @@ -159,6 +169,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntryDestory(pPreEntry); } + sTrace( + "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " + "ths->state:%d, logOK:%d, preMatch:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, preMatch); + if (preMatch) { // must has preIndex in local log assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); @@ -167,7 +182,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { bool hasAppendEntries = pMsg->dataLen > 0; if (hasExtraEntries && hasAppendEntries) { - // conflict + // not conflict by default bool conflict = false; SyncIndex extraIndex = pMsg->prevLogIndex + 1; @@ -177,8 +192,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); assert(pAppendEntry != NULL); + // log not match, conflict assert(extraIndex == pAppendEntry->index); - if (pExtraEntry->term == pAppendEntry->term) { + if (pExtraEntry->term != pAppendEntry->term) { conflict = true; } @@ -187,6 +203,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); SyncIndex delEnd = extraIndex; + sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); + // notice! reverse roll back! for (SyncIndex index = delEnd; index >= delBegin; --index) { if (ths->pFsm->FpRollBackCb != NULL) { @@ -212,7 +230,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2); } } rpcFreeCont(rpcMsg.pCont); @@ -237,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3); } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 98cf025093..a896ac5ff4 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -863,7 +863,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -2); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1); } } rpcFreeCont(rpcMsg.pCont);