fix: protect syncNodeReplicate with mutex lock

This commit is contained in:
Benguang Zhao 2022-11-22 10:54:32 +08:00
parent b90ee79623
commit 8ef5ca78ee
5 changed files with 23 additions and 13 deletions

View File

@ -82,10 +82,10 @@ static FORCE_INLINE int32_t syncLogReplMgrUpdateTerm(SSyncLogReplMgr* pMgr, Sync
}
SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
SRaftId* pDestId, bool* pBarrier);
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);

View File

@ -52,6 +52,7 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpc
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode);
int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);

View File

@ -81,10 +81,12 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// replicate log
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
// ASSERT(pMgr != NULL);
if (pMgr != NULL) {
(void)syncLogReplMgrProcessReply(pMgr, ths, pMsg);
if (pMgr == NULL) {
sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
return -1;
}
ASSERT(pMgr != NULL);
(void)syncLogReplMgrProcessReply(pMgr, ths, pMsg);
}
return 0;
}

View File

@ -234,10 +234,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf);
if (index <= pBuf->commitIndex) {
sInfo("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex);
sTrace("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex);
ret = 0;
goto _out;
}
@ -364,7 +364,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);
// replicate on demand
(void)syncNodeReplicate(pNode);
(void)syncNodeReplicateWithoutLock(pNode);
// persist
if (syncLogStorePersist(pLogStore, pEntry) < 0) {
@ -393,7 +393,7 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
return 0;
}
SRpcMsg rpcMsg;
SRpcMsg rpcMsg = {0};
syncEntry2OriginalRpc(pEntry, &rpcMsg);
SFsmCbMeta cbMeta = {0};
@ -666,8 +666,7 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
return 0;
}
int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->restored) {
(void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
} else {

View File

@ -135,6 +135,14 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
}
int32_t syncNodeReplicate(SSyncNode* pNode) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex);
int32_t ret = syncNodeReplicateWithoutLock(pNode);
taosThreadMutexUnlock(&pBuf->mutex);
return ret;
}
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
if (pNode->state != TAOS_SYNC_STATE_LEADER || pNode->replicaNum == 1) {
return -1;
}
@ -143,7 +151,7 @@ int32_t syncNodeReplicate(SSyncNode* pNode) {
continue;
}
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
(void)syncLogBufferReplicateOnce(pMgr, pNode);
(void)syncLogReplMgrReplicateOnce(pMgr, pNode);
}
return 0;
}