Merge pull request #20314 from taosdata/FIX/TD-22983-main
enh: keep extra raft Logs before minimum match index
This commit is contained in:
commit
72680094fc
|
@ -51,7 +51,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
|
|||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg);
|
||||
|
||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
|
||||
int32_t syncNodeReplicateReset(SSyncNode* pSyncNode, SRaftId* pDestId);
|
||||
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode);
|
||||
|
||||
int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||
|
|
|
@ -301,7 +301,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
|||
syncNodeRelease(pSyncNode);
|
||||
return 0;
|
||||
}
|
||||
logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex);
|
||||
logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
|
||||
}
|
||||
|
||||
_DEL_WAL:
|
||||
|
|
|
@ -742,7 +742,8 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p
|
|||
|
||||
if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
|
||||
term = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index + 1);
|
||||
if (term < 0 || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
|
||||
if ((index + 1 < firstVer) || (term < 0) ||
|
||||
(term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
|
||||
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||
if (syncNodeStartSnapshot(pNode, &destId) < 0) {
|
||||
sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
|
||||
|
|
|
@ -48,6 +48,15 @@
|
|||
|
||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||
|
||||
int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
|
||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||
taosThreadMutexLock(&pBuf->mutex);
|
||||
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
|
||||
syncLogReplMgrReset(pMgr);
|
||||
taosThreadMutexUnlock(&pBuf->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncNodeReplicate(SSyncNode* pNode) {
|
||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||
taosThreadMutexLock(&pBuf->mutex);
|
||||
|
|
|
@ -992,8 +992,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
|
||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
|
||||
snapshotSenderStop(pSender, true);
|
||||
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
|
||||
syncLogReplMgrReset(pMgr);
|
||||
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1018,8 +1017,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
|
||||
sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
|
||||
snapshotSenderStop(pSender, true);
|
||||
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
|
||||
syncLogReplMgrReset(pMgr);
|
||||
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1027,8 +1025,6 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
|
||||
_ERROR:
|
||||
snapshotSenderStop(pSender, true);
|
||||
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
|
||||
syncLogReplMgrReset(pMgr);
|
||||
|
||||
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||
return -1;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue