refactor(sync): rename functions
This commit is contained in:
parent
49c19e13f2
commit
1b36ad119c
|
@ -36,6 +36,7 @@ extern bool gRaftDetailLog;
|
||||||
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
|
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
|
||||||
#define SYNC_MAX_RECV_TIME_RANGE_MS 1200
|
#define SYNC_MAX_RECV_TIME_RANGE_MS 1200
|
||||||
#define SYNC_ADD_QUORUM_COUNT 3
|
#define SYNC_ADD_QUORUM_COUNT 3
|
||||||
|
#define SYNC_MNODE_MAX_LOG_NUM 10000
|
||||||
|
|
||||||
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
|
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
|
||||||
|
|
||||||
|
@ -203,7 +204,6 @@ void syncStop(int64_t rid);
|
||||||
int32_t syncSetStandby(int64_t rid);
|
int32_t syncSetStandby(int64_t rid);
|
||||||
ESyncState syncGetMyRole(int64_t rid);
|
ESyncState syncGetMyRole(int64_t rid);
|
||||||
bool syncIsReady(int64_t rid);
|
bool syncIsReady(int64_t rid);
|
||||||
bool syncIsReadyForRead(int64_t rid);
|
|
||||||
const char* syncGetMyRoleStr(int64_t rid);
|
const char* syncGetMyRoleStr(int64_t rid);
|
||||||
bool syncRestoreFinish(int64_t rid);
|
bool syncRestoreFinish(int64_t rid);
|
||||||
SyncTerm syncGetMyTerm(int64_t rid);
|
SyncTerm syncGetMyTerm(int64_t rid);
|
||||||
|
|
|
@ -281,7 +281,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index);
|
||||||
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
|
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
|
||||||
|
|
||||||
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
|
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
|
||||||
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
|
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
|
||||||
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex);
|
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex);
|
||||||
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code);
|
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code);
|
||||||
|
|
||||||
|
|
|
@ -299,7 +299,7 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
|
||||||
int32_t code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex);
|
int32_t code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
|
|
||||||
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
|
code = syncNodeDoCommit(ths, beginIndex, endIndex, ths->state);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,8 +45,10 @@
|
||||||
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
|
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
|
||||||
//
|
//
|
||||||
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
|
syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// advance commit index to sanpshot first
|
// advance commit index to sanpshot first
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
|
@ -129,7 +131,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
// execute fsm
|
// execute fsm
|
||||||
if (pSyncNode->pFsm != NULL) {
|
if (pSyncNode->pFsm != NULL) {
|
||||||
int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state);
|
int32_t code = syncNodeDoCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -297,6 +297,20 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
ASSERT(rid == pSyncNode->rid);
|
ASSERT(rid == pSyncNode->rid);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (syncNodeIsMnode(pSyncNode)) {
|
||||||
|
SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
||||||
|
SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
|
||||||
|
int64_t logNum = endIndex - beginIndex;
|
||||||
|
if (logNum > SYNC_MNODE_MAX_LOG_NUM) {
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "mnode log num:%ld, do not delete", logNum);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
|
int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
|
||||||
if (lastApplyIndex > matchIndex) {
|
if (lastApplyIndex > matchIndex) {
|
||||||
|
@ -311,6 +325,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
syncNodeEventLog(pSyncNode, logBuf);
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3117,7 +3132,7 @@ bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
|
||||||
return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
|
return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
|
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
|
||||||
if (beginIndex > endIndex) {
|
if (beginIndex > endIndex) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue