From e901adfdf3653c224ffc24da86bd4bafc560ebff Mon Sep 17 00:00:00 2001
From: Benguang Zhao
Date: Wed, 20 Sep 2023 14:27:05 +0800
Subject: [PATCH] enh: check snapshot receiver and sender by signature

Identify a snapshot session by the pair (term, startTime) instead of by
startTime alone:

- snapshotReceiverStart() records the term carried by the prepare message
  rather than the term read from the local raft store.
- snapshotReceiverSignatureCmp() and snapshotSenderSignatureCmp() order a
  receiver/sender against an incoming message by term first and startTime
  second; the prep/begin/receive/end handlers and the response handler use
  them in place of the bare startTime comparisons.
- Messages rejected in syncNodeOnSnapshot() (source not in the raft group,
  or smaller term) now set TSDB_CODE_SYN_MISMATCHED_SIGNATURE.
- The taosMsleep(1) call is removed from snapshotSenderStart().

---
 source/libs/sync/src/syncSnapshot.c | 79 ++++++++++++++++++-----------
 1 file changed, 50 insertions(+), 29 deletions(-)

diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c
index 383fda89b0..59b33d20cc 100644
--- a/source/libs/sync/src/syncSnapshot.c
+++ b/source/libs/sync/src/syncSnapshot.c
@@ -79,8 +79,6 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
   int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
   if (started) return 0;
 
-  taosMsleep(1);
-
   pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
   pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
   pSender->pReader = NULL;
@@ -394,6 +392,14 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
   return (pReceiver != NULL ? atomic_load_8(&pReceiver->start) : false);
 }
 
+static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
+  if (pReceiver->term < pMsg->term) return -1;
+  if (pReceiver->term > pMsg->term) return 1;
+  if (pReceiver->startTime < pMsg->startTime) return -1;
+  if (pReceiver->startTime > pMsg->startTime) return 1;
+  return 0;
+}
+
 static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
   if (pReceiver->pWriter != NULL) {
     sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId);
@@ -434,7 +440,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
   if (started) return;
 
   pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
-  pReceiver->term = raftStoreGetTerm(pReceiver->pSyncNode);
+  pReceiver->term = pPreMsg->term;
   pReceiver->fromId = pPreMsg->srcId;
   pReceiver->startTime = pPreMsg->startTime;
   ASSERT(pReceiver->startTime);
@@ -585,18 +591,25 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM
   if (snapshotReceiverIsStart(pReceiver)) {
     // already start
 
-    if (pMsg->startTime > pReceiver->startTime) {
-      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " start receiver",
-             pReceiver->startTime, pMsg->startTime);
+    int32_t order = 0;
+    if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) {
+      sRInfo(pReceiver,
+             "received a new snapshot preparation. restart receiver. "
+             "receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
+             pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime);
       goto _START_RECEIVER;
-    } else if (pMsg->startTime == pReceiver->startTime) {
-      sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
-             pReceiver->startTime, pMsg->startTime);
+    } else if (order == 0) {
+      sRInfo(pReceiver,
+             "received a duplicate snapshot preparation. send reply. "
+             "receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
+             pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime);
       goto _SEND_REPLY;
     } else {
       // ignore
-      sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore",
-              pReceiver->startTime, pMsg->startTime);
+      sRError(pReceiver,
+              "received a stale snapshot preparation. ignore. "
+              "receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
+              pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime);
       terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
       code = terrno;
       goto _SEND_REPLY;
@@ -703,10 +716,9 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
     goto _SEND_REPLY;
   }
 
-  if (pReceiver->startTime != pMsg->startTime) {
-    sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
-            pReceiver->startTime, pMsg->startTime);
+  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
     terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
+    sRError(pReceiver, "snapshot receiver begin failed since %s", terrstr());
     goto _SEND_REPLY;
   }
 
@@ -759,10 +771,9 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
   int64_t timeNow = taosGetTimestampMs();
   int32_t code = 0;
 
-  if (pReceiver->startTime != pMsg->startTime) {
-    sRError(pReceiver, "snapshot receive failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
-            pReceiver->startTime, pMsg->startTime);
+  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
     terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
+    sRError(pReceiver, "snapshot receive failed since %s.", terrstr());
     code = terrno;
     goto _SEND_REPLY;
   }
@@ -811,7 +822,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
   int64_t timeNow = taosGetTimestampMs();
   int32_t code = 0;
 
-  if (pReceiver->startTime != pMsg->startTime) {
+  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
     sRError(pReceiver, "snapshot end failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
             pReceiver->startTime, pMsg->startTime);
     terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
@@ -880,13 +891,13 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
   // if already drop replica, do not process
   if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
     syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
-    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
+    terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
     return -1;
   }
 
   if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
     syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
-    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
+    terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
     return -1;
   }
 
@@ -1015,6 +1026,14 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
   return 0;
 }
 
+static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
+  if (pSender->term < pMsg->term) return -1;
+  if (pSender->term > pMsg->term) return 1;
+  if (pSender->startTime < pMsg->startTime) return -1;
+  if (pSender->startTime > pMsg->startTime) return 1;
+  return 0;
+}
+
 // sender on message
 //
 // condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
@@ -1045,19 +1064,21 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
     return -1;
   }
 
-  if (pMsg->startTime < pSender->startTime) {
-    sSError(pSender, "ignore stale rsp received. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
-            pSender->startTime, pMsg->startTime);
-    terrno = pMsg->code;
+  // check signature
+  int32_t order = 0;
+  if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
+    sSError(pSender,
+            "received a stale snapshot rsp. ignore it. "
+            "sender signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
+            pSender->term, pSender->startTime, pMsg->term, pMsg->startTime);
+    terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
     return -1;
-  } else if (pMsg->startTime > pSender->startTime) {
-    sSError(pSender, "unexpected start time in msg. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
-            pSender->startTime, pMsg->startTime);
+  } else if (order < 0) {
+    sSError(pSender, "snapshot sender is stale. stop");
+    terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
     goto _ERROR;
   }
 
-  ASSERT(pMsg->startTime == pSender->startTime);
-
   // state, term, seq/ack
   if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
     syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");