enh: log signature of snap sender/receiver while started or stopped

This commit is contained in:
Benguang Zhao 2023-10-24 10:15:39 +08:00
parent e34da43e38
commit f444cd7a6d
1 changed files with 14 additions and 7 deletions

View File

@ -175,7 +175,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = raftStoreGetTerm(pSender->pSyncNode); pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start; pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
@ -189,15 +189,15 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
memcpy(pMsg->data, snapInfo.data, dataLen); memcpy(pMsg->data, snapInfo.data, dataLen);
} }
// event log
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
// send msg // send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
goto _out; goto _out;
} }
sSInfo(pSender, "snapshot sender started. signature:(%" PRId64 ", %" PRId64 "), to dnode:%d", pSender->term,
pSender->startTime, DID(&pMsg->destId));
code = 0; code = 0;
_out: _out:
if (snapInfo.data) { if (snapInfo.data) {
@ -231,6 +231,10 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
} }
syncSnapBufferReset(pSender->pSndBuf); syncSnapBufferReset(pSender->pSndBuf);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender stopped. signature:(%" PRId64 ", %" PRId64 "), to dnode:%d", pSender->term,
pSender->startTime, DID(&destId));
} }
// when sender receive ack, call this function to send msg from seq // when sender receive ack, call this function to send msg from seq
@ -515,14 +519,14 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
pReceiver->startTime = pPreMsg->startTime; pReceiver->startTime = pPreMsg->startTime;
ASSERT(pReceiver->startTime); ASSERT(pReceiver->startTime);
// event log sRInfo(pReceiver, "snapshot receiver started. signature:(%" PRId64 ", %" PRId64 "), from dnode:%d", pReceiver->term,
sRInfo(pReceiver, "snapshot receiver is start"); pReceiver->startTime, DID(&pReceiver->fromId));
} }
// just set start = false // just set start = false
// FpSnapshotStopWrite should not be called // FpSnapshotStopWrite should not be called
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter); sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false); int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
if (stopped) return; if (stopped) return;
@ -539,6 +543,9 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
} }
syncSnapBufferReset(pReceiver->pRcvBuf); syncSnapBufferReset(pReceiver->pRcvBuf);
sRInfo(pReceiver, "snapshot receiver stopped. signature:(%" PRId64 ", %" PRId64 "), from dnode:%d", pReceiver->term,
pReceiver->startTime, DID(&pReceiver->fromId));
} }
// when recv last snapshot block, apply data into snapshot // when recv last snapshot block, apply data into snapshot