enh: check snapshot receiver and sender by signature

This commit is contained in:
Benguang Zhao 2023-09-20 14:27:05 +08:00
parent eb4e2aa58f
commit e901adfdf3
1 changed files with 50 additions and 29 deletions

View File

@ -79,8 +79,6 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
if (started) return 0;
taosMsleep(1);
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
pSender->pReader = NULL;
@ -394,6 +392,14 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
return (pReceiver != NULL ? atomic_load_8(&pReceiver->start) : false);
}
static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
if (pReceiver->term < pMsg->term) return -1;
if (pReceiver->term > pMsg->term) return 1;
if (pReceiver->startTime < pMsg->startTime) return -1;
if (pReceiver->startTime > pMsg->startTime) return 1;
return 0;
}
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (pReceiver->pWriter != NULL) {
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId);
@ -434,7 +440,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
if (started) return;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
pReceiver->term = raftStoreGetTerm(pReceiver->pSyncNode);
pReceiver->term = pPreMsg->term;
pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime;
ASSERT(pReceiver->startTime);
@ -585,18 +591,25 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM
if (snapshotReceiverIsStart(pReceiver)) {
// already start
if (pMsg->startTime > pReceiver->startTime) {
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " start receiver",
pReceiver->startTime, pMsg->startTime);
int32_t order = 0;
if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) {
sRInfo(pReceiver,
"received a new snapshot preparation. restart receiver"
"receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime);
goto _START_RECEIVER;
} else if (pMsg->startTime == pReceiver->startTime) {
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
pReceiver->startTime, pMsg->startTime);
} else if (order == 0) {
sRInfo(pReceiver,
"received a duplicate snapshot preparation. send reply"
"receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime);
goto _SEND_REPLY;
} else {
// ignore
sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore",
pReceiver->startTime, pMsg->startTime);
sRError(pReceiver,
"received a stale snapshot preparation. ignore"
"receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
code = terrno;
goto _SEND_REPLY;
@ -703,10 +716,9 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
goto _SEND_REPLY;
}
if (pReceiver->startTime != pMsg->startTime) {
sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
sRError(pReceiver, "snapshot receiver begin failed since %s", terrstr());
goto _SEND_REPLY;
}
@ -759,10 +771,9 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
int64_t timeNow = taosGetTimestampMs();
int32_t code = 0;
if (pReceiver->startTime != pMsg->startTime) {
sRError(pReceiver, "snapshot receive failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
sRError(pReceiver, "snapshot receive failed since %s.", terrstr());
code = terrno;
goto _SEND_REPLY;
}
@ -811,7 +822,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
int64_t timeNow = taosGetTimestampMs();
int32_t code = 0;
if (pReceiver->startTime != pMsg->startTime) {
if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
sRError(pReceiver, "snapshot end failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
@ -880,13 +891,13 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
}
if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
}
@ -1015,6 +1026,14 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
return 0;
}
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
if (pSender->term < pMsg->term) return -1;
if (pSender->term > pMsg->term) return 1;
if (pSender->startTime < pMsg->startTime) return -1;
if (pSender->startTime > pMsg->startTime) return 1;
return 0;
}
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
@ -1045,19 +1064,21 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return -1;
}
if (pMsg->startTime < pSender->startTime) {
sSError(pSender, "ignore stale rsp received. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
pSender->startTime, pMsg->startTime);
terrno = pMsg->code;
// check signature
int32_t order = 0;
if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
sSError(pSender,
"received a stale snapshot rsp. ignore it"
"sender signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
pSender->term, pSender->startTime, pMsg->term, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
} else if (pMsg->startTime > pSender->startTime) {
sSError(pSender, "unexpected start time in msg. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
pSender->startTime, pMsg->startTime);
} else if (order < 0) {
sSError(pSender, "snapshot sender is stale. stop");
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _ERROR;
}
ASSERT(pMsg->startTime == pSender->startTime);
// state, term, seq/ack
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");