From c3f9cae36bfa9fa8ce7df630834259999b14e3f6 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 27 Oct 2023 18:55:30 +0800 Subject: [PATCH] refact: improve code of syncNodeOnSnapshot --- source/libs/sync/src/syncSnapshot.c | 106 +++++++++++++++------------- 1 file changed, 58 insertions(+), 48 deletions(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 92d9571906..56735d479e 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -1019,6 +1019,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont; SyncSnapshotSend *pMsg = ppMsg[0]; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; + int32_t code = 0; // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) { @@ -1042,47 +1043,56 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term); } - // state, term, seq/ack - int32_t code = 0; - if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) { - if (pMsg->term == raftStoreGetTerm(pSyncNode)) { - if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) { - sInfo("vgId:%d, receive prepare msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", - pSyncNode->vgId, pMsg->term, pMsg->startTime); - code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { - sInfo("vgId:%d, receive begin msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", - pSyncNode->vgId, pMsg->term, pMsg->startTime); - code = syncNodeOnSnapshotBegin(pSyncNode, pMsg); - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { - sInfo("vgId:%d, receive end msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, - pMsg->term, pMsg->startTime); - code = syncNodeOnSnapshotEnd(pSyncNode, pMsg); - if (code != 0) { - sRError(pReceiver, "failed to end snapshot."); - code = -1; - } else if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) { - sRError(pReceiver, "failed to reinit log buffer since %s", terrstr()); - code = -1; - } - } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { - code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg); - } else { - // error log - sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack); - code = -1; - } - } else { - // error log - sRError(pReceiver, "snapshot receiver term not equal"); - code = -1; - } - } else { - // error log - sRError(pReceiver, "snapshot receiver not follower"); - code = -1; + if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) { + sRError(pReceiver, "snapshot receiver not a follower or learner"); + return -1; } + if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) { + sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq); + return -1; + } + + // prepare + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) { + sInfo("vgId:%d, prepare snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term, + pMsg->startTime); + code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); + goto _out; + } + + // begin + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { + sInfo("vgId:%d, begin snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term, + pMsg->startTime); + code = syncNodeOnSnapshotBegin(pSyncNode, pMsg); + goto _out; + } + + // data + if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { + code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg); + goto _out; + } + + // end + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { + sInfo("vgId:%d, end snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term, + pMsg->startTime); + code = syncNodeOnSnapshotEnd(pSyncNode, pMsg); + if (code != 0) { + sRError(pReceiver, "failed to end snapshot."); + goto _out; + } + + code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode); + if (code != 0) { + sRError(pReceiver, "failed to reinit log buffer since %s", terrstr()); + } + goto _out; + } + +_out:; syncNodeResetElectTimer(pSyncNode); return code; } @@ -1229,8 +1239,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { // check signature int32_t order = 0; if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) { - sSError(pSender, "received a stale snapshot rsp, msg signature:(%" PRId64 ", %" PRId64 "), ignore it.", pMsg->term, - pMsg->startTime); + sSWarn(pSender, "ignore a stale snap rsp, msg signature:(%" PRId64 ", %" PRId64 ").", pMsg->term, pMsg->startTime); terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; return -1; } else if (order < 0) { @@ -1239,7 +1248,6 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { goto _ERROR; } - // state, term, seq/ack if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { sSError(pSender, "snapshot sender not leader"); terrno = TSDB_CODE_SYN_NOT_LEADER; @@ -1260,12 +1268,15 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { goto _ERROR; } - // prepare , send begin msg + // send begin if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) { - return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg); + sSInfo(pSender, "process prepare rsp"); + if (syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg) != 0) { + goto _ERROR; + } } - // send next msg + // send msg of data or end if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) { if (syncSnapBufferSend(pSender, ppMsg) != 0) { sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", terrstr(), pSender->seq, @@ -1274,12 +1285,11 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { } } - // receive ack is finish, close sender + // end if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { - sSInfo(pSender, "process seq end"); + sSInfo(pSender, "process end rsp"); snapshotSenderStop(pSender, true); syncNodeReplicateReset(pSyncNode, &pMsg->srcId); - return 0; } return 0;