fix: release duplicate msgs in syncSnapBufferRecv

This commit is contained in:
Benguang Zhao 2023-11-03 14:01:39 +08:00
parent 456c8d23f3
commit 2cf6b3c1e4
1 changed files with 5 additions and 1 deletions

View File

@ -865,6 +865,9 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end); ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end);
if (pMsg->seq > pRcvBuf->cursor) { if (pMsg->seq > pRcvBuf->cursor) {
if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
}
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg; pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
ppMsg[0] = NULL; ppMsg[0] = NULL;
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end); pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
@ -1002,7 +1005,8 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
} }
if (pMsg->term < raftStoreGetTerm(pSyncNode)) { if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term"); sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
pMsg->seq);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1; return -1;
} }