From 22d64b9c0befba2a0c36b6156a293a9d0d393cc3 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 24 Nov 2022 15:55:31 +0800 Subject: [PATCH] fix: remove syncNodeReplicateOne from syncNodeOnSnapshotReply --- source/dnode/vnode/src/vnd/vnodeBufPool.c | 3 +++ source/libs/sync/src/syncPipeline.c | 27 ++++++++++++----------- source/libs/sync/src/syncReplication.c | 1 + source/libs/sync/src/syncSnapshot.c | 4 ++-- 4 files changed, 20 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 71e926bd35..dcc323f778 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -176,6 +176,9 @@ void vnodeBufPoolRef(SVBufPool *pPool) { } void vnodeBufPoolUnRef(SVBufPool *pPool) { + if (pPool == NULL) { + return; + } int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); if (nRef == 0) { SVnode *pVnode = pPool->pVnode; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index a7a983a06e..891547e80f 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -497,8 +497,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm // execute it if (!syncUtilUserCommit(pEntry->originalRpcType)) { - sInfo("vgId:%d, non-user msg in raft log entry. index: %" PRId64 ", term:%" PRId64 "", vgId, pEntry->index, - pEntry->term); + sInfo("vgId:%d, raft mgmt msg in log entry. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index, + pEntry->term, TMSG_INFO(pEntry->originalRpcType)); pBuf->commitIndex = index; if (!inBuf) { syncEntryDestroy(pEntry); @@ -539,7 +539,7 @@ _out: if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) { pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->restoreFinish = true; - sInfo("vgId:%d, restore finished. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, + sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); } @@ -625,8 +625,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ASSERT(pMgr->matchIndex == 0); if (pMsg->matchIndex < 0) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr of peer %s:%d (%" PRIx64 ") restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 - "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), repl mgr(rs:%d): [%" PRId64 " %" PRId64 + ", %" PRId64 "), log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; @@ -641,8 +641,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod if (pMsg->matchIndex == pMsg->lastSendIndex) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl mgr of peer %s:%d (%" PRIx64 ") restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 - "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), repl mgr(rs:%d): [%" PRId64 " %" PRId64 + ", %" PRId64 "), log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; @@ -663,7 +663,9 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); if (syncNodeStartSnapshot(pNode, &destId) < 0) { sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port); + return -1; } + sInfo("vgId:%d, snapshot replication to peer %s:%d started", pNode->vgId, host, port); return 0; } @@ -702,8 +704,8 @@ int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pN SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl mgr in heartbeat. start time:%" PRId64 ", old start time:%" PRId64 "", - pNode->vgId, pMsg->startTime, pMgr->peerStartTime); + sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer: %" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "", + pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); syncLogReplMgrReset(pMgr); pMgr->peerStartTime = pMsg->startTime; } @@ -715,9 +717,8 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); if (pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl mgr in append entries reply. start time:%" PRId64 ", old start time:%" PRId64 - "", - pNode->vgId, pMsg->startTime, pMgr->peerStartTime); + sInfo("vgId:%d, reset sync log repl mgr in append entries reply. peer: %" PRIx64 ", start time:%" PRId64 ", old:%" PRId64, + pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); syncLogReplMgrReset(pMgr); pMgr->peerStartTime = pMsg->startTime; } @@ -923,7 +924,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { (void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1); - sInfo("vgId:%d, reset log buffer. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, + sInfo("vgId:%d, reset log buffer. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->endIndex = pBuf->matchIndex + 1; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 2623eebc23..cfa6b5215e 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -49,6 +49,7 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot) { + ASSERT(false && "deplicated"); // next index SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 1035925c2b..4ceb2c9fc9 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -881,8 +881,8 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { snapshotSenderStop(pSender, true); // update next-index - syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), pMsg->lastIndex + 1); - syncNodeReplicateOne(pSyncNode, &(pMsg->srcId), false); + // syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), pMsg->lastIndex + 1); + // syncNodeReplicateOne(pSyncNode, &(pMsg->srcId), false); return 0; }