diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index f2e240344f..a55fd7ead3 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -51,7 +51,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode); int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg); int32_t syncNodeReplicate(SSyncNode* pSyncNode); -int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot); +int32_t syncNodeReplicateReset(SSyncNode* pSyncNode, SRaftId* pDestId); int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode); int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9601cd6ab0..04e4859c5e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -301,7 +301,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { syncNodeRelease(pSyncNode); return 0; } - logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex); + logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention); } _DEL_WAL: diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index faa44a626c..cc1a40a430 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -742,7 +742,8 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) { term = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index + 1); - if (term < 0 || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { + if ((index + 1 < firstVer) || (term < 0) || + (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); if (syncNodeStartSnapshot(pNode, &destId) < 0) { sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId)); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 8cdf821cff..1d94b288d3 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -48,6 +48,15 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); +int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + taosThreadMutexLock(&pBuf->mutex); + SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); + syncLogReplMgrReset(pMgr); + taosThreadMutexUnlock(&pBuf->mutex); + return 0; +} + int32_t syncNodeReplicate(SSyncNode* pNode) { SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 381327d4d7..413d6adf05 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -992,8 +992,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end"); snapshotSenderStop(pSender, true); - SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); - syncLogReplMgrReset(pMgr); + syncNodeReplicateReset(pSyncNode, &pMsg->srcId); return 0; } @@ -1018,8 +1017,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack"); sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq); snapshotSenderStop(pSender, true); - SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); - syncLogReplMgrReset(pMgr); + syncNodeReplicateReset(pSyncNode, &pMsg->srcId); return -1; } @@ -1027,8 +1025,6 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { _ERROR: snapshotSenderStop(pSender, true); - SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId); - syncLogReplMgrReset(pMgr); - + syncNodeReplicateReset(pSyncNode, &pMsg->srcId); return -1; }