enh: use syncNodeReplicateReset in syncNodeOnSnapshotRsp
This commit is contained in:
parent
383459441d
commit
21c18a4a9b
|
@ -51,7 +51,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg);
|
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
|
int32_t syncNodeReplicateReset(SSyncNode* pSyncNode, SRaftId* pDestId);
|
||||||
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode);
|
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode);
|
||||||
|
|
||||||
int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||||
|
|
|
@ -48,6 +48,15 @@
|
||||||
|
|
||||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||||
|
|
||||||
|
int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
|
||||||
|
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||||
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
|
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
|
||||||
|
syncLogReplMgrReset(pMgr);
|
||||||
|
taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncNodeReplicate(SSyncNode* pNode) {
|
int32_t syncNodeReplicate(SSyncNode* pNode) {
|
||||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
|
|
|
@ -992,8 +992,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
||||||
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
|
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
|
||||||
snapshotSenderStop(pSender, true);
|
snapshotSenderStop(pSender, true);
|
||||||
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
|
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||||
syncLogReplMgrReset(pMgr);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1018,8 +1017,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
||||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
|
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
|
||||||
sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
|
sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
|
||||||
snapshotSenderStop(pSender, true);
|
snapshotSenderStop(pSender, true);
|
||||||
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
|
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||||
syncLogReplMgrReset(pMgr);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1027,8 +1025,6 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
_ERROR:
|
_ERROR:
|
||||||
snapshotSenderStop(pSender, true);
|
snapshotSenderStop(pSender, true);
|
||||||
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
|
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||||
syncLogReplMgrReset(pMgr);
|
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue