update sync replication

This commit is contained in:
Minglei Jin 2024-07-25 12:09:51 +08:00
parent a5cd3444d1
commit 970a5f31a3
1 changed files with 10 additions and 6 deletions

View File

@ -52,7 +52,8 @@ int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
syncLogReplReset(pMgr); syncLogReplReset(pMgr);
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
int32_t syncNodeReplicate(SSyncNode* pNode) { int32_t syncNodeReplicate(SSyncNode* pNode) {
@ -60,13 +61,14 @@ int32_t syncNodeReplicate(SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
int32_t ret = syncNodeReplicateWithoutLock(pNode); int32_t ret = syncNodeReplicateWithoutLock(pNode);
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
return ret;
TAOS_RETURN(ret);
} }
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) { int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
if ((pNode->state != TAOS_SYNC_STATE_LEADER && pNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) || if ((pNode->state != TAOS_SYNC_STATE_LEADER && pNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) ||
pNode->raftCfg.cfg.totalReplicaNum == 1) { pNode->raftCfg.cfg.totalReplicaNum == 1) {
return -1; TAOS_RETURN(TSDB_CODE_FAILED);
} }
for (int32_t i = 0; i < pNode->totalReplicaNum; i++) { for (int32_t i = 0; i < pNode->totalReplicaNum; i++) {
if (syncUtilSameId(&pNode->replicasId[i], &pNode->myRaftId)) { if (syncUtilSameId(&pNode->replicasId[i], &pNode->myRaftId)) {
@ -75,14 +77,16 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
(void)syncLogReplStart(pMgr, pNode); (void)syncLogReplStart(pMgr, pNode);
} }
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont; SyncAppendEntries* pMsg = pRpcMsg->pCont;
pMsg->destId = *destRaftId; pMsg->destId = *destRaftId;
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) { int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
@ -112,5 +116,5 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
} }
return 0; TAOS_RETURN(TSDB_CODE_SUCCESS);
} }