Merge pull request #23535 from taosdata/FIX/TD-26596-3.0
fix: release duplicate msgs in syncSnapBufferRecv
This commit is contained in:
commit
fd4abeb204
|
@ -865,6 +865,9 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
|
||||||
ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end);
|
ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end);
|
||||||
|
|
||||||
if (pMsg->seq > pRcvBuf->cursor) {
|
if (pMsg->seq > pRcvBuf->cursor) {
|
||||||
|
if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
|
||||||
|
pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
|
||||||
|
}
|
||||||
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
|
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
|
||||||
ppMsg[0] = NULL;
|
ppMsg[0] = NULL;
|
||||||
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
|
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
|
||||||
|
@ -1002,7 +1005,8 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
|
if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
|
||||||
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
|
sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
|
||||||
|
pMsg->seq);
|
||||||
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
|
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -268,7 +268,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
|
||||||
|
|
||||||
taosPrintLog(flags, level, dflag,
|
taosPrintLog(flags, level, dflag,
|
||||||
"vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64
|
"vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64
|
||||||
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
|
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRId64 " last-cfg:%" PRId64
|
||||||
", seq:%d, ack:%d, "
|
", seq:%d, ack:%d, "
|
||||||
" buf:[%" PRId64 " %" PRId64 ", %" PRId64
|
" buf:[%" PRId64 " %" PRId64 ", %" PRId64
|
||||||
"), finish:%d, as:%d, to-dnode:%d}"
|
"), finish:%d, as:%d, to-dnode:%d}"
|
||||||
|
|
Loading…
Reference in New Issue