From 5f0d23426dac8b99d74aeda91997811c840b8fcd Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 1 Jun 2022 13:35:23 +0800 Subject: [PATCH] refactor: snapshotReSend --- source/libs/sync/src/syncSnapshot.c | 82 +++++++++++++++-------------- 1 file changed, 42 insertions(+), 40 deletions(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 7825a7cd95..38d3dc4995 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -370,50 +370,52 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { - if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { - // begin - snapshotReceiverStart(pReceiver); - pReceiver->ack = pMsg->seq; - needRsp = true; - - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { - // end, finish FSM - pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); - pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); - snapshotReceiverStop(pReceiver); - pReceiver->ack = pMsg->seq; - needRsp = true; - - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { - pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false); - snapshotReceiverStop(pReceiver); - needRsp = false; - - } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { - // transfering - if (pMsg->seq == pReceiver->ack + 1) { - pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); + if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { + // begin + snapshotReceiverStart(pReceiver); pReceiver->ack = pMsg->seq; + needRsp = true; + + } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { + // end, finish FSM + pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); + pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); + snapshotReceiverStop(pReceiver); + pReceiver->ack = pMsg->seq; + needRsp = true; + + } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { + pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false); + snapshotReceiverStop(pReceiver); + needRsp = false; + + } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { + // transfering + if (pMsg->seq == pReceiver->ack + 1) { + pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); + pReceiver->ack = pMsg->seq; + } + needRsp = true; + + } else { + ASSERT(0); } - needRsp = true; - } else { - ASSERT(0); - } + if (needRsp) { + SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); + pRspMsg->srcId = pSyncNode->myRaftId; + pRspMsg->destId = pMsg->srcId; + pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->lastIndex = pMsg->lastIndex; + pRspMsg->lastTerm = pMsg->lastTerm; + pRspMsg->ack = pReceiver->ack; - if (needRsp) { - SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); - pRspMsg->srcId = pSyncNode->myRaftId; - pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->pRaftStore->currentTerm; - pRspMsg->lastIndex = pMsg->lastIndex; - pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->ack = pReceiver->ack; - - SRpcMsg rpcMsg; - syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); - syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); - syncSnapshotRspDestroy(pRspMsg); + SRpcMsg rpcMsg; + syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); + syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); + syncSnapshotRspDestroy(pRspMsg); + } } } return 0;