Merge pull request #20954 from taosdata/FIX/TD-22419-main
enh: remove unused functions in sync
This commit is contained in:
commit
cda3222a18
|
@ -48,8 +48,6 @@ extern "C" {
|
|||
void syncOneReplicaAdvance(SSyncNode* pSyncNode);
|
||||
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
|
||||
|
||||
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index);
|
||||
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index);
|
||||
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);
|
||||
|
||||
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex);
|
||||
|
|
|
@ -55,7 +55,6 @@ int32_t syncNodeReplicateReset(SSyncNode* pSyncNode, SRaftId* pDestId);
|
|||
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode);
|
||||
|
||||
int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -89,17 +89,6 @@
|
|||
// /\ UNCHANGED <<candidateVars, leaderVars>>
|
||||
//
|
||||
|
||||
SSyncRaftEntry* syncBuildRaftEntryFromAppendEntries(const SyncAppendEntries* pMsg) {
|
||||
SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
|
||||
if (pEntry == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
(void)memcpy(pEntry, pMsg->data, pMsg->dataLen);
|
||||
ASSERT(pEntry->bytes == pMsg->dataLen);
|
||||
return pEntry;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
||||
SRpcMsg rpcRsp = {0};
|
||||
|
@ -146,7 +135,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
goto _IGNORE;
|
||||
}
|
||||
|
||||
pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
|
||||
pEntry = syncEntryBuildFromAppendEntries(pMsg);
|
||||
if (pEntry == NULL) {
|
||||
sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
|
||||
goto _IGNORE;
|
||||
|
|
|
@ -44,22 +44,6 @@
|
|||
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
|
||||
//
|
||||
|
||||
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
|
||||
// I am leader, I agree
|
||||
if (syncUtilSameId(pRaftId, &(pSyncNode->myRaftId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// follower agree
|
||||
SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, pRaftId);
|
||||
if (matchIndex >= index) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// not agree
|
||||
return false;
|
||||
}
|
||||
|
||||
static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
|
||||
ASSERT(a >= 0);
|
||||
ASSERT(b >= 0);
|
||||
|
@ -85,19 +69,6 @@ bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) {
|
|||
return count >= pNode->quorum;
|
||||
}
|
||||
|
||||
bool syncAgree(SSyncNode* pNode, SyncIndex index) {
|
||||
int agreeCount = 0;
|
||||
for (int i = 0; i < pNode->replicaNum; ++i) {
|
||||
if (syncAgreeIndex(pNode, &(pNode->replicasId[i]), index)) {
|
||||
++agreeCount;
|
||||
}
|
||||
if (agreeCount >= pNode->quorum) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
|
||||
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
||||
commitIndex = TMAX(commitIndex, ths->commitIndex);
|
||||
|
|
|
@ -64,10 +64,13 @@ SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, Syn
|
|||
}
|
||||
|
||||
SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild((int32_t)(pMsg->dataLen));
|
||||
if (pEntry == NULL) return NULL;
|
||||
|
||||
SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
|
||||
if (pEntry == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
memcpy(pEntry, pMsg->data, pMsg->dataLen);
|
||||
ASSERT(pEntry->bytes == pMsg->dataLen);
|
||||
return pEntry;
|
||||
}
|
||||
|
||||
|
|
|
@ -46,8 +46,6 @@
|
|||
// mdest |-> j])
|
||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||
|
||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
|
||||
|
||||
int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
|
||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||
taosThreadMutexLock(&pBuf->mutex);
|
||||
|
@ -86,20 +84,6 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
|
||||
int32_t ret = 0;
|
||||
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
||||
|
||||
if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) {
|
||||
ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pRpcMsg);
|
||||
} else {
|
||||
sNTrace(pSyncNode, "do not repcate to dnode:%d for index:%" PRId64, DID(destRaftId), pMsg->prevLogIndex + 1);
|
||||
rpcFreeCont(pRpcMsg->pCont);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
|
||||
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue