Merge pull request #20880 from taosdata/FIX/TD-23509-main
enh: refactor some sync func names
This commit is contained in:
commit
9c0027cb2a
|
@ -163,8 +163,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
|
dGWarn("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
|
||||||
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
|
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
|
||||||
terrno = (terrno != 0) ? terrno : -1;
|
terrno = (terrno != 0) ? terrno : -1;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,18 +77,19 @@ static FORCE_INLINE int32_t syncLogReplGetNextRetryBackoff(SSyncLogReplMgr* pMgr
|
||||||
|
|
||||||
SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
||||||
|
|
||||||
int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
||||||
int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
|
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
||||||
SRaftId* pDestId, bool* pBarrier);
|
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
||||||
int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
|
||||||
int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
||||||
|
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
|
||||||
|
bool* pBarrier);
|
||||||
|
|
||||||
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||||
int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||||
int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||||
|
|
||||||
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
|
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
|
||||||
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
|
||||||
|
|
||||||
// SSyncLogBuffer
|
// SSyncLogBuffer
|
||||||
SSyncLogBuffer* syncLogBufferCreate();
|
SSyncLogBuffer* syncLogBufferCreate();
|
||||||
|
@ -100,6 +101,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
|
||||||
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
|
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
|
||||||
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
|
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
|
||||||
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf);
|
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf);
|
||||||
|
|
||||||
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
|
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
|
||||||
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
|
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
|
||||||
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
|
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
|
||||||
|
|
|
@ -45,7 +45,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
|
||||||
void syncEntryDestroy(SSyncRaftEntry* pEntry);
|
void syncEntryDestroy(SSyncRaftEntry* pEntry);
|
||||||
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7
|
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7
|
||||||
|
|
||||||
static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) {
|
static FORCE_INLINE bool syncLogReplBarrier(SSyncRaftEntry* pEntry) {
|
||||||
return pEntry->originalRpcType == TDMT_SYNC_NOOP;
|
return pEntry->originalRpcType == TDMT_SYNC_NOOP;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -599,7 +599,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
|
||||||
sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
|
sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType));
|
||||||
code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
|
code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sError("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
|
sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
|
||||||
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -633,7 +633,7 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||||
if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
|
if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
|
||||||
syncLogReplReset(pMgr);
|
syncLogReplReset(pMgr);
|
||||||
sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId,
|
sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId,
|
||||||
pDestId->addr);
|
pDestId->addr);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -658,15 +658,15 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
if (pMgr->states[pos].acked) {
|
if (pMgr->states[pos].acked) {
|
||||||
if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) {
|
if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) {
|
||||||
syncLogReplReset(pMgr);
|
syncLogReplReset(pMgr);
|
||||||
sWarn("vgId:%d, reset sync log repl mgr since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId,
|
sWarn("vgId:%d, reset sync log repl since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, index,
|
||||||
index, pDestId->addr);
|
pDestId->addr);
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool barrier = false;
|
bool barrier = false;
|
||||||
if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||||
sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId,
|
sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId,
|
||||||
terrstr(), index, pDestId->addr);
|
terrstr(), index, pDestId->addr);
|
||||||
goto _out;
|
goto _out;
|
||||||
|
@ -708,7 +708,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod
|
||||||
ASSERT(pMgr->matchIndex == 0);
|
ASSERT(pMgr->matchIndex == 0);
|
||||||
if (pMsg->matchIndex < 0) {
|
if (pMsg->matchIndex < 0) {
|
||||||
pMgr->restored = true;
|
pMgr->restored = true;
|
||||||
sInfo("vgId:%d, sync log repl mgr restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
|
sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
|
||||||
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
||||||
pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
||||||
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
|
@ -725,7 +725,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod
|
||||||
if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
|
if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
|
||||||
pMgr->matchIndex = pMsg->matchIndex;
|
pMgr->matchIndex = pMsg->matchIndex;
|
||||||
pMgr->restored = true;
|
pMgr->restored = true;
|
||||||
sInfo("vgId:%d, sync log repl mgr restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
|
sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
|
||||||
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
||||||
pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
||||||
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
|
@ -774,14 +774,14 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod
|
||||||
|
|
||||||
// attempt to replicate the raft log at index
|
// attempt to replicate the raft log at index
|
||||||
(void)syncLogReplReset(pMgr);
|
(void)syncLogReplReset(pMgr);
|
||||||
return syncLogReplReplicateProbe(pMgr, pNode, index);
|
return syncLogReplProbe(pMgr, pNode, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
|
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
|
||||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
|
if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
|
||||||
sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
|
sInfo("vgId:%d, reset sync log repl in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
|
||||||
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
|
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
|
||||||
syncLogReplReset(pMgr);
|
syncLogReplReset(pMgr);
|
||||||
pMgr->peerStartTime = pMsg->startTime;
|
pMgr->peerStartTime = pMsg->startTime;
|
||||||
|
@ -794,8 +794,7 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp
|
||||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
if (pMsg->startTime != pMgr->peerStartTime) {
|
if (pMsg->startTime != pMgr->peerStartTime) {
|
||||||
sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64
|
sInfo("vgId:%d, reset sync log repl in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
|
||||||
", old:%" PRId64,
|
|
||||||
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
|
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
|
||||||
syncLogReplReset(pMgr);
|
syncLogReplReset(pMgr);
|
||||||
pMgr->peerStartTime = pMsg->startTime;
|
pMgr->peerStartTime = pMsg->startTime;
|
||||||
|
@ -810,16 +809,16 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
if (pMgr->restored) {
|
if (pMgr->restored) {
|
||||||
(void)syncLogReplReplicateAttempt(pMgr, pNode);
|
(void)syncLogReplAttempt(pMgr, pNode);
|
||||||
} else {
|
} else {
|
||||||
(void)syncLogReplReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex);
|
(void)syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
|
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
|
||||||
ASSERT(!pMgr->restored);
|
ASSERT(!pMgr->restored);
|
||||||
ASSERT(pMgr->startIndex >= 0);
|
ASSERT(pMgr->startIndex >= 0);
|
||||||
int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
|
int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
|
||||||
|
@ -834,7 +833,7 @@ int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
|
||||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||||
bool barrier = false;
|
bool barrier = false;
|
||||||
SyncTerm term = -1;
|
SyncTerm term = -1;
|
||||||
if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||||
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
||||||
terrstr(), index, pDestId->addr);
|
terrstr(), index, pDestId->addr);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -857,7 +856,7 @@ int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
ASSERT(pMgr->restored);
|
ASSERT(pMgr->restored);
|
||||||
|
|
||||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||||
|
@ -879,7 +878,7 @@ int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||||
bool barrier = false;
|
bool barrier = false;
|
||||||
SyncTerm term = -1;
|
SyncTerm term = -1;
|
||||||
if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||||
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
||||||
terrstr(), index, pDestId->addr);
|
terrstr(), index, pDestId->addr);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -932,7 +931,7 @@ int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode,
|
||||||
pMgr->startIndex = pMgr->matchIndex;
|
pMgr->startIndex = pMgr->matchIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
return syncLogReplReplicateAttempt(pMgr, pNode);
|
return syncLogReplAttempt(pMgr, pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncLogReplMgr* syncLogReplCreate() {
|
SSyncLogReplMgr* syncLogReplCreate() {
|
||||||
|
@ -1127,8 +1126,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
|
||||||
return pEntry;
|
return pEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
|
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
|
||||||
SRaftId* pDestId, bool* pBarrier) {
|
bool* pBarrier) {
|
||||||
SSyncRaftEntry* pEntry = NULL;
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
SRpcMsg msgOut = {0};
|
SRpcMsg msgOut = {0};
|
||||||
bool inBuf = false;
|
bool inBuf = false;
|
||||||
|
@ -1141,14 +1140,14 @@ int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
|
||||||
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||||
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
|
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
|
||||||
if (pMgr) {
|
if (pMgr) {
|
||||||
sInfo("vgId:%d, reset sync log repl mgr of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId,
|
sInfo("vgId:%d, reset sync log repl of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr,
|
||||||
pDestId->addr, terrstr(), index);
|
terrstr(), index);
|
||||||
(void)syncLogReplReset(pMgr);
|
(void)syncLogReplReset(pMgr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
*pBarrier = syncLogIsReplicationBarrier(pEntry);
|
*pBarrier = syncLogReplBarrier(pEntry);
|
||||||
|
|
||||||
prevLogTerm = syncLogReplGetPrevLogTerm(pMgr, pNode, index);
|
prevLogTerm = syncLogReplGetPrevLogTerm(pMgr, pNode, index);
|
||||||
if (prevLogTerm < 0) {
|
if (prevLogTerm < 0) {
|
||||||
|
|
|
@ -74,7 +74,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
||||||
(void)syncLogReplReplicateOnce(pMgr, pNode);
|
(void)syncLogReplDoOnce(pMgr, pNode);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -320,7 +320,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_ALREADY_DEPLOYED, "Snode already deploye
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_NOT_DEPLOYED, "Snode not deployed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_NOT_DEPLOYED, "Snode not deployed")
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode moved to another dnode or was deleted")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_EXIST, "Vnode not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_EXIST, "Vnode not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ALREADY_EXIST, "Vnode already exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ALREADY_EXIST, "Vnode already exist")
|
||||||
|
|
Loading…
Reference in New Issue