enh: tidy up logging msg in syncNodeOnSnapshotRsp

This commit is contained in:
Benguang Zhao 2023-10-24 15:47:47 +08:00
parent 1c60e67a83
commit 811f1bbbea
1 changed files with 13 additions and 17 deletions

View File

@ -195,8 +195,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
goto _out; goto _out;
} }
sSInfo(pSender, "snapshot sender started. signature:(%" PRId64 ", %" PRId64 "), to dnode:%d", pSender->term, sSInfo(pSender, "snapshot sender start to dnode:%d. signature:(%" PRId64 ", %" PRId64 ")", DID(&pMsg->destId),
pSender->startTime, DID(&pMsg->destId)); pSender->term, pSender->startTime);
code = 0; code = 0;
_out: _out:
@ -233,8 +233,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
syncSnapBufferReset(pSender->pSndBuf); syncSnapBufferReset(pSender->pSndBuf);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender stopped. signature:(%" PRId64 ", %" PRId64 "), to dnode:%d", pSender->term, sSInfo(pSender, "snapshot sender stop to dnode:%d. signature:(%" PRId64 ", %" PRId64 "), finish:%d", DID(&destId),
pSender->startTime, DID(&destId)); pSender->term, pSender->startTime, finish);
} }
// when sender receive ack, call this function to send msg from seq // when sender receive ack, call this function to send msg from seq
@ -522,8 +522,8 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
pReceiver->startTime = pPreMsg->startTime; pReceiver->startTime = pPreMsg->startTime;
ASSERT(pReceiver->startTime); ASSERT(pReceiver->startTime);
sRInfo(pReceiver, "snapshot receiver started. signature:(%" PRId64 ", %" PRId64 "), from dnode:%d", pReceiver->term, sRInfo(pReceiver, "snapshot receiver start from dnode:%d. signature:(%" PRId64 ", %" PRId64 ")",
pReceiver->startTime, DID(&pReceiver->fromId)); DID(&pReceiver->fromId), pReceiver->term, pReceiver->startTime);
} }
// just set start = false // just set start = false
@ -547,8 +547,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
syncSnapBufferReset(pReceiver->pRcvBuf); syncSnapBufferReset(pReceiver->pRcvBuf);
sRInfo(pReceiver, "snapshot receiver stopped. signature:(%" PRId64 ", %" PRId64 "), from dnode:%d", pReceiver->term, sRInfo(pReceiver, "snapshot receiver stop from dnode:%d. signature:(%" PRId64 ", %" PRId64 ")",
pReceiver->startTime, DID(&pReceiver->fromId)); DID(&pReceiver->fromId), pReceiver->term, pReceiver->startTime);
} }
// when recv last snapshot block, apply data into snapshot // when recv last snapshot block, apply data into snapshot
@ -1266,7 +1266,6 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
// state, term, seq/ack // state, term, seq/ack
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
sSError(pSender, "snapshot sender not leader"); sSError(pSender, "snapshot sender not leader");
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
goto _ERROR; goto _ERROR;
@ -1274,15 +1273,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
SyncTerm currentTerm = raftStoreGetTerm(pSyncNode); SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
if (pMsg->term != currentTerm) { if (pMsg->term != currentTerm) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match"); sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
currentTerm); currentTerm);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _ERROR; goto _ERROR;
} }
if (pMsg->code != 0) { if (pMsg->code != 0) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code); sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
terrno = pMsg->code; terrno = pMsg->code;
goto _ERROR; goto _ERROR;
@ -1290,12 +1287,10 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
// prepare <begin, end>, send begin msg // prepare <begin, end>, send begin msg
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg); return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg);
} }
if (pSender->pReader == NULL || pSender->finish) { if (pSender->pReader == NULL || pSender->finish) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code, sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code,
pSender->pReader, pSender->finish); pSender->pReader, pSender->finish);
terrno = pMsg->code; terrno = pMsg->code;
@ -1303,7 +1298,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
} }
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); sSInfo(pSender, "process seq begin");
if (snapshotSend(pSender) != 0) { if (snapshotSend(pSender) != 0) {
goto _ERROR; goto _ERROR;
} }
@ -1312,7 +1307,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
// receive ack is finish, close sender // receive ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end"); sSInfo(pSender, "process seq end");
snapshotSenderStop(pSender, true); snapshotSenderStop(pSender, true);
syncNodeReplicateReset(pSyncNode, &pMsg->srcId); syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
return 0; return 0;
@ -1320,12 +1315,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
// send next msg // send next msg
if (syncSnapBufferSend(pSender, ppMsg) != 0) { if (syncSnapBufferSend(pSender, ppMsg) != 0) {
sSError(pSender, "failed to send snapshot msg since %s. seq:%d", terrstr(), pSender->seq);
goto _ERROR; goto _ERROR;
} }
return 0; return 0;
_ERROR: _ERROR:
snapshotSenderStop(pSender, true); snapshotSenderStop(pSender, false);
syncNodeReplicateReset(pSyncNode, &pMsg->srcId); syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
return -1; return -1;
} }