diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index 666515078a..d8697da64b 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -82,10 +82,10 @@ static FORCE_INLINE int32_t syncLogReplMgrUpdateTerm(SSyncLogReplMgr* pMgr, Sync } SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); +int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId, bool* pBarrier); int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index f077306475..f2e240344f 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -52,6 +52,7 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpc int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot); +int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode); int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 32e424666b..ed38ea9559 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -81,10 +81,12 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // replicate log SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId); - // ASSERT(pMgr != NULL); - if (pMgr != NULL) { - (void)syncLogReplMgrProcessReply(pMgr, ths, pMsg); + if (pMgr == NULL) { + sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr); + return -1; } + ASSERT(pMgr != NULL); + (void)syncLogReplMgrProcessReply(pMgr, ths, pMsg); } return 0; } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 656b32bd7b..199ef289f3 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -234,10 +234,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); if (index <= pBuf->commitIndex) { - sInfo("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, - pBuf->endIndex); + sTrace("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); ret = 0; goto _out; } @@ -364,7 +364,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); // replicate on demand - (void)syncNodeReplicate(pNode); + (void)syncNodeReplicateWithoutLock(pNode); // persist if (syncLogStorePersist(pLogStore, pEntry) < 0) { @@ -393,7 +393,7 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn return 0; } - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncEntry2OriginalRpc(pEntry, &rpcMsg); SFsmCbMeta cbMeta = {0}; @@ -666,8 +666,7 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync return 0; } -int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { - SSyncLogBuffer* pBuf = pNode->pLogBuf; +int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->restored) { (void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); } else { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index d4a046b7ae..2623eebc23 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -135,6 +135,14 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh } int32_t syncNodeReplicate(SSyncNode* pNode) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + taosThreadMutexLock(&pBuf->mutex); + int32_t ret = syncNodeReplicateWithoutLock(pNode); + taosThreadMutexUnlock(&pBuf->mutex); + return ret; +} + +int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) { if (pNode->state != TAOS_SYNC_STATE_LEADER || pNode->replicaNum == 1) { return -1; } @@ -143,7 +151,7 @@ int32_t syncNodeReplicate(SSyncNode* pNode) { continue; } SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; - (void)syncLogBufferReplicateOnce(pMgr, pNode); + (void)syncLogReplMgrReplicateOnce(pMgr, pNode); } return 0; }