diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 7e556b28f5..195354ac79 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -195,8 +195,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { goto _out; } - sSInfo(pSender, "snapshot sender started. signature:(%" PRId64 ", %" PRId64 "), to dnode:%d", pSender->term, - pSender->startTime, DID(&pMsg->destId)); + sSInfo(pSender, "snapshot sender start to dnode:%d. signature:(%" PRId64 ", %" PRId64 ")", DID(&pMsg->destId), + pSender->term, pSender->startTime); code = 0; _out: @@ -233,8 +233,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { syncSnapBufferReset(pSender->pSndBuf); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - sSInfo(pSender, "snapshot sender stopped. signature:(%" PRId64 ", %" PRId64 "), to dnode:%d", pSender->term, - pSender->startTime, DID(&destId)); + sSInfo(pSender, "snapshot sender stop to dnode:%d. signature:(%" PRId64 ", %" PRId64 "), finish:%d", DID(&destId), + pSender->term, pSender->startTime, finish); } // when sender receive ack, call this function to send msg from seq @@ -522,8 +522,8 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p pReceiver->startTime = pPreMsg->startTime; ASSERT(pReceiver->startTime); - sRInfo(pReceiver, "snapshot receiver started. signature:(%" PRId64 ", %" PRId64 "), from dnode:%d", pReceiver->term, - pReceiver->startTime, DID(&pReceiver->fromId)); + sRInfo(pReceiver, "snapshot receiver start from dnode:%d. signature:(%" PRId64 ", %" PRId64 ")", + DID(&pReceiver->fromId), pReceiver->term, pReceiver->startTime); } // just set start = false @@ -547,8 +547,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { syncSnapBufferReset(pReceiver->pRcvBuf); - sRInfo(pReceiver, "snapshot receiver stopped. signature:(%" PRId64 ", %" PRId64 "), from dnode:%d", pReceiver->term, - pReceiver->startTime, DID(&pReceiver->fromId)); + sRInfo(pReceiver, "snapshot receiver stop from dnode:%d. signature:(%" PRId64 ", %" PRId64 ")", + DID(&pReceiver->fromId), pReceiver->term, pReceiver->startTime); } // when recv last snapshot block, apply data into snapshot @@ -1266,7 +1266,6 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { // state, term, seq/ack if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader"); sSError(pSender, "snapshot sender not leader"); terrno = TSDB_CODE_SYN_NOT_LEADER; goto _ERROR; @@ -1274,15 +1273,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { SyncTerm currentTerm = raftStoreGetTerm(pSyncNode); if (pMsg->term != currentTerm) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match"); - sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, + sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, currentTerm); terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; goto _ERROR; } if (pMsg->code != 0) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code"); sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code); terrno = pMsg->code; goto _ERROR; @@ -1290,12 +1287,10 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { // prepare , send begin msg if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot"); return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg); } if (pSender->pReader == NULL || pSender->finish) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid"); sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code, pSender->pReader, pSender->finish); terrno = pMsg->code; @@ -1303,7 +1298,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { } if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); + sSInfo(pSender, "process seq begin"); if (snapshotSend(pSender) != 0) { goto _ERROR; } @@ -1312,7 +1307,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { // receive ack is finish, close sender if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end"); + sSInfo(pSender, "process seq end"); snapshotSenderStop(pSender, true); syncNodeReplicateReset(pSyncNode, &pMsg->srcId); return 0; @@ -1320,12 +1315,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { // send next msg if (syncSnapBufferSend(pSender, ppMsg) != 0) { + sSError(pSender, "failed to send snapshot msg since %s. seq:%d", terrstr(), pSender->seq); goto _ERROR; } return 0; _ERROR: - snapshotSenderStop(pSender, true); + snapshotSenderStop(pSender, false); syncNodeReplicateReset(pSyncNode, &pMsg->srcId); return -1; }