From 4d61e87c0f66fddd0ae3946b553772b8530fc044 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 24 Oct 2023 17:11:32 +0800 Subject: [PATCH] enh: add signature info in logging msgs of snap replication --- source/libs/sync/src/syncSnapshot.c | 56 +++++++++++++---------------- source/libs/sync/src/syncUtil.c | 51 +++++++++++++------------- 2 files changed, 49 insertions(+), 58 deletions(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 195354ac79..ad782bd51d 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -195,8 +195,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { goto _out; } - sSInfo(pSender, "snapshot sender start to dnode:%d. signature:(%" PRId64 ", %" PRId64 ")", DID(&pMsg->destId), - pSender->term, pSender->startTime); + sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId)); code = 0; _out: @@ -233,8 +232,7 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { syncSnapBufferReset(pSender->pSndBuf); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - sSInfo(pSender, "snapshot sender stop to dnode:%d. signature:(%" PRId64 ", %" PRId64 "), finish:%d", DID(&destId), - pSender->term, pSender->startTime, finish); + sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish); } // when sender receive ack, call this function to send msg from seq @@ -522,8 +520,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p pReceiver->startTime = pPreMsg->startTime; ASSERT(pReceiver->startTime); - sRInfo(pReceiver, "snapshot receiver start from dnode:%d. signature:(%" PRId64 ", %" PRId64 ")", - DID(&pReceiver->fromId), pReceiver->term, pReceiver->startTime); + sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId)); } // just set start = false @@ -547,8 +544,7 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { syncSnapBufferReset(pReceiver->pRcvBuf); - sRInfo(pReceiver, "snapshot receiver stop from dnode:%d. signature:(%" PRId64 ", %" PRId64 ")", - DID(&pReceiver->fromId), pReceiver->term, pReceiver->startTime); + sRInfo(pReceiver, "snapshot receiver stop, from dnode:%d.", DID(&pReceiver->fromId)); } // when recv last snapshot block, apply data into snapshot @@ -676,22 +672,22 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM int32_t order = 0; if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) { sRInfo(pReceiver, - "received a new snapshot preparation. restart receiver" - "receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", - pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime); + "received a new snapshot preparation. restart receiver." + " msg signature:(%" PRId64 ", %" PRId64 ")", + pMsg->term, pMsg->startTime); goto _START_RECEIVER; } else if (order == 0) { sRInfo(pReceiver, - "received a duplicate snapshot preparation. send reply" - "receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", - pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime); + "received a duplicate snapshot preparation. send reply." + " msg signature:(%" PRId64 ", %" PRId64 ")", + pMsg->term, pMsg->startTime); goto _SEND_REPLY; } else { // ignore sRError(pReceiver, - "received a stale snapshot preparation. ignore" - "receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", - pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime); + "received a stale snapshot preparation. ignore." + " msg signature:(%" PRId64 ", %" PRId64 ")", + pMsg->term, pMsg->startTime); terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; code = terrno; goto _SEND_REPLY; @@ -809,6 +805,8 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p goto _SEND_REPLY; } + sRInfo(pReceiver, "snapshot begin"); + code = 0; _SEND_REPLY: if (code != 0 && terrno != 0) { @@ -1009,8 +1007,7 @@ _SEND_REPLY:; // int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont; - SyncSnapshotSend *pMsg = ppMsg[0]; - ASSERT(pMsg); + SyncSnapshotSend *pMsg = ppMsg[0]; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; // if already drop replica, do not process @@ -1040,16 +1037,16 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) { if (pMsg->term == raftStoreGetTerm(pSyncNode)) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { - sInfo("vgId:%d, receive pre-snapshot msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")", + sInfo("vgId:%d, receive prepare msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term, pMsg->startTime); code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { - sInfo("vgId:%d, receive begin msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")", + sInfo("vgId:%d, receive begin msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term, pMsg->startTime); code = syncNodeOnSnapshotBegin(pSyncNode, pMsg); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { - sInfo("vgId:%d, receive end msg of snapshot replication. signature: (%" PRId64 ", %" PRId64 ")", - pSyncNode->vgId, pMsg->term, pMsg->startTime); + sInfo("vgId:%d, receive end msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, + pMsg->term, pMsg->startTime); code = syncNodeOnSnapshotEnd(pSyncNode, pMsg); if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) { sRError(pReceiver, "failed to reinit log buffer since %s", terrstr()); @@ -1059,7 +1056,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { // force close, no response syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop"); snapshotReceiverStop(pReceiver); - } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq <= SYNC_SNAPSHOT_SEQ_END) { + } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data"); code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg); } else { @@ -1139,10 +1136,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend pSendMsg->startTime = pSender->startTime; pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN; - ASSERT(pSendMsg->startTime); - - sSInfo(pSender, "begin snapshot replication to dnode %d. startTime:%" PRId64, DID(&pSendMsg->destId), - pSendMsg->startTime); + sSInfo(pSender, "begin snapshot replication to dnode %d." PRId64, DID(&pSendMsg->destId)); // send msg syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre"); @@ -1252,10 +1246,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { // check signature int32_t order = 0; if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) { - sSError(pSender, - "received a stale snapshot rsp. ignore it" - "sender signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", - pSender->term, pSender->startTime, pMsg->term, pMsg->startTime); + sSError(pSender, "received a stale snapshot rsp, msg signature:(%" PRId64 ", %" PRId64 "), ignore it.", pMsg->term, + pMsg->startTime); terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; return -1; } else if (order < 0) { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 9acc17e130..dc16b0e958 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -266,22 +266,21 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer); va_end(argpointer); - taosPrintLog(flags, level, dflag, - "vgId:%d, %s, sync:%s, snap-sender:{%p start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 - " last-term:%" PRIu64 " last-cfg:%" PRId64 - ", seq:%d ack:%d finish:%d, as:%d dnode:%d}" - ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 - ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64 - "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 - ", chging:%d, restore:%d, quorum:%d, lc-timer:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s", - pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start, - pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, - pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex, - DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), pNode->commitIndex, - logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, - pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, - pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); + taosPrintLog( + flags, level, dflag, + "vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64 " end:%" PRId64 + " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 + ", seq:%d ack:%d finish:%d, as:%d, to-dnode:%d}" + ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64 + ", snap:{last-index:%" PRId64 ", term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 + ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s", + pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime, + pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, + pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), + pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, + pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr); } void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, @@ -316,19 +315,19 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df taosPrintLog( flags, level, dflag, "vgId:%d, %s, sync:%s," - " snap-receiver:{%p started:%d acked:%d term:%" PRIu64 " start-time:%" PRId64 " from-dnode:%d, start:%" PRId64 - " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 + " snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 + " from-dnode:%d, start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 "}" ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 - ", chging:%d, restore:%d, quorum:%d, lc-timers:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s", - pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, - pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, - pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex, - raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, - snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, - pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, - syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); + ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s", + pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->term, pReceiver->startTime, pReceiver->start, + pReceiver->ack, pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, + pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, + pReceiver->snapshot.lastConfigIndex, raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, + pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, + pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, + pNode->restoreFinish, pNode->quorum, peerStr, cfgStr); } void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {