enh: rename syncNodeOnSnapshotPre to syncNodeOnSnapshotPrep

This commit is contained in:
Benguang Zhao 2023-02-02 19:34:18 +08:00
parent 043fc8d980
commit b6b0b3439e
2 changed files with 14 additions and 14 deletions

View File

@ -24,7 +24,7 @@ extern "C" {
#define SYNC_SNAPSHOT_SEQ_INVALID -2 #define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3 #define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3
#define SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT -1 #define SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT -1
#define SYNC_SNAPSHOT_SEQ_BEGIN 0 #define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF

View File

@ -112,7 +112,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig; pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime; pMsg->startTime = pSender->startTime;
pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
// event log // event log
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start"); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
@ -379,7 +379,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
} }
pReceiver->start = true; pReceiver->start = true;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm; pReceiver->term = pReceiver->pSyncNode->raftStore.currentTerm;
pReceiver->fromId = pPreMsg->srcId; pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime; pReceiver->startTime = pPreMsg->startTime;
@ -527,7 +527,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
return snapStart; return snapStart;
} }
static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int64_t timeNow = taosGetTimestampMs(); int64_t timeNow = taosGetTimestampMs();
int32_t code = 0; int32_t code = 0;
@ -565,7 +565,7 @@ _START_RECEIVER:
} else { } else {
// waiting for clock match // waiting for clock match
while (timeNow < pMsg->startTime) { while (timeNow < pMsg->startTime) {
sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", startTime:%" PRId64, timeNow,
pMsg->startTime); pMsg->startTime);
taosMsleep(10); taosMsleep(10);
timeNow = taosGetTimestampMs(); timeNow = taosGetTimestampMs();
@ -765,7 +765,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// receiver on message // receiver on message
// //
// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT // condition 1, recv SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT
// if receiver already start // if receiver already start
// if sender.start-time > receiver.start-time, restart receiver(reply snapshot start) // if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
// if sender.start-time = receiver.start-time, maybe duplicate msg // if sender.start-time = receiver.start-time, maybe duplicate msg
@ -809,9 +809,9 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
int32_t code = 0; int32_t code = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->raftStore.currentTerm) { if (pMsg->term == pSyncNode->raftStore.currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
code = syncNodeOnSnapshotPre(pSyncNode, pMsg); code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg); code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
@ -848,7 +848,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return code; return code;
} }
static int32_t syncNodeOnSnapshotPreRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
@ -945,8 +945,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pMsg->startTime != pSender->startTime) { if (pMsg->startTime != pSender->startTime) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match"); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, code:0x%x", pMsg->startTime, sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, error:%s 0x%x", pMsg->startTime,
pSender->startTime, pMsg->code); pSender->startTime, tstrerror(pMsg->code), pMsg->code);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _ERROR; goto _ERROR;
} }
@ -961,15 +961,15 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pMsg->code != 0) { if (pMsg->code != 0) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code"); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
sSError(pSender, "snapshot sender receive error code:0x%x and stop sender", pMsg->code); sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
terrno = pMsg->code; terrno = pMsg->code;
goto _ERROR; goto _ERROR;
} }
// prepare <begin, end>, send begin msg // prepare <begin, end>, send begin msg
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot"); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg); return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg);
} }
if (pSender->pReader == NULL || pSender->finish) { if (pSender->pReader == NULL || pSender->finish) {