refact: improve code of syncNodeOnSnapshot

This commit is contained in:
Benguang Zhao 2023-10-27 18:55:30 +08:00
parent 4163a3be7c
commit c3f9cae36b
1 changed files with 58 additions and 48 deletions

View File

@ -1019,6 +1019,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
SyncSnapshotSend *pMsg = ppMsg[0];
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int32_t code = 0;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
@ -1042,47 +1043,56 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
}
// state, term, seq/ack
int32_t code = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) {
if (pMsg->term == raftStoreGetTerm(pSyncNode)) {
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
sRError(pReceiver, "snapshot receiver not a follower or learner");
return -1;
}
if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) {
sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq);
return -1;
}
// prepare
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) {
sInfo("vgId:%d, receive prepare msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")",
pSyncNode->vgId, pMsg->term, pMsg->startTime);
sInfo("vgId:%d, prepare snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
pMsg->startTime);
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
sInfo("vgId:%d, receive begin msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")",
pSyncNode->vgId, pMsg->term, pMsg->startTime);
goto _out;
}
// begin
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
sInfo("vgId:%d, begin snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
pMsg->startTime);
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
sInfo("vgId:%d, receive end msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId,
pMsg->term, pMsg->startTime);
goto _out;
}
// data
if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
goto _out;
}
// end
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
sInfo("vgId:%d, end snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
pMsg->startTime);
code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
if (code != 0) {
sRError(pReceiver, "failed to end snapshot.");
code = -1;
} else if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
code = -1;
}
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
} else {
// error log
sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
code = -1;
}
} else {
// error log
sRError(pReceiver, "snapshot receiver term not equal");
code = -1;
}
} else {
// error log
sRError(pReceiver, "snapshot receiver not follower");
code = -1;
goto _out;
}
code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
if (code != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
}
goto _out;
}
_out:;
syncNodeResetElectTimer(pSyncNode);
return code;
}
@ -1229,8 +1239,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
// check signature
int32_t order = 0;
if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
sSError(pSender, "received a stale snapshot rsp, msg signature:(%" PRId64 ", %" PRId64 "), ignore it.", pMsg->term,
pMsg->startTime);
sSWarn(pSender, "ignore a stale snap rsp, msg signature:(%" PRId64 ", %" PRId64 ").", pMsg->term, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
} else if (order < 0) {
@ -1239,7 +1248,6 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
goto _ERROR;
}
// state, term, seq/ack
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
sSError(pSender, "snapshot sender not leader");
terrno = TSDB_CODE_SYN_NOT_LEADER;
@ -1260,12 +1268,15 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
goto _ERROR;
}
// prepare <begin, end>, send begin msg
// send begin
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg);
sSInfo(pSender, "process prepare rsp");
if (syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg) != 0) {
goto _ERROR;
}
}
// send next msg
// send msg of data or end
if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
if (syncSnapBufferSend(pSender, ppMsg) != 0) {
sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", terrstr(), pSender->seq,
@ -1274,12 +1285,11 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
}
}
// receive ack is finish, close sender
// end
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
sSInfo(pSender, "process seq end");
sSInfo(pSender, "process end rsp");
snapshotSenderStop(pSender, true);
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
return 0;
}
return 0;