From ec5b1f2ec175b33c5aa8f6b40ce98e006787d2fa Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 25 Oct 2023 15:04:14 +0800 Subject: [PATCH] enh: logging buf info for snapshot reader and sender --- source/libs/sync/inc/syncSnapshot.h | 2 +- source/libs/sync/src/syncSnapshot.c | 16 ++-------- source/libs/sync/src/syncUtil.c | 49 ++++++++++++++++------------- 3 files changed, 30 insertions(+), 37 deletions(-) diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 0332204769..93c62544f2 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -56,7 +56,7 @@ typedef struct SSyncSnapshotSender { int64_t lastSendTime; bool finish; - // buffer + // ring buffer for ack SSyncSnapBuffer *pSndBuf; // init when create diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 8a83891735..751f777b18 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -345,20 +345,6 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { return 0; } -static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { - if (pMsg->ack != pSender->seq) { - sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq); - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - return -1; - } - - pSender->ack = pMsg->ack; - pSender->seq++; - - sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq); - return 0; -} - // return 0, start ok // return 1, last snapshot finish ok // return -1, error @@ -1182,6 +1168,8 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp pSndBuf->start = ack + 1; } + pSender->ack = pSndBuf->start - 1; + while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) { if (snapshotSend(pSender) != 0) { code = terrno; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index dc16b0e958..9e6ea94e78 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -266,21 +266,24 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer); va_end(argpointer); - taosPrintLog( - flags, level, dflag, - "vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64 " end:%" PRId64 - " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 - ", seq:%d ack:%d finish:%d, as:%d, to-dnode:%d}" - ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64 - ", snap:{last-index:%" PRId64 ", term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 - ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s", - pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime, - pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, - pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), - pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, - snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, - pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr); + taosPrintLog(flags, level, dflag, + "vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64 + " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 + ", seq:%d, ack:%d, " + " buf:[%" PRId64 " %" PRId64 ", %" PRId64 + "), finish:%d, as:%d, to-dnode:%d}" + ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 + ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64 + "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 + ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s", + pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime, + pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, + pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, + pSender->pSndBuf->start, pSender->pSndBuf->cursor, pSender->pSndBuf->end, pSender->finish, + pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), + pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, + pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr); } void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, @@ -315,19 +318,21 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df taosPrintLog( flags, level, dflag, "vgId:%d, %s, sync:%s," - " snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 + " snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d buf:[%" PRId64 " %" PRId64 ", %" PRId64 + ")" " from-dnode:%d, start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 "}" ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s", pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->term, pReceiver->startTime, pReceiver->start, - pReceiver->ack, pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, - pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, - pReceiver->snapshot.lastConfigIndex, raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, - pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, - pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, - pNode->restoreFinish, pNode->quorum, peerStr, cfgStr); + pReceiver->ack, pReceiver->pRcvBuf->start, pReceiver->pRcvBuf->cursor, pReceiver->pRcvBuf->end, + DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, + pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex, + raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, + snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, + pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, + cfgStr); } void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {