fix: remove syncNodeReplicateOne from syncNodeOnSnapshotReply
This commit is contained in:
parent
2e640e8e68
commit
22d64b9c0b
|
@ -176,6 +176,9 @@ void vnodeBufPoolRef(SVBufPool *pPool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeBufPoolUnRef(SVBufPool *pPool) {
|
void vnodeBufPoolUnRef(SVBufPool *pPool) {
|
||||||
|
if (pPool == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
|
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
SVnode *pVnode = pPool->pVnode;
|
SVnode *pVnode = pPool->pVnode;
|
||||||
|
|
|
@ -497,8 +497,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
||||||
|
|
||||||
// execute it
|
// execute it
|
||||||
if (!syncUtilUserCommit(pEntry->originalRpcType)) {
|
if (!syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||||
sInfo("vgId:%d, non-user msg in raft log entry. index: %" PRId64 ", term:%" PRId64 "", vgId, pEntry->index,
|
sInfo("vgId:%d, raft mgmt msg in log entry. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index,
|
||||||
pEntry->term);
|
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
||||||
pBuf->commitIndex = index;
|
pBuf->commitIndex = index;
|
||||||
if (!inBuf) {
|
if (!inBuf) {
|
||||||
syncEntryDestroy(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
|
@ -539,7 +539,7 @@ _out:
|
||||||
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) {
|
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) {
|
||||||
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
|
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
|
||||||
pNode->restoreFinish = true;
|
pNode->restoreFinish = true;
|
||||||
sInfo("vgId:%d, restore finished. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
|
sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
|
||||||
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -625,8 +625,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
|
||||||
ASSERT(pMgr->matchIndex == 0);
|
ASSERT(pMgr->matchIndex == 0);
|
||||||
if (pMsg->matchIndex < 0) {
|
if (pMsg->matchIndex < 0) {
|
||||||
pMgr->restored = true;
|
pMgr->restored = true;
|
||||||
sInfo("vgId:%d, sync log repl mgr of peer %s:%d (%" PRIx64 ") restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
|
sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), repl mgr(rs:%d): [%" PRId64 " %" PRId64
|
||||||
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
", %" PRId64 "), log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
||||||
pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
||||||
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -641,8 +641,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
|
||||||
|
|
||||||
if (pMsg->matchIndex == pMsg->lastSendIndex) {
|
if (pMsg->matchIndex == pMsg->lastSendIndex) {
|
||||||
pMgr->restored = true;
|
pMgr->restored = true;
|
||||||
sInfo("vgId:%d, sync log repl mgr of peer %s:%d (%" PRIx64 ") restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
|
sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), repl mgr(rs:%d): [%" PRId64 " %" PRId64
|
||||||
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
", %" PRId64 "), log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
||||||
pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
||||||
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -663,7 +663,9 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
|
||||||
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
|
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||||
if (syncNodeStartSnapshot(pNode, &destId) < 0) {
|
if (syncNodeStartSnapshot(pNode, &destId) < 0) {
|
||||||
sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port);
|
sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
sInfo("vgId:%d, snapshot replication to peer %s:%d started", pNode->vgId, host, port);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -702,8 +704,8 @@ int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pN
|
||||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
|
if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
|
||||||
sInfo("vgId:%d, reset sync log repl mgr in heartbeat. start time:%" PRId64 ", old start time:%" PRId64 "",
|
sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer: %" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
|
||||||
pNode->vgId, pMsg->startTime, pMgr->peerStartTime);
|
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
|
||||||
syncLogReplMgrReset(pMgr);
|
syncLogReplMgrReset(pMgr);
|
||||||
pMgr->peerStartTime = pMsg->startTime;
|
pMgr->peerStartTime = pMsg->startTime;
|
||||||
}
|
}
|
||||||
|
@ -715,9 +717,8 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
|
||||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
if (pMsg->startTime != pMgr->peerStartTime) {
|
if (pMsg->startTime != pMgr->peerStartTime) {
|
||||||
sInfo("vgId:%d, reset sync log repl mgr in append entries reply. start time:%" PRId64 ", old start time:%" PRId64
|
sInfo("vgId:%d, reset sync log repl mgr in append entries reply. peer: %" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
|
||||||
"",
|
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
|
||||||
pNode->vgId, pMsg->startTime, pMgr->peerStartTime);
|
|
||||||
syncLogReplMgrReset(pMgr);
|
syncLogReplMgrReset(pMgr);
|
||||||
pMgr->peerStartTime = pMsg->startTime;
|
pMgr->peerStartTime = pMsg->startTime;
|
||||||
}
|
}
|
||||||
|
@ -923,7 +924,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
|
|
||||||
(void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1);
|
(void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1);
|
||||||
|
|
||||||
sInfo("vgId:%d, reset log buffer. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
|
sInfo("vgId:%d, reset log buffer. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
|
||||||
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
|
|
||||||
pBuf->endIndex = pBuf->matchIndex + 1;
|
pBuf->endIndex = pBuf->matchIndex + 1;
|
||||||
|
|
|
@ -49,6 +49,7 @@
|
||||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||||
|
|
||||||
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot) {
|
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot) {
|
||||||
|
ASSERT(false && "deplicated");
|
||||||
// next index
|
// next index
|
||||||
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
|
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
|
||||||
|
|
||||||
|
|
|
@ -881,8 +881,8 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
||||||
snapshotSenderStop(pSender, true);
|
snapshotSenderStop(pSender, true);
|
||||||
|
|
||||||
// update next-index
|
// update next-index
|
||||||
syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), pMsg->lastIndex + 1);
|
// syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), pMsg->lastIndex + 1);
|
||||||
syncNodeReplicateOne(pSyncNode, &(pMsg->srcId), false);
|
// syncNodeReplicateOne(pSyncNode, &(pMsg->srcId), false);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue